This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f040d7ca2 [KYUUBI #4923] [ARROW] Update arguments of 
`ArrowUtils#toArrowSchema` function
f040d7ca2 is described below

commit f040d7ca25e87d1ae67d4ef38f24199a95da7f6e
Author: Fu Chen <[email protected]>
AuthorDate: Mon Jun 5 19:21:48 2023 +0800

    [KYUUBI #4923] [ARROW] Update arguments of `ArrowUtils#toArrowSchema` 
function
    
    ### _Why are the changes needed?_
    
    To adapt to Spark 3.5: the new conf 
`spark.sql.execution.arrow.useLargeVarType` was introduced in 
https://github.com/apache/spark/pull/39572
    
    The signature of the function `ArrowUtils#toArrowSchema` before:
    
    ```scala
       def toArrowSchema(
           schema: StructType,
           timeZoneId: String,
           errorOnDuplicatedFieldNames: Boolean): Schema
    ```
    
    The signature of the function `ArrowUtils#toArrowSchema` after:
    
    ```scala
      def toArrowSchema(
          schema: StructType,
          timeZoneId: String,
          errorOnDuplicatedFieldNames: Boolean,
          largeVarTypes: Boolean = false): Schema
    ```
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4923 from cfmcgrady/arrow-toArrowSchema.
    
    Closes #4923
    
    3806494a5 [Fu Chen] Update Arguments of ArrowUtils#toArrowSchema Function
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 .../kyuubi/engine/spark/operation/ExecuteStatement.scala | 16 ++++++++++++++++
 .../sql/execution/arrow/KyuubiArrowConverters.scala      | 11 +++++++----
 2 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 659cf0a61..17d8a7412 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types._
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
+import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, 
IterableFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -185,6 +186,8 @@ class ArrowBasedExecuteStatement(
     incrementalCollect,
     handle) {
 
+  checkUseLargeVarType()
+
   override protected def incrementalCollectResult(resultDF: DataFrame): 
Iterator[Any] = {
     toArrowBatchLocalIterator(convertComplexType(resultDF))
   }
@@ -204,4 +207,17 @@ class ArrowBasedExecuteStatement(
   private def convertComplexType(df: DataFrame): DataFrame = {
     convertTopLevelComplexTypeToHiveString(df, timestampAsString)
   }
+
+  def checkUseLargeVarType(): Unit = {
+    // TODO: largeVarType support, see SPARK-39979.
+    val useLargeVarType = session.asInstanceOf[SparkSessionImpl].spark
+      .conf
+      .get("spark.sql.execution.arrow.useLargeVarType", "false")
+      .toBoolean
+    if (useLargeVarType) {
+      throw new KyuubiSQLException(
+        "`spark.sql.execution.arrow.useLargeVarType = true` not support now.",
+        null)
+    }
+  }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 35fa09b61..f78552602 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -64,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with 
Logging {
       "slice",
       0,
       Long.MaxValue)
-    val arrowSchema = toArrowSchema(schema, timeZoneId, true)
+    val arrowSchema = toArrowSchema(schema, timeZoneId, true, false)
     vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator)
     try {
       val recordBatch = MessageSerializer.deserializeRecordBatch(
@@ -242,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with 
Logging {
       context: TaskContext)
     extends Iterator[Array[Byte]] {
 
-    protected val arrowSchema = toArrowSchema(schema, timeZoneId, true)
+    protected val arrowSchema = toArrowSchema(schema, timeZoneId, true, false)
     private val allocator =
       ArrowUtils.rootAllocator.newChildAllocator(
         s"to${this.getClass.getSimpleName}",
@@ -327,6 +327,7 @@ object KyuubiArrowConverters extends SQLConfHelper with 
Logging {
       "org.apache.spark.sql.util.ArrowUtils",
       classOf[StructType],
       classOf[String],
+      classOf[Boolean],
       classOf[Boolean])
     .build()
 
@@ -336,12 +337,14 @@ object KyuubiArrowConverters extends SQLConfHelper with 
Logging {
   private def toArrowSchema(
       schema: StructType,
       timeZone: String,
-      errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = {
+      errorOnDuplicatedFieldNames: JBoolean,
+      largeVarTypes: JBoolean): ArrowSchema = {
     toArrowSchemaMethod.invoke[ArrowSchema](
       ArrowUtils,
       schema,
       timeZone,
-      errorOnDuplicatedFieldNames)
+      errorOnDuplicatedFieldNames,
+      largeVarTypes)
   }
 
   // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this 
for adapt Spark-3.1/3.2

Reply via email to