This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f040d7ca2 [KYUUBI #4923] [ARROW] Update arguments of `ArrowUtils#toArrowSchema` function
f040d7ca2 is described below
commit f040d7ca25e87d1ae67d4ef38f24199a95da7f6e
Author: Fu Chen <[email protected]>
AuthorDate: Mon Jun 5 19:21:48 2023 +0800
[KYUUBI #4923] [ARROW] Update arguments of `ArrowUtils#toArrowSchema` function
### _Why are the changes needed?_
These changes adapt Kyuubi to Spark 3.5, where the new conf `spark.sql.execution.arrow.useLargeVarType` was introduced in https://github.com/apache/spark/pull/39572, and the signature of `ArrowUtils#toArrowSchema` gained a matching parameter.
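For context, a sketch of how a Spark user would opt in (the conf name is spelled as it appears throughout this patch; the default is `false`):

```scala
// Hypothetical spark-shell snippet: enable large variable-width Arrow types.
// `spark` is the usual SparkSession; the conf name follows this patch's usage.
spark.conf.set("spark.sql.execution.arrow.useLargeVarType", "true")
```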
The signature of `ArrowUtils#toArrowSchema` before:
```scala
def toArrowSchema(
    schema: StructType,
    timeZoneId: String,
    errorOnDuplicatedFieldNames: Boolean): Schema
```
After:
```scala
def toArrowSchema(
    schema: StructType,
    timeZoneId: String,
    errorOnDuplicatedFieldNames: Boolean,
    largeVarTypes: Boolean = false): Schema
```
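Because the new parameter has a default value, existing three-argument call sites remain source compatible on Spark 3.5. As a minimal sketch of a direct call (inputs are illustrative; `ArrowUtils` is package-private to `org.apache.spark.sql`, so only code under that package, such as `KyuubiArrowConverters`, could call it this way):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils

// Illustrative schema and time zone; any StructType / IANA zone ID would do.
val schema = StructType(Seq(StructField("c0", StringType)))

// Equivalent to the old three-argument call, with the new parameter spelled out.
val arrowSchema = ArrowUtils.toArrowSchema(
  schema,
  "UTC",
  errorOnDuplicatedFieldNames = true,
  largeVarTypes = false)
```

Kyuubi itself invokes the method reflectively (see the diff below) because the signature differs across supported Spark versions, and reflection does not see Scala default arguments; hence the explicit `false` added at Kyuubi's call sites.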
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4923 from cfmcgrady/arrow-toArrowSchema.
Closes #4923
3806494a5 [Fu Chen] Update Arguments of ArrowUtils#toArrowSchema Function
Authored-by: Fu Chen <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../kyuubi/engine/spark/operation/ExecuteStatement.scala | 16 ++++++++++++++++
.../sql/execution/arrow/KyuubiArrowConverters.scala | 11 +++++++----
2 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 659cf0a61..17d8a7412 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types._
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
+import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -185,6 +186,8 @@ class ArrowBasedExecuteStatement(
     incrementalCollect,
     handle) {

+  checkUseLargeVarType()
+
   override protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = {
     toArrowBatchLocalIterator(convertComplexType(resultDF))
   }
@@ -204,4 +207,17 @@ class ArrowBasedExecuteStatement(
   private def convertComplexType(df: DataFrame): DataFrame = {
     convertTopLevelComplexTypeToHiveString(df, timestampAsString)
   }
+
+  def checkUseLargeVarType(): Unit = {
+    // TODO: largeVarType support, see SPARK-39979.
+    val useLargeVarType = session.asInstanceOf[SparkSessionImpl].spark
+      .conf
+      .get("spark.sql.execution.arrow.useLargeVarType", "false")
+      .toBoolean
+    if (useLargeVarType) {
+      throw new KyuubiSQLException(
+        "`spark.sql.execution.arrow.useLargeVarType = true` is not supported now.",
+        null)
+    }
+  }
 }
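Purely illustrative, not part of the patch: with the guard above, a session that turns the conf on fails fast until SPARK-39979 support lands, rather than having the setting silently ignored.

```scala
// Hypothetical engine-side session: after this, the next Arrow-based
// statement is expected to surface
//   KyuubiSQLException: `spark.sql.execution.arrow.useLargeVarType = true`
//   is not supported now.
spark.conf.set("spark.sql.execution.arrow.useLargeVarType", "true")
```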
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 35fa09b61..f78552602 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -64,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       "slice",
       0,
       Long.MaxValue)
-    val arrowSchema = toArrowSchema(schema, timeZoneId, true)
+    val arrowSchema = toArrowSchema(schema, timeZoneId, true, false)
     vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator)
     try {
       val recordBatch = MessageSerializer.deserializeRecordBatch(
@@ -242,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       context: TaskContext)
     extends Iterator[Array[Byte]] {

-    protected val arrowSchema = toArrowSchema(schema, timeZoneId, true)
+    protected val arrowSchema = toArrowSchema(schema, timeZoneId, true, false)
     private val allocator =
       ArrowUtils.rootAllocator.newChildAllocator(
         s"to${this.getClass.getSimpleName}",
@@ -327,6 +327,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
         "org.apache.spark.sql.util.ArrowUtils",
         classOf[StructType],
         classOf[String],
+        classOf[Boolean],
         classOf[Boolean])
       .build()

@@ -336,12 +337,14 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
   private def toArrowSchema(
       schema: StructType,
       timeZone: String,
-      errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = {
+      errorOnDuplicatedFieldNames: JBoolean,
+      largeVarTypes: JBoolean): ArrowSchema = {
     toArrowSchemaMethod.invoke[ArrowSchema](
       ArrowUtils,
       schema,
       timeZone,
-      errorOnDuplicatedFieldNames)
+      errorOnDuplicatedFieldNames,
+      largeVarTypes)
   }

   // IpcOption.DEFAULT was introduced in ARROW-11081 (Arrow 4.0.0); added to adapt to Spark 3.1/3.2
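A side note on the reflective lookup above: the rebuilt `toArrowSchemaMethod` pins the four-argument Spark 3.5 signature. A version-tolerant variant could register both signatures and use whichever resolves; a sketch, assuming the Iceberg-style `DynMethods` builder used here tries candidates in registration order:

```scala
// Sketch only: candidate signatures for ArrowUtils#toArrowSchema across
// Spark versions; the first one found on the classpath wins.
private lazy val toArrowSchemaMethod = DynMethods.builder("toArrowSchema")
  .impl( // Spark 3.5+: (schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
    "org.apache.spark.sql.util.ArrowUtils",
    classOf[StructType],
    classOf[String],
    classOf[Boolean],
    classOf[Boolean])
  .impl( // Spark 3.4: (schema, timeZoneId, errorOnDuplicatedFieldNames)
    "org.apache.spark.sql.util.ArrowUtils",
    classOf[StructType],
    classOf[String],
    classOf[Boolean])
  .build()
```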