This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4a61e82576dc3044046fc3668f0f30eed63d4aa7 Author: Alexey Kudinkin <[email protected]> AuthorDate: Wed Feb 1 21:37:34 2023 -0800 [HUDI-5681] Fixing Kryo being instantiated w/ invalid `SparkConf` (#7821) This is addressing misconfiguration of the Kryo object used specifically to serialize Spark's internal structures (like `Expression`s): previously we're using default `SparkConf` instance to configure it, while instead we should have used the one provided by `SparkEnv` --- .../org/apache/spark/sql/hudi/SerDeUtils.scala | 44 ------------------ .../hudi/command/MergeIntoHoodieTableCommand.scala | 8 ++-- .../hudi/command/payload/ExpressionPayload.scala | 54 ++++++++++++++++++++-- 3 files changed, 54 insertions(+), 52 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala deleted file mode 100644 index 631644121c1..00000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hudi - -import org.apache.hudi.common.util.BinaryUtil -import org.apache.spark.SparkConf -import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} - -import java.nio.ByteBuffer - - -object SerDeUtils { - - private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] { - - override protected def initialValue: SerializerInstance = { - new KryoSerializer(new SparkConf(true)).newInstance() - } - } - - def toBytes(o: Any): Array[Byte] = { - val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o) - BinaryUtil.toBytes(buf) - } - - def toObject(bytes: Array[Byte]): Any = { - SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes)) - } -} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 418b2f8d6ec..93972b392b2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ -import org.apache.spark.sql.hudi.{ProvidesHoodieConfig, SerDeUtils} +import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.types.{BooleanType, StructType} import java.util.Base64 @@ -328,7 +328,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie }).toMap // Serialize the Map[UpdateCondition, UpdateAssignments] to base64 string val serializedUpdateConditionAndExpressions = 
Base64.getEncoder - .encodeToString(SerDeUtils.toBytes(updateConditionToAssignments)) + .encodeToString(Serializer.toBytes(updateConditionToAssignments)) writeParams += (PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS -> serializedUpdateConditionAndExpressions) @@ -338,7 +338,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie .getOrElse(Literal.create(true, BooleanType)) // Serialize the Map[DeleteCondition, empty] to base64 string val serializedDeleteCondition = Base64.getEncoder - .encodeToString(SerDeUtils.toBytes(Map(deleteCondition -> Seq.empty[Assignment]))) + .encodeToString(Serializer.toBytes(Map(deleteCondition -> Seq.empty[Assignment]))) writeParams += (PAYLOAD_DELETE_CONDITION -> serializedDeleteCondition) } @@ -414,7 +414,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie rewriteCondition -> formatAssignments }).toMap Base64.getEncoder.encodeToString( - SerDeUtils.toBytes(insertConditionAndAssignments)) + Serializer.toBytes(insertConditionAndAssignments)) } /** diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index d874f7bec3a..015ba2e2e8e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -29,18 +29,19 @@ import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord} import org.apache.hudi.common.util.ValidationUtils.checkState -import org.apache.hudi.common.util.{ValidationUtils, Option => HOption} +import org.apache.hudi.common.util.{BinaryUtil, 
ValidationUtils, Option => HOption} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException -import org.apache.hudi.io.HoodieWriteHandle import org.apache.spark.internal.Logging +import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Expression, Projection, SafeProjection} -import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ import org.apache.spark.sql.types.BooleanType +import org.apache.spark.{SparkConf, SparkEnv} +import java.nio.ByteBuffer import java.util.function.{Function, Supplier} import java.util.{Base64, Properties} import scala.collection.JavaConverters._ @@ -420,7 +421,7 @@ object ExpressionPayload { override def apply(key: (String, Schema)): Seq[(Projection, Projection)] = { val (encodedConditionalAssignments, _) = key val serializedBytes = Base64.getDecoder.decode(encodedConditionalAssignments) - val conditionAssignments = SerDeUtils.toObject(serializedBytes) + val conditionAssignments = Serializer.toObject(serializedBytes) .asInstanceOf[Map[Expression, Seq[Expression]]] conditionAssignments.toSeq.map { case (condition, assignments) => @@ -455,5 +456,50 @@ object ExpressionPayload { field.schema, field.doc, field.defaultVal, field.order)) Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, mergedFields.asJava) } + + + /** + * This object differs from Hudi's generic [[SerializationUtils]] in its ability to serialize + * Spark's internal structures (various [[Expression]]s) + * + * For that purpose we re-use Spark's [[KryoSerializer]] instance sharing configuration + * with enclosing [[SparkEnv]]. 
This is necessary to make sure that this particular instance of Kryo + * used for serialization of Spark's internal structures (like [[Expression]]s) is configured + * appropriately (class-loading, custom serializers, etc) + * + * TODO rebase on Spark's SerializerSupport + */ + private[hudi] object Serializer { + + // NOTE: This is only Spark >= 3.0 + private val KRYO_USE_POOL_CONFIG_KEY = "spark.kryo.pool" + + private lazy val conf = { + val conf = Option(SparkEnv.get) + // To make sure we're not modifying existing environment's [[SparkConf]] + // we're cloning it here + .map(_.conf.clone) + .getOrElse(new SparkConf) + // This serializer is configured as thread-local, hence there's no need for + // pooling + conf.set(KRYO_USE_POOL_CONFIG_KEY, "false") + conf + } + + private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] { + override protected def initialValue: SerializerInstance = { + new KryoSerializer(conf).newInstance() + } + } + + def toBytes(o: Any): Array[Byte] = { + val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o) + BinaryUtil.toBytes(buf) + } + + def toObject(bytes: Array[Byte]): Any = { + SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes)) + } + } }
