This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new 04aa9c4a5d [KYUUBI #6920] Spark SQL engine supports Spark 4.0
04aa9c4a5d is described below
commit 04aa9c4a5d34259af8774d9cab34cb529a4e224e
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Feb 17 16:32:55 2025 +0800
[KYUUBI #6920] Spark SQL engine supports Spark 4.0
### Why are the changes needed?
Spark 4.0 has continued to receive breaking changes since 4.0.0-preview2, and
4.0.0 RC1 is scheduled for 2025-02-15. This PR fixes all compatibility issues
with the latest Spark 4.0.0-SNAPSHOT for the Spark SQL engine.
### How was this patch tested?
Pass GHA with `spark-master`
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #6920 from pan3793/spark4.
Closes #6920
170430e5e [Cheng Pan] Revert "ci"
c6d889350 [Cheng Pan] fix
86ff7ea2e [Cheng Pan] fix
75d0bf563 [Cheng Pan] ci
9d88c8630 [Cheng Pan] fix spark 4.0 compatibility
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit cc9e11ce596738ccca8918a13c6dfacebd9f27db)
Signed-off-by: Cheng Pan <[email protected]>
---
.../engine/spark/operation/SparkOperation.scala | 4 +-
.../sql/execution/SparkSQLExecutionHelper.scala | 39 ++++++++++++++
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 14 ++++-
.../operation/SparkArrowbasedOperationSuite.scala | 61 +++++++++++++++-------
4 files changed, 96 insertions(+), 22 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index da01e85a6d..6d7af17021 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -23,7 +23,7 @@ import java.time.ZoneId
import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.SparkSQLExecutionHelper
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import org.apache.spark.ui.SparkUIUtils.formatDuration
@@ -155,7 +155,7 @@ abstract class SparkOperation(session: Session)
spark.sparkContext.setLocalProperty
protected def withLocalProperties[T](f: => T): T = {
- SQLExecution.withSQLConfPropagated(spark) {
+ SparkSQLExecutionHelper.withSQLConfPropagated(spark) {
val originalSession = SparkSession.getActiveSession
try {
SparkSession.setActiveSession(spark)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkSQLExecutionHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkSQLExecutionHelper.scala
new file mode 100644
index 0000000000..2929afc438
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkSQLExecutionHelper.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
+
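+/**
+ * A reflection-based shim over `SQLExecution.withSQLConfPropagated`, whose `SparkSession`
+ * parameter type moved to `org.apache.spark.sql.classic.SparkSession` in Spark 4.0
+ * (SPARK-49700), so a direct call cannot link against both Spark 3.x and 4.0.
+ */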
+object SparkSQLExecutionHelper {
+
+ private val sparkSessionClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+ .impl("org.apache.spark.sql.SparkSession")
+ .build()
+
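+ // Resolve the method eagerly with buildChecked, so a missing overload fails
+ // when this object initializes rather than on the first query.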
+ private val withSQLConfPropagatedMethod =
+ DynMethods.builder("withSQLConfPropagated")
+ .impl(SQLExecution.getClass, sparkSessionClz, classOf[() => Any])
+ .buildChecked(SQLExecution)
+
+ def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
+ withSQLConfPropagatedMethod.invokeChecked[T](sparkSession, () => body)
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index dda7bb4d0a..85cf2971e2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.schema.RowSet
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
-import org.apache.kyuubi.util.reflect.DynMethods
+import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
object SparkDatasetHelper extends Logging {
@@ -63,8 +63,18 @@ object SparkDatasetHelper extends Logging {
toArrowBatchRdd(plan).collect()
}
+ private val datasetClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.classic.Dataset") // SPARK-49700 (4.0.0)
+ .impl("org.apache.spark.sql.Dataset")
+ .build()
+
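+ // Dataset#toArrowBatchRdd moved with the concrete Dataset class to
+ // org.apache.spark.sql.classic in Spark 4.0 (SPARK-49700), hence the reflective lookup.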
+ private val toArrowBatchRddMethod =
+ DynMethods.builder("toArrowBatchRdd")
+ .impl(datasetClz)
+ .buildChecked()
+
def toArrowBatchRdd[T](ds: Dataset[T]): RDD[Array[Byte]] = {
- ds.toArrowBatchRdd
+ toArrowBatchRddMethod.bind(ds).invoke()
}
/**
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index ba245f50a3..14f900d90f 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -21,13 +21,15 @@ import java.lang.{Boolean => JBoolean}
import java.sql.Statement
import java.util.Locale
+import scala.util.Try
+
import org.apache.spark.{KyuubiSparkContextHelper, TaskContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, QueryExecution, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SparkMetricsTestUtils
@@ -163,6 +165,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
sparkPlan.schema,
"",
true,
+ true, // spark.sql.execution.arrow.useLargeVarTypes
KyuubiSparkContextHelper.dummyTaskContext())
assert(rows.size == expectSize)
}
@@ -247,7 +250,11 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
|) LIMIT 1
|""".stripMargin)
val smj = plan.collect { case smj: SortMergeJoinExec => smj }
- val bhj = adaptivePlan.collect { case bhj: BroadcastHashJoinExec => bhj }
+ val bhj = (adaptivePlan match {
+ // SPARK-51008 (4.0.0) adds ResultQueryStageExec
+ case queryStage: QueryStageExec => queryStage.plan
+ case plan => plan
+ }).collect { case bhj: BroadcastHashJoinExec => bhj }
assert(smj.size == 1)
assert(bhj.size == 1)
}
@@ -505,49 +512,67 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
}
}
- // the signature of function [[ArrowConverters.fromBatchIterator]] is changed in SPARK-43528
- // (since Spark 3.5)
private lazy val fromBatchIteratorMethod =
DynMethods.builder("fromBatchIterator")
- .hiddenImpl( // for Spark 3.4 or previous
+ .hiddenImpl( // SPARK-51079: Spark 4.0 or later
"org.apache.spark.sql.execution.arrow.ArrowConverters$",
classOf[Iterator[Array[Byte]]],
classOf[StructType],
classOf[String],
+ classOf[Boolean],
+ classOf[Boolean],
classOf[TaskContext])
- .hiddenImpl( // for Spark 3.5 or later
+ .hiddenImpl( // SPARK-43528: Spark 3.5
"org.apache.spark.sql.execution.arrow.ArrowConverters$",
classOf[Iterator[Array[Byte]]],
classOf[StructType],
classOf[String],
classOf[Boolean],
classOf[TaskContext])
- .build()
+ .hiddenImpl( // for Spark 3.4 or previous
+ "org.apache.spark.sql.execution.arrow.ArrowConverters$",
+ classOf[Iterator[Array[Byte]]],
+ classOf[StructType],
+ classOf[String],
+ classOf[TaskContext])
+ .buildChecked()
+
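+ // ArrowConverters is a Scala object; fetch its MODULE$ singleton to serve as
+ // the receiver for the reflective calls below.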
+ private lazy val arrowConvertersObject = DynFields.builder()
+ .impl("org.apache.spark.sql.execution.arrow.ArrowConverters$", "MODULE$")
+ .buildStaticChecked[Any]()
+ .get()
def fromBatchIterator(
arrowBatchIter: Iterator[Array[Byte]],
schema: StructType,
timeZoneId: String,
errorOnDuplicatedFieldNames: JBoolean,
- context: TaskContext): Iterator[InternalRow] = {
- val className = "org.apache.spark.sql.execution.arrow.ArrowConverters$"
- val instance = DynFields.builder().impl(className, "MODULE$").build[Object]().get(null)
- if (SPARK_ENGINE_RUNTIME_VERSION >= "3.5") {
- fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
- instance,
+ largeVarTypes: JBoolean,
+ context: TaskContext): Iterator[InternalRow] =
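+ // Only one overload resolves at build time; invoking it with the wrong arity
+ // throws, and each recover block falls back to the next older signature.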
+ Try { // SPARK-51079: Spark 4.0 or later
+ fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+ arrowConvertersObject,
arrowBatchIter,
schema,
timeZoneId,
errorOnDuplicatedFieldNames,
+ largeVarTypes,
context)
- } else {
- fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
- instance,
+ }.recover { case _: Exception => // SPARK-43528: Spark 3.5
+ fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+ arrowConvertersObject,
arrowBatchIter,
schema,
timeZoneId,
+ errorOnDuplicatedFieldNames,
context)
- }
- }
+ }.recover { case _: Exception => // for Spark 3.4 or previous
+ fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+ arrowConvertersObject,
+ arrowBatchIter,
+ schema,
+ timeZoneId,
+ context)
+ }.get
class JobCountListener extends SparkListener {
var numJobs = 0