This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 77b28e939 [KYUUBI #4895] [ARROW] Reflective calls to the function `ArrowConverters#fromBatchIterator`
77b28e939 is described below
commit 77b28e93984c27372096c53613e7fd4a2a3ac81f
Author: Fu Chen <[email protected]>
AuthorDate: Tue May 30 15:45:05 2023 +0800
[KYUUBI #4895] [ARROW] Reflective calls to the function `ArrowConverters#fromBatchIterator`
### _Why are the changes needed?_
To adapt to Spark 3.5: the signature of `ArrowConverters#fromBatchIterator` was changed in [SPARK-43528](https://github.com/apache/spark/pull/41190) (since Spark 3.5).
Spark 3.4 or earlier:
```scala
private[sql] def fromBatchIterator(
    arrowBatchIter: Iterator[Array[Byte]],
    schema: StructType,
    timeZoneId: String,
    context: TaskContext): Iterator[InternalRow]
```
Spark 3.5 or later:
```scala
private[sql] def fromBatchIterator(
    arrowBatchIter: Iterator[Array[Byte]],
    schema: StructType,
    timeZoneId: String,
    errorOnDuplicatedFieldNames: Boolean,
    context: TaskContext): Iterator[InternalRow]
```
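To keep a single build working against both signatures, the fix resolves the overload reflectively at runtime. As a minimal sketch of the mechanism, here is the lookup side, mirroring the test helper added in the diff below (the class name, parameter lists, and the `DynMethods`/`DynFields` helpers are all taken from this patch):
```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.util.reflect.{DynFields, DynMethods}

// DynMethods tries each hiddenImpl in order and binds the first signature
// present on the classpath, so a single lookup covers Spark 3.4 and 3.5.
val fromBatchIteratorMethod = DynMethods.builder("fromBatchIterator")
  .hiddenImpl( // Spark 3.4 or earlier
    "org.apache.spark.sql.execution.arrow.ArrowConverters$",
    classOf[Iterator[Array[Byte]]],
    classOf[StructType],
    classOf[String],
    classOf[TaskContext])
  .hiddenImpl( // Spark 3.5 or later: extra errorOnDuplicatedFieldNames flag
    "org.apache.spark.sql.execution.arrow.ArrowConverters$",
    classOf[Iterator[Array[Byte]]],
    classOf[StructType],
    classOf[String],
    classOf[Boolean],
    classOf[TaskContext])
  .build()

// ArrowConverters is a Scala object, so its singleton lives in the static
// MODULE$ field; that instance is the receiver for the reflective invoke.
val arrowConverters = DynFields.builder()
  .impl("org.apache.spark.sql.execution.arrow.ArrowConverters$", "MODULE$")
  .build[Object]()
  .get(null)
```
The call site then dispatches on the engine's Spark version, passing the extra `errorOnDuplicatedFieldNames` argument only on Spark 3.5 or later, as shown in `SparkArrowbasedOperationSuite` below.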
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4895 from cfmcgrady/arrow-spark35.
87d5b7240 [Fu Chen] fix ci
b37b321d5 [Fu Chen] adapt Spark 3.5
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../execution/arrow/KyuubiArrowConverters.scala | 9 ----
.../operation/SparkArrowbasedOperationSuite.scala | 55 ++++++++++++++++++++--
2 files changed, 51 insertions(+), 13 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 24b0ac22e..35fa09b61 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -344,15 +344,6 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
      errorOnDuplicatedFieldNames)
  }
-  // for testing
-  def fromBatchIterator(
-      arrowBatchIter: Iterator[Array[Byte]],
-      schema: StructType,
-      timeZoneId: String,
-      context: TaskContext): Iterator[InternalRow] = {
-    ArrowConverters.fromBatchIterator(arrowBatchIter, schema, timeZoneId, context)
-  }
-
  // IpcOption.DEFAULT was introduced in ARROW-11081 (ARROW-4.0.0); add this to adapt Spark 3.1/3.2
  final private val ARROW_IPC_OPTION_DEFAULT = new IpcOption()
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index fc53cc41a..b8c64b5f2 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -17,22 +17,24 @@
package org.apache.kyuubi.engine.spark.operation
+import java.lang.{Boolean => JBoolean}
import java.sql.Statement
import java.util.{Locale, Set => JSet}
-import org.apache.spark.KyuubiSparkContextHelper
+import org.apache.spark.{KyuubiSparkContextHelper, TaskContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SparkMetricsTestUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kyuubi.SparkDatasetHelper
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.kyuubi.KyuubiException
@@ -40,7 +42,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{SparkSQLEngine, WithSparkSQLEngine}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.SparkDataTypeTests
-import org.apache.kyuubi.util.reflect.DynFields
+import org.apache.kyuubi.util.reflect.{DynFields, DynMethods}
class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests
  with SparkMetricsTestUtils {
@@ -156,10 +158,11 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
    def runAndCheck(sparkPlan: SparkPlan, expectSize: Int): Unit = {
      val arrowBinary = SparkDatasetHelper.executeArrowBatchCollect(sparkPlan)
-      val rows = KyuubiArrowConverters.fromBatchIterator(
+      val rows = fromBatchIterator(
        arrowBinary.iterator,
        sparkPlan.schema,
        "",
+        true,
        KyuubiSparkContextHelper.dummyTaskContext())
      assert(rows.size == expectSize)
    }
@@ -532,6 +535,50 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
    staticConfKeys.contains(key)
  }
+  // the signature of function [[ArrowConverters.fromBatchIterator]] was changed in SPARK-43528
+  // (since Spark 3.5)
+  private lazy val fromBatchIteratorMethod = DynMethods.builder("fromBatchIterator")
+    .hiddenImpl( // for Spark 3.4 or previous
+      "org.apache.spark.sql.execution.arrow.ArrowConverters$",
+      classOf[Iterator[Array[Byte]]],
+      classOf[StructType],
+      classOf[String],
+      classOf[TaskContext])
+    .hiddenImpl( // for Spark 3.5 or later
+      "org.apache.spark.sql.execution.arrow.ArrowConverters$",
+      classOf[Iterator[Array[Byte]]],
+      classOf[StructType],
+      classOf[String],
+      classOf[Boolean],
+      classOf[TaskContext])
+    .build()
+
+  def fromBatchIterator(
+      arrowBatchIter: Iterator[Array[Byte]],
+      schema: StructType,
+      timeZoneId: String,
+      errorOnDuplicatedFieldNames: JBoolean,
+      context: TaskContext): Iterator[InternalRow] = {
+    val className = "org.apache.spark.sql.execution.arrow.ArrowConverters$"
+    val instance = DynFields.builder().impl(className, "MODULE$").build[Object]().get(null)
+    if (SPARK_ENGINE_RUNTIME_VERSION >= "3.5") {
+      fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
+        instance,
+        arrowBatchIter,
+        schema,
+        timeZoneId,
+        errorOnDuplicatedFieldNames,
+        context)
+    } else {
+      fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
+        instance,
+        arrowBatchIter,
+        schema,
+        timeZoneId,
+        context)
+    }
+  }
+
  class JobCountListener extends SparkListener {
    var numJobs = 0
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {