This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4a61e82576dc3044046fc3668f0f30eed63d4aa7
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Feb 1 21:37:34 2023 -0800

    [HUDI-5681] Fixing Kryo being instantiated w/ invalid `SparkConf` (#7821)
    
    This is addressing a misconfiguration of the Kryo object used specifically to 
serialize Spark's internal structures (like `Expression`s): previously we were 
using a default `SparkConf` instance to configure it, whereas we should have 
used the one provided by `SparkEnv`
---
 .../org/apache/spark/sql/hudi/SerDeUtils.scala     | 44 ------------------
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  8 ++--
 .../hudi/command/payload/ExpressionPayload.scala   | 54 ++++++++++++++++++++--
 3 files changed, 54 insertions(+), 52 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala
deleted file mode 100644
index 631644121c1..00000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hudi
-
-import org.apache.hudi.common.util.BinaryUtil
-import org.apache.spark.SparkConf
-import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
-
-import java.nio.ByteBuffer
-
-
-object SerDeUtils {
-
-  private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] {
-
-    override protected def initialValue: SerializerInstance = {
-      new KryoSerializer(new SparkConf(true)).newInstance()
-    }
-  }
-
-  def toBytes(o: Any): Array[Byte] = {
-    val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o)
-    BinaryUtil.toBytes(buf)
-  }
-
-  def toObject(bytes: Array[Byte]): Any = {
-    SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes))
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 418b2f8d6ec..93972b392b2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -42,7 +42,7 @@ import 
org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
 import 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
-import org.apache.spark.sql.hudi.{ProvidesHoodieConfig, SerDeUtils}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.types.{BooleanType, StructType}
 
 import java.util.Base64
@@ -328,7 +328,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       }).toMap
     // Serialize the Map[UpdateCondition, UpdateAssignments] to base64 string
     val serializedUpdateConditionAndExpressions = Base64.getEncoder
-      .encodeToString(SerDeUtils.toBytes(updateConditionToAssignments))
+      .encodeToString(Serializer.toBytes(updateConditionToAssignments))
     writeParams += (PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS ->
       serializedUpdateConditionAndExpressions)
 
@@ -338,7 +338,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
         .getOrElse(Literal.create(true, BooleanType))
       // Serialize the Map[DeleteCondition, empty] to base64 string
       val serializedDeleteCondition = Base64.getEncoder
-        .encodeToString(SerDeUtils.toBytes(Map(deleteCondition -> 
Seq.empty[Assignment])))
+        .encodeToString(Serializer.toBytes(Map(deleteCondition -> 
Seq.empty[Assignment])))
       writeParams += (PAYLOAD_DELETE_CONDITION -> serializedDeleteCondition)
     }
 
@@ -414,7 +414,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
         rewriteCondition -> formatAssignments
       }).toMap
     Base64.getEncoder.encodeToString(
-      SerDeUtils.toBytes(insertConditionAndAssignments))
+      Serializer.toBytes(insertConditionAndAssignments))
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index d874f7bec3a..015ba2e2e8e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -29,18 +29,19 @@ import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
 import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodiePayloadProps, HoodieRecord}
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
+import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option => 
HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.io.HoodieWriteHandle
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, Projection, 
SafeProjection}
-import org.apache.spark.sql.hudi.SerDeUtils
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.{SparkConf, SparkEnv}
 
+import java.nio.ByteBuffer
 import java.util.function.{Function, Supplier}
 import java.util.{Base64, Properties}
 import scala.collection.JavaConverters._
@@ -420,7 +421,7 @@ object ExpressionPayload {
           override def apply(key: (String, Schema)): Seq[(Projection, 
Projection)] = {
             val (encodedConditionalAssignments, _) = key
             val serializedBytes = 
Base64.getDecoder.decode(encodedConditionalAssignments)
-            val conditionAssignments = SerDeUtils.toObject(serializedBytes)
+            val conditionAssignments = Serializer.toObject(serializedBytes)
               .asInstanceOf[Map[Expression, Seq[Expression]]]
             conditionAssignments.toSeq.map {
               case (condition, assignments) =>
@@ -455,5 +456,50 @@ object ExpressionPayload {
             field.schema, field.doc, field.defaultVal, field.order))
     Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, 
mergedFields.asJava)
   }
+
+
+  /**
+   * This object differs from Hudi's generic [[SerializationUtils]] in its 
ability to serialize
+   * Spark's internal structures (various [[Expression]]s)
+   *
+   * For that purpose we re-use Spark's [[KryoSerializer]] instance sharing 
configuration
+   * with enclosing [[SparkEnv]]. This is necessary to make sure that this 
particular instance of Kryo
used for serialization of Spark's internal structures (like 
[[Expression]]s) is configured
+   * appropriately (class-loading, custom serializers, etc)
+   *
+   * TODO rebase on Spark's SerializerSupport
+   */
+  private[hudi] object Serializer {
+
+    // NOTE: This is only Spark >= 3.0
+    private val KRYO_USE_POOL_CONFIG_KEY = "spark.kryo.pool"
+
+    private lazy val conf = {
+      val conf = Option(SparkEnv.get)
+        // To make sure we're not modifying existing environment's 
[[SparkConf]]
+        // we're cloning it here
+        .map(_.conf.clone)
+        .getOrElse(new SparkConf)
+      // This serializer is configured as thread-local, hence there's no need 
for
+      // pooling
+      conf.set(KRYO_USE_POOL_CONFIG_KEY, "false")
+      conf
+    }
+
+    private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] {
+      override protected def initialValue: SerializerInstance = {
+        new KryoSerializer(conf).newInstance()
+      }
+    }
+
+    def toBytes(o: Any): Array[Byte] = {
+      val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o)
+      BinaryUtil.toBytes(buf)
+    }
+
+    def toObject(bytes: Array[Byte]): Any = {
+      SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes))
+    }
+  }
 }
 

Reply via email to