This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 9f67202a7b [KYUUBI #6920] Spark SQL engine supports Spark 4.0
9f67202a7b is described below

commit 9f67202a7b8451d8f227e16a25d731c239c6b008
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Feb 17 16:32:55 2025 +0800

    [KYUUBI #6920] Spark SQL engine supports Spark 4.0
    
    Spark 4.0 has continued to receive breaking changes since 4.0.0-preview2, and
    the 4.0.0 RC1 is scheduled for 2025-02-15. This PR fixes all compatibility
    issues with the latest Spark 4.0.0-SNAPSHOT for the Spark SQL engine.
    
    Passes GitHub Actions (GHA) CI with `spark-master`.
    
    No.
    
    Closes #6920 from pan3793/spark4.
    
    Closes #6920
    
    170430e5e [Cheng Pan] Revert "ci"
    c6d889350 [Cheng Pan] fix
    86ff7ea2e [Cheng Pan] fix
    75d0bf563 [Cheng Pan] ci
    9d88c8630 [Cheng Pan] fix spark 4.0 compatibility
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit cc9e11ce596738ccca8918a13c6dfacebd9f27db)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../engine/spark/operation/SparkOperation.scala    |  4 +-
 .../sql/execution/SparkSQLExecutionHelper.scala    | 39 ++++++++++++++
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      | 14 ++++-
 .../operation/SparkArrowbasedOperationSuite.scala  | 61 +++++++++++++++-------
 4 files changed, 96 insertions(+), 22 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index da01e85a6d..6d7af17021 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -23,7 +23,7 @@ import java.time.ZoneId
 import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.SparkSQLExecutionHelper
 import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
 import org.apache.spark.ui.SparkUIUtils.formatDuration
 
@@ -155,7 +155,7 @@ abstract class SparkOperation(session: Session)
     spark.sparkContext.setLocalProperty
 
   protected def withLocalProperties[T](f: => T): T = {
-    SQLExecution.withSQLConfPropagated(spark) {
+    SparkSQLExecutionHelper.withSQLConfPropagated(spark) {
       val originalSession = SparkSession.getActiveSession
       try {
         SparkSession.setActiveSession(spark)
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkSQLExecutionHelper.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkSQLExecutionHelper.scala
new file mode 100644
index 0000000000..2929afc438
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkSQLExecutionHelper.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
+
+object SparkSQLExecutionHelper {
+
+  private val sparkSessionClz = DynClasses.builder()
+    .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+    .impl("org.apache.spark.sql.SparkSession")
+    .build()
+
+  private val withSQLConfPropagatedMethod =
+    DynMethods.builder("withSQLConfPropagated")
+      .impl(SQLExecution.getClass, sparkSessionClz, classOf[() => Any])
+      .buildChecked(SQLExecution)
+
+  def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
+    withSQLConfPropagatedMethod.invokeChecked[T](sparkSession, () => body)
+  }
+}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 8516ffc0f1..49f6171170 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types._
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
 import org.apache.kyuubi.engine.spark.schema.RowSet
 import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
-import org.apache.kyuubi.util.reflect.DynMethods
+import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
 import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 object SparkDatasetHelper extends Logging {
@@ -66,8 +66,18 @@ object SparkDatasetHelper extends Logging {
       toArrowBatchRdd(plan).collect()
   }
 
+  private val datasetClz = DynClasses.builder()
+    .impl("org.apache.spark.sql.classic.Dataset") // SPARK-49700 (4.0.0)
+    .impl("org.apache.spark.sql.Dataset")
+    .build()
+
+  private val toArrowBatchRddMethod =
+    DynMethods.builder("toArrowBatchRdd")
+      .impl(datasetClz)
+      .buildChecked()
+
   def toArrowBatchRdd[T](ds: Dataset[T]): RDD[Array[Byte]] = {
-    ds.toArrowBatchRdd
+    toArrowBatchRddMethod.bind(ds).invoke()
   }
 
   /**
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index 4e04148240..eee61d6a98 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -21,13 +21,15 @@ import java.lang.{Boolean => JBoolean}
 import java.sql.Statement
 import java.util.{Locale, Set => JSet}
 
+import scala.util.Try
+
 import org.apache.spark.{KyuubiSparkContextHelper, TaskContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, 
QueryExecution, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SparkMetricsTestUtils
@@ -164,6 +166,7 @@ class SparkArrowbasedOperationSuite extends 
WithSparkSQLEngine with SparkDataTyp
         sparkPlan.schema,
         "",
         true,
+        true, // spark.sql.execution.arrow.useLargeVarTypes
         KyuubiSparkContextHelper.dummyTaskContext())
       assert(rows.size == expectSize)
     }
@@ -251,7 +254,11 @@ class SparkArrowbasedOperationSuite extends 
WithSparkSQLEngine with SparkDataTyp
           |) LIMIT 1
           |""".stripMargin)
       val smj = plan.collect { case smj: SortMergeJoinExec => smj }
-      val bhj = adaptivePlan.collect { case bhj: BroadcastHashJoinExec => bhj }
+      val bhj = (adaptivePlan match {
+        // SPARK-51008 (4.0.0) adds ResultQueryStageExec
+        case queryStage: QueryStageExec => queryStage.plan
+        case plan => plan
+      }).collect { case bhj: BroadcastHashJoinExec => bhj }
       assert(smj.size == 1)
       assert(bhj.size == 1)
     }
@@ -531,49 +538,67 @@ class SparkArrowbasedOperationSuite extends 
WithSparkSQLEngine with SparkDataTyp
   private def isStaticConfigKey(key: String): Boolean =
     getField[JSet[String]]((SQLConf.getClass, SQLConf), 
"staticConfKeys").contains(key)
 
-  // the signature of function [[ArrowConverters.fromBatchIterator]] is 
changed in SPARK-43528
-  // (since Spark 3.5)
   private lazy val fromBatchIteratorMethod = 
DynMethods.builder("fromBatchIterator")
-    .hiddenImpl( // for Spark 3.4 or previous
+    .hiddenImpl( // SPARK-51079: Spark 4.0 or later
       "org.apache.spark.sql.execution.arrow.ArrowConverters$",
       classOf[Iterator[Array[Byte]]],
       classOf[StructType],
       classOf[String],
+      classOf[Boolean],
+      classOf[Boolean],
       classOf[TaskContext])
-    .hiddenImpl( // for Spark 3.5 or later
+    .hiddenImpl( // SPARK-43528: Spark 3.5
       "org.apache.spark.sql.execution.arrow.ArrowConverters$",
       classOf[Iterator[Array[Byte]]],
       classOf[StructType],
       classOf[String],
       classOf[Boolean],
       classOf[TaskContext])
-    .build()
+    .hiddenImpl( // for Spark 3.4 or previous
+      "org.apache.spark.sql.execution.arrow.ArrowConverters$",
+      classOf[Iterator[Array[Byte]]],
+      classOf[StructType],
+      classOf[String],
+      classOf[TaskContext])
+    .buildChecked()
+
+  private lazy val arrowConvertersObject = DynFields.builder()
+    .impl("org.apache.spark.sql.execution.arrow.ArrowConverters$", "MODULE$")
+    .buildStaticChecked[Any]()
+    .get()
 
   def fromBatchIterator(
       arrowBatchIter: Iterator[Array[Byte]],
       schema: StructType,
       timeZoneId: String,
       errorOnDuplicatedFieldNames: JBoolean,
-      context: TaskContext): Iterator[InternalRow] = {
-    val className = "org.apache.spark.sql.execution.arrow.ArrowConverters$"
-    val instance = DynFields.builder().impl(className, 
"MODULE$").build[Object]().get(null)
-    if (SPARK_ENGINE_RUNTIME_VERSION >= "3.5") {
-      fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
-        instance,
+      largeVarTypes: JBoolean,
+      context: TaskContext): Iterator[InternalRow] =
+    Try { // SPARK-51079: Spark 4.0 or later
+      fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+        arrowConvertersObject,
         arrowBatchIter,
         schema,
         timeZoneId,
         errorOnDuplicatedFieldNames,
+        largeVarTypes,
         context)
-    } else {
-      fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
-        instance,
+    }.recover { case _: Exception => // SPARK-43528: Spark 3.5
+      fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+        arrowConvertersObject,
         arrowBatchIter,
         schema,
         timeZoneId,
+        errorOnDuplicatedFieldNames,
         context)
-    }
-  }
+    }.recover { case _: Exception => // for Spark 3.4 or previous
+      fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+        arrowConvertersObject,
+        arrowBatchIter,
+        schema,
+        timeZoneId,
+        context)
+    }.get
 
   class JobCountListener extends SparkListener {
     var numJobs = 0

Reply via email to