This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch ak/spk-sql-cfg-fix
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 34efdb0772f33551f28eb46971d96b4337856163
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Tue Aug 9 17:36:07 2022 -0700

    Added `SQLConfInjectingRDD`;
    Fixed `HoodieSparkUtils.createRDD` to make sure `SQLConf` is properly 
propagated to the executor (required by `AvroSerializer`)
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   | 15 +++++++-
 .../spark/sql/execution/SQLConfInjectingRDD.scala  | 43 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 980030cc7c..2b496cfb55 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -26,9 +26,13 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.SQLConfInjectingRDD
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 private[hudi] trait SparkVersionsSupport {
   def getSparkVersion: String
@@ -89,8 +93,12 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport {
     //       serializer is not able to digest it
     val readerAvroSchemaStr = readerAvroSchema.toString
     val writerAvroSchemaStr = writerAvroSchema.toString
+
     // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
-    df.queryExecution.toRdd.mapPartitions { rows =>
+    //       Additionally, we have to explicitly wrap around resulting [[RDD]] 
into the one
+    //       injecting [[SQLConf]], which by default isn't propagated by Spark 
to the executor(s).
+    //       [[SQLConf]] is required by [[AvroSerializer]]
+    injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows =>
       if (rows.isEmpty) {
         Iterator.empty
       } else {
@@ -108,10 +116,13 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport {
 
         rows.map { ir => transform(convert(ir)) }
       }
-    }
+    }, SQLConf.get)
   }
 
   def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
     sparkAdapter.createSparkRowSerDe(structType)
   }
+
+  private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
+    new SQLConfInjectingRDD(rdd, conf)
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala
new file mode 100644
index 0000000000..477e96ed10
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala
@@ -0,0 +1,43 @@
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+import scala.reflect.ClassTag
+
+/**
+ * NOTE: This is a generalized version of Spark's [[SQLExecutionRDD]]
+ *
+ * It is just a wrapper over [[sqlRDD]] which sets and makes effective all the 
configs from the
+ * captured [[SQLConf]]
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLConfInjectingRDD[T: ClassTag](var sqlRDD: RDD[T], @transient conf: 
SQLConf) extends RDD[T](sqlRDD) {
+  private val sqlConfigs = conf.getAllConfs
+  private lazy val sqlConfExecutorSide = {
+    val newConf = new SQLConf()
+    sqlConfigs.foreach { case (k, v) => newConf.setConfString(k, v) }
+    newConf
+  }
+
+  override val partitioner = firstParent[InternalRow].partitioner
+
+  override def getPartitions: Array[Partition] = 
firstParent[InternalRow].partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    // If we are in the context of a tracked SQL operation, 
`SQLExecution.EXECUTION_ID_KEY` is set
+    // and we have nothing to do here. Otherwise, we use the `SQLConf` 
captured at the creation of
+    // this RDD.
+    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
+      SQLConf.withExistingConf(sqlConfExecutorSide) {
+        firstParent[T].iterator(split, context)
+      }
+    } else {
+      firstParent[T].iterator(split, context)
+    }
+  }
+}
\ No newline at end of file

Reply via email to