This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new acdb6a36d3 [KYUUBI #7245] Fix arrow batch converter error
acdb6a36d3 is described below

commit acdb6a36d3643074ede173786bbe0f908194383c
Author: echo567 <[email protected]>
AuthorDate: Thu Dec 25 11:08:24 2025 +0800

    [KYUUBI #7245] Fix arrow batch converter error
    
    ### Why are the changes needed?
    Control the amount of data per batch to prevent memory overflow and to speed up the initial fetch.
    
    When `kyuubi.operation.result.format=arrow`, `spark.connect.grpc.arrow.maxBatchSize` does not work as expected.
    
    Reproduction:
    You can debug `KyuubiArrowConverters` or add the following log at line 300 of `KyuubiArrowConverters`:
    
    ```
    logInfo(s"Total limit: ${limit}, rowCount: ${rowCount}, " +
    s"rowCountInLastBatch:${rowCountInLastBatch}," +
    s"estimatedBatchSize: ${estimatedBatchSize}," +
    s"maxEstimatedBatchSize: ${maxEstimatedBatchSize}," +
    s"maxRecordsPerBatch:${maxRecordsPerBatch}")
    ```
    
    Test data: 1.6 million rows, 30 columns per row. Command executed:
    ```
    bin/beeline \
      -u 'jdbc:hive2://10.168.X.X:XX/default;thrift.client.max.message.size=2000000000' \
      --hiveconf kyuubi.operation.result.format=arrow \
      -n test -p 'testpass' \
      --outputformat=csv2 -e "select * from db.table" > /tmp/test.csv
    ```
    
    Log output:
    ```
    25/11/13 13:52:57 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 200000, lastBatchRowCount: 200000, estimatedBatchSize: 145600000, maxEstimatedBatchSize: 4, maxRecordsPerBatch: 10000
    25/11/13 13:52:57 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 200000, lastBatchRowCount: 200000, estimatedBatchSize: 145600000
    ```
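
    Note the suspicious `maxEstimatedBatchSize: 4` above. A minimal sketch (illustrative only, not part of the patch) of the unit mismatch that produces it: as the `SparkDatasetHelper` change below shows, the old default was parsed with `ByteUnit.MiB`, so the 4 MiB value collapsed to the number 4, which was then compared against `estimatedBatchSize`, a byte count.
    ```
    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    // Hypothetical check mirroring the old and new parsing in SparkDatasetHelper.
    val before = JavaUtils.byteStringAs("4m", ByteUnit.MiB)        // 4 (MiB), later misused as if it were bytes
    val after  = JavaUtils.byteStringAs("4194304b", ByteUnit.BYTE) // 4194304 bytes
    println(s"before=$before, after=$after")
    ```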
    
    Original code:
    ```
    while (rowIter.hasNext && (
        rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
          estimatedBatchSize <= 0 ||
          estimatedBatchSize < maxEstimatedBatchSize ||
          maxRecordsPerBatch <= 0 ||
          rowCountInLastBatch < maxRecordsPerBatch ||
          rowCount < limit ||
          limit < 0))
    ```
    When `limit` is not set, i.e., `-1`, all data is retrieved at once (see the sketch after this list). If the row count is too large, the following three problems occur:
    (1) Driver/executor OOM
    (2) Array OOM because the required array length is too large
    (3) Slow data transfer
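
    A minimal sketch (illustrative only, plugging in the values from the log above) of why the old condition never closes a batch when `limit` is `-1`: the trailing `limit < 0` clause makes the whole OR-expression true, so neither `maxEstimatedBatchSize` nor `maxRecordsPerBatch` ever takes effect.
    ```
    // Hypothetical evaluation of the old while-condition with the logged values.
    val limit = -1L
    val maxEstimatedBatchSize = 4L        // from the log (unit bug above)
    val maxRecordsPerBatch = 10000L
    val rowCount = 200000L
    val rowCountInLastBatch = 200000L
    val estimatedBatchSize = 145600000L

    val keepAppending =
      rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
        estimatedBatchSize <= 0 ||
        estimatedBatchSize < maxEstimatedBatchSize ||
        maxRecordsPerBatch <= 0 ||
        rowCountInLastBatch < maxRecordsPerBatch ||
        rowCount < limit ||
        limit < 0

    println(keepAppending) // true: only `limit < 0` holds, so all rows land in one batch
    ```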
    
    After updating the code, the log output is as follows:
    ```
    25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 5762, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
    25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 11524, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
    25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 17286, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
    ```
    The `estimatedBatchSize` is slightly larger than the `maxEstimatedBatchSize`. Data can be written in batches as expected.
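
    As a sanity check on these numbers: 145,600,000 bytes / 200,000 rows = 728 bytes per row, and 5,762 rows * 728 bytes = 4,194,736 bytes, the first batch size to exceed the 4 MiB (4,194,304 bytes) cap, which matches the logged `estimatedBatchSize`.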
    
    Fix #7245.
    
    ### How was this patch tested?
    Test data: 1.6 million rows, 30 columns per row.
    ```
    25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 5762, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
    25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 11524, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
    25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 17286, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #7246 from echo567/fix-arrow-converter.
    
    Closes #7245
    
    6ef4ef13f [echo567] Merge branch 'master' into fix-arrow-converter
    c9d0d186f [echo567] fix(arrow): repairing arrow based on spark
    479d7e40b [echo567] fix(spark): fix arrow batch converter error
    
    Authored-by: echo567 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../execution/arrow/KyuubiArrowConverters.scala    | 31 ++++++++++++++--------
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      |  5 ++--
 2 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index e13653b01c..75618e3124 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -274,19 +274,28 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       var estimatedBatchSize = 0L
       Utils.tryWithSafeFinally {
 
+        def isBatchSizeLimitExceeded: Boolean = {
+          // If `maxEstimatedBatchSize` is zero or negative, it implies unlimited.
+          maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize
+        }
+        def isRecordLimitExceeded: Boolean = {
+          // If `maxRecordsPerBatch` is zero or negative, it implies unlimited.
+          maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch
+        }
+        def isGlobalLimitNotReached: Boolean = {
+          // If the limit is negative, it means no restriction
+          // or the current number of rows has not reached the limit.
+          rowCount < limit || limit < 0
+        }
+
         // Always write the first row.
-        while (rowIter.hasNext && (
-            // For maxBatchSize and maxRecordsPerBatch, respect whatever smaller.
+        while (rowIter.hasNext && isGlobalLimitNotReached && (
             // If the size in bytes is positive (set properly), always write the first row.
-            rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
-              // If the size in bytes of rows are 0 or negative, unlimit it.
-              estimatedBatchSize <= 0 ||
-              estimatedBatchSize < maxEstimatedBatchSize ||
-              // If the size of rows are 0 or negative, unlimit it.
-              maxRecordsPerBatch <= 0 ||
-              rowCountInLastBatch < maxRecordsPerBatch ||
-              rowCount < limit ||
-              limit < 0)) {
+            rowCountInLastBatch == 0 ||
+              // If either limit is hit, create a batch. This implies that the limit that is hit
+              // first triggers the creation of a batch even if the other limit is not yet hit
+              // hence preferring the more restrictive limit.
+              (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
           val row = rowIter.next()
           arrowWriter.write(row)
           estimatedBatchSize += (row match {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 73e7f77993..7d6e5f8fdb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -164,8 +164,9 @@ object SparkDatasetHelper extends Logging {
     KyuubiSparkUtil.globalSparkContext
       .getConf
       .getOption("spark.connect.grpc.arrow.maxBatchSize")
-      .orElse(Option("4m"))
-      .map(JavaUtils.byteStringAs(_, ByteUnit.MiB))
+      // 4m
+      .orElse(Option("4194304b"))
+      .map(JavaUtils.byteStringAs(_, ByteUnit.BYTE))
       .get
   }
 
