This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch ak/spk-sql-cfg-fix in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 34efdb0772f33551f28eb46971d96b4337856163 Author: Alexey Kudinkin <[email protected]> AuthorDate: Tue Aug 9 17:36:07 2022 -0700 Added `SQLConfInjectingRDD`; Fixed `HoodieSparkUtils.createRDD` to make sure `SQLConf` is properly propagated to the executor (required by `AvroSerializer`) --- .../scala/org/apache/hudi/HoodieSparkUtils.scala | 15 +++++++- .../spark/sql/execution/SQLConfInjectingRDD.scala | 43 ++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 980030cc7c..2b496cfb55 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -26,9 +26,13 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.spark.SPARK_VERSION import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.SQLConfInjectingRDD +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} import scala.collection.JavaConverters._ +import scala.reflect.ClassTag private[hudi] trait SparkVersionsSupport { def getSparkVersion: String @@ -89,8 +93,12 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport { // serializer is not able to digest it val readerAvroSchemaStr = readerAvroSchema.toString val writerAvroSchemaStr = writerAvroSchema.toString + // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion - df.queryExecution.toRdd.mapPartitions { rows => + // Additionally, we have to explicitly wrap around resulting [[RDD]] into the one + // injecting [[SQLConf]], which by default isn't propagated by Spark to the executor(s). 
+ // [[SQLConf]] is required by [[AvroSerializer]] + injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows => if (rows.isEmpty) { Iterator.empty } else { @@ -108,10 +116,13 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport { rows.map { ir => transform(convert(ir)) } } - } + }, SQLConf.get) } def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = { sparkAdapter.createSparkRowSerDe(structType) } + + private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] = + new SQLConfInjectingRDD(rdd, conf) } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala new file mode 100644 index 0000000000..477e96ed10 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala @@ -0,0 +1,43 @@ +package org.apache.spark.sql.execution + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.internal.SQLConf + +import scala.reflect.ClassTag + +/** + * NOTE: This is a generalized version of Spark's [[SQLExecutionRDD]] + * + * It is just a wrapper over [[sqlRDD]] which sets and makes effective all the configs from the + * captured [[SQLConf]] + * + * @param sqlRDD the `RDD` generated by the SQL plan + * @param conf the `SQLConf` to apply to the execution of the SQL plan + */ +class SQLConfInjectingRDD[T: ClassTag](var sqlRDD: RDD[T], @transient conf: SQLConf) extends RDD[T](sqlRDD) { + private val sqlConfigs = conf.getAllConfs + private lazy val sqlConfExecutorSide = { + val newConf = new SQLConf() + sqlConfigs.foreach { case (k, v) => newConf.setConfString(k, v) } + newConf + } + + override val partitioner = firstParent[InternalRow].partitioner + + override def getPartitions: Array[Partition] = 
firstParent[InternalRow].partitions + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set + // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of + // this RDD. + if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) { + SQLConf.withExistingConf(sqlConfExecutorSide) { + firstParent[T].iterator(split, context) + } + } else { + firstParent[T].iterator(split, context) + } + } +} \ No newline at end of file
