This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b28e939 [KYUUBI #4895] [ARROW] Reflective calls to the function `ArrowConverters#fromBatchIterator`
77b28e939 is described below

commit 77b28e93984c27372096c53613e7fd4a2a3ac81f
Author: Fu Chen <[email protected]>
AuthorDate: Tue May 30 15:45:05 2023 +0800

    [KYUUBI #4895] [ARROW] Reflective calls to the function `ArrowConverters#fromBatchIterator`
    
    ### _Why are the changes needed?_
    
    To adapt to Spark 3.5.
    
    The signature of the function `ArrowConverters#fromBatchIterator` was changed in [SPARK-43528](https://github.com/apache/spark/pull/41190) (since Spark 3.5).
    
    Spark 3.4 or earlier
    
    ```scala
      private[sql] def fromBatchIterator(
          arrowBatchIter: Iterator[Array[Byte]],
          schema: StructType,
          timeZoneId: String,
          context: TaskContext): Iterator[InternalRow]
    ```
    
    Spark 3.5 or later
    
    ```scala
      private[sql] def fromBatchIterator(
          arrowBatchIter: Iterator[Array[Byte]],
          schema: StructType,
          timeZoneId: String,
          errorOnDuplicatedFieldNames: Boolean,
          context: TaskContext): Iterator[InternalRow]
    ```
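    
    To keep a single code path, the engine can resolve and invoke the method reflectively, choosing the argument list that matches the runtime Spark version. Below is a minimal sketch with plain Java reflection (illustrative only: the helper name `fromBatchIteratorReflective` is hypothetical, and the actual patch uses Kyuubi's `DynMethods` utility, shown in the diff below):
    
    ```scala
    import org.apache.spark.TaskContext
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType
    
    def fromBatchIteratorReflective(
        arrowBatchIter: Iterator[Array[Byte]],
        schema: StructType,
        timeZoneId: String,
        errorOnDuplicatedFieldNames: Boolean,
        context: TaskContext): Iterator[InternalRow] = {
      // A Scala `object` compiles to a class whose singleton instance is held
      // in a public static field named `MODULE$`.
      val clazz = Class.forName("org.apache.spark.sql.execution.arrow.ArrowConverters$")
      val module = clazz.getField("MODULE$").get(null)
      val method = clazz.getDeclaredMethods.find(_.getName == "fromBatchIterator").get
      method.setAccessible(true)
      val args: Seq[AnyRef] =
        if (method.getParameterCount == 5) {
          // Spark 3.5 or later: pass the extra errorOnDuplicatedFieldNames flag
          Seq(arrowBatchIter, schema, timeZoneId, Boolean.box(errorOnDuplicatedFieldNames), context)
        } else {
          // Spark 3.4 or earlier: four-argument signature
          Seq(arrowBatchIter, schema, timeZoneId, context)
        }
      method.invoke(module, args: _*).asInstanceOf[Iterator[InternalRow]]
    }
    ```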
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4895 from cfmcgrady/arrow-spark35.
    
    Closes #4895
    
    87d5b7240 [Fu Chen] fix ci
    b37b321d5 [Fu Chen] adapt Spark 3.5
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../execution/arrow/KyuubiArrowConverters.scala    |  9 ----
 .../operation/SparkArrowbasedOperationSuite.scala  | 55 ++++++++++++++++++++--
 2 files changed, 51 insertions(+), 13 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 24b0ac22e..35fa09b61 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -344,15 +344,6 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       errorOnDuplicatedFieldNames)
   }
 
-  // for testing
-  def fromBatchIterator(
-      arrowBatchIter: Iterator[Array[Byte]],
-      schema: StructType,
-      timeZoneId: String,
-      context: TaskContext): Iterator[InternalRow] = {
-    ArrowConverters.fromBatchIterator(arrowBatchIter, schema, timeZoneId, context)
-  }
-
   // IpcOption.DEFAULT was introduced in ARROW-11081 (Arrow 4.0.0); added to adapt Spark 3.1/3.2
   final private val ARROW_IPC_OPTION_DEFAULT = new IpcOption()
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index fc53cc41a..b8c64b5f2 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -17,22 +17,24 @@
 
 package org.apache.kyuubi.engine.spark.operation
 
+import java.lang.{Boolean => JBoolean}
 import java.sql.Statement
 import java.util.{Locale, Set => JSet}
 
-import org.apache.spark.KyuubiSparkContextHelper
+import org.apache.spark.{KyuubiSparkContextHelper, TaskContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SparkMetricsTestUtils
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.kyuubi.SparkDatasetHelper
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.kyuubi.KyuubiException
@@ -40,7 +42,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.spark.{SparkSQLEngine, WithSparkSQLEngine}
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.SparkDataTypeTests
-import org.apache.kyuubi.util.reflect.DynFields
+import org.apache.kyuubi.util.reflect.{DynFields, DynMethods}
 
 class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests
   with SparkMetricsTestUtils {
@@ -156,10 +158,11 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
 
     def runAndCheck(sparkPlan: SparkPlan, expectSize: Int): Unit = {
       val arrowBinary = SparkDatasetHelper.executeArrowBatchCollect(sparkPlan)
-      val rows = KyuubiArrowConverters.fromBatchIterator(
+      val rows = fromBatchIterator(
         arrowBinary.iterator,
         sparkPlan.schema,
         "",
+        true,
         KyuubiSparkContextHelper.dummyTaskContext())
       assert(rows.size == expectSize)
     }
@@ -532,6 +535,50 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     staticConfKeys.contains(key)
   }
 
+  // the signature of function [[ArrowConverters.fromBatchIterator]] is changed in SPARK-43528
+  // (since Spark 3.5)
+  private lazy val fromBatchIteratorMethod = DynMethods.builder("fromBatchIterator")
+    .hiddenImpl( // for Spark 3.4 or previous
+      "org.apache.spark.sql.execution.arrow.ArrowConverters$",
+      classOf[Iterator[Array[Byte]]],
+      classOf[StructType],
+      classOf[String],
+      classOf[TaskContext])
+    .hiddenImpl( // for Spark 3.5 or later
+      "org.apache.spark.sql.execution.arrow.ArrowConverters$",
+      classOf[Iterator[Array[Byte]]],
+      classOf[StructType],
+      classOf[String],
+      classOf[Boolean],
+      classOf[TaskContext])
+    .build()
+
+  def fromBatchIterator(
+      arrowBatchIter: Iterator[Array[Byte]],
+      schema: StructType,
+      timeZoneId: String,
+      errorOnDuplicatedFieldNames: JBoolean,
+      context: TaskContext): Iterator[InternalRow] = {
+    val className = "org.apache.spark.sql.execution.arrow.ArrowConverters$"
+    val instance = DynFields.builder().impl(className, "MODULE$").build[Object]().get(null)
+    if (SPARK_ENGINE_RUNTIME_VERSION >= "3.5") {
+      fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
+        instance,
+        arrowBatchIter,
+        schema,
+        timeZoneId,
+        errorOnDuplicatedFieldNames,
+        context)
+    } else {
+      fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
+        instance,
+        arrowBatchIter,
+        schema,
+        timeZoneId,
+        context)
+    }
+  }
+
   class JobCountListener extends SparkListener {
     var numJobs = 0
     override def onJobStart(jobStart: SparkListenerJobStart): Unit = {

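For readers unfamiliar with the `DynMethods` utility used in the patch: each `hiddenImpl` call registers one candidate signature (implementation class name plus parameter types), and `build()` binds the first candidate that resolves on the runtime classpath, so a single call site works across Spark versions. The `DynFields` lookup of `MODULE$` fetches the Scala `object` singleton that serves as the receiver. A standalone sketch of the same pattern against a JDK class (a hypothetical example, using only the builder calls visible in the diff above):

```scala
import org.apache.kyuubi.util.reflect.DynMethods

// Candidates are tried in declaration order; `build()` binds the first one
// that resolves. Here the CharSequence overload exists, so it wins.
val append = DynMethods.builder("append")
  .hiddenImpl("java.lang.StringBuilder", classOf[CharSequence])
  .hiddenImpl("java.lang.StringBuilder", classOf[String])
  .build()

// `invoke` takes the receiver first, then the bound signature's arguments.
val sb = new java.lang.StringBuilder("kyuubi-")
append.invoke[java.lang.StringBuilder](sb, "arrow")
assert(sb.toString == "kyuubi-arrow")
```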