This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new acdb6a36d3 [KYUUBI #7245] Fix arrow batch converter error
acdb6a36d3 is described below
commit acdb6a36d3643074ede173786bbe0f908194383c
Author: echo567 <[email protected]>
AuthorDate: Thu Dec 25 11:08:24 2025 +0800
[KYUUBI #7245] Fix arrow batch converter error
### Why are the changes needed?
Control the amount of data per Arrow batch to prevent memory overflow and to improve the initial response speed.
When `kyuubi.operation.result.format=arrow`,
`spark.connect.grpc.arrow.maxBatchSize` does not work as expected.
Reproduction:
You can debug `KyuubiArrowConverters` or add the following log statement at line 300
of `KyuubiArrowConverters`:
```
logInfo(s"Total limit: ${limit}, rowCount: ${rowCount}, " +
s"rowCountInLastBatch:${rowCountInLastBatch}," +
s"estimatedBatchSize: ${estimatedBatchSize}," +
s"maxEstimatedBatchSize: ${maxEstimatedBatchSize}," +
s"maxRecordsPerBatch:${maxRecordsPerBatch}")
```
Test data: 1.6 million rows, 30 columns per row. Command executed:
```
bin/beeline \
-u 'jdbc:hive2://10.168.X.X:XX/default;thrift.client.max.message.size=2000000000' \
--hiveconf kyuubi.operation.result.format=arrow \
-n test -p 'testpass' \
--outputformat=csv2 -e "select * from db.table" > /tmp/test.csv
```
Log output:
```
25/11/13 13:52:57 INFO KyuubiArrowConverters: Total limit: -1, rowCount:
200000, lastBatchRowCount:200000, estimatedBatchSize: 145600000
maxEstimatedBatchSize: 4,maxRecordsPerBatch:10000
25/11/13 13:52:57 INFO KyuubiArrowConverters: Total limit: -1, rowCount:
200000, lastBatchRowCount:200000, estimatedBatchSize: 145600000
```
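The `maxEstimatedBatchSize: 4` above comes from the default handling in `SparkDatasetHelper` (changed in the second hunk of this commit): the fallback `"4m"` was parsed with `ByteUnit.MiB`, which yields 4 (a count of MiB), while the value is later compared against a byte count. A minimal sketch of the difference, assuming the usual semantics of Spark's `JavaUtils.byteStringAs` (the parsed size is returned in the requested unit):
```
import org.apache.spark.network.util.{ByteUnit, JavaUtils}

// Old fallback: "4m" converted to MiB -> 4, later misread as a byte threshold.
val oldDefault = JavaUtils.byteStringAs("4m", ByteUnit.MiB)        // 4
// New fallback: "4194304b" converted to bytes -> 4194304 (4 MiB in bytes).
val newDefault = JavaUtils.byteStringAs("4194304b", ByteUnit.BYTE) // 4194304
```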
Original code:
```
while (rowIter.hasNext && (
rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
estimatedBatchSize <= 0 ||
estimatedBatchSize < maxEstimatedBatchSize ||
maxRecordsPerBatch <= 0 ||
rowCountInLastBatch < maxRecordsPerBatch ||
rowCount < limit ||
limit < 0))
```
When the `limit` is not set, i.e. `-1`, the trailing `limit < 0` clause keeps the whole
condition true, so all data is retrieved in a single batch (see the sketch below). If the
row count is large, three problems occur:
(1) Driver/executor OOM
(2) Array OOM, because the required array length exceeds the limit
(3) Slow data transfer
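A minimal sketch (not the engine code itself) of how the original condition evaluates with the values from the reproduction log; with `limit = -1` the final clause is always true, so the loop never cuts a batch until the iterator is exhausted:
```
// Hypothetical standalone predicate mirroring the original while-condition.
def keepFilling(
    rowCountInLastBatch: Long,
    estimatedBatchSize: Long,
    maxEstimatedBatchSize: Long,
    maxRecordsPerBatch: Long,
    rowCount: Long,
    limit: Long): Boolean =
  rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
    estimatedBatchSize <= 0 ||
    estimatedBatchSize < maxEstimatedBatchSize ||
    maxRecordsPerBatch <= 0 ||
    rowCountInLastBatch < maxRecordsPerBatch ||
    rowCount < limit ||
    limit < 0

// Values from the log above: every size/record limit is already exceeded,
// yet `limit < 0` keeps the condition true and the batch keeps growing.
keepFilling(200000L, 145600000L, 4L, 10000L, 200000L, -1L) // true
```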
After updating the code, the log output is as follows:
```
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount:
5762, rowCountInLastBatch:5762, estimatedBatchSize: 4194736,
maxEstimatedBatchSize: 4194304, maxRecordsPerBatch:10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount:
11524, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736,
maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount:
17286, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736,
maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
```
The `estimatedBatchSize` is only slightly larger than `maxEstimatedBatchSize`, and data
is written in batches as expected.
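For reference, here is how the new helper predicates (sketched from the diff below) evaluate with the values from the first post-fix log line:
```
// Values taken from the first log line after the fix.
val estimatedBatchSize = 4194736L
val maxEstimatedBatchSize = 4194304L
val rowCountInLastBatch = 5762L
val maxRecordsPerBatch = 10000L

val isBatchSizeLimitExceeded =
  maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize // true
val isRecordLimitExceeded =
  maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch      // false
// rowCountInLastBatch != 0 and the size limit is exceeded, so the loop exits
// and a ~4 MiB batch is emitted instead of buffering all 1.6 million rows.
```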
Fix #7245.
### How was this patch tested?
Test data: 1.6 million rows, 30 columns per row.
```
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount:
5762, rowCountInLastBatch:5762, estimatedBatchSize: 4194736,
maxEstimatedBatchSize: 4194304, maxRecordsPerBatch:10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount:
11524, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736,
maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount:
17286, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736,
maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #7246 from echo567/fix-arrow-converter.
Closes #7245
6ef4ef13f [echo567] Merge branch 'master' into fix-arrow-converter
c9d0d186f [echo567] fix(arrow): repairing arrow based on spark
479d7e40b [echo567] fix(spark): fix arrow batch converter error
Authored-by: echo567 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../execution/arrow/KyuubiArrowConverters.scala | 31 ++++++++++++++--------
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 5 ++--
2 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index e13653b01c..75618e3124 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -274,19 +274,28 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
var estimatedBatchSize = 0L
Utils.tryWithSafeFinally {
+ def isBatchSizeLimitExceeded: Boolean = {
+      // If `maxEstimatedBatchSize` is zero or negative, it implies unlimited.
+      maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize
+ }
+ def isRecordLimitExceeded: Boolean = {
+ // If `maxRecordsPerBatch` is zero or negative, it implies unlimited.
+ maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch
+ }
+ def isGlobalLimitNotReached: Boolean = {
+ // If the limit is negative, it means no restriction
+ // or the current number of rows has not reached the limit.
+ rowCount < limit || limit < 0
+ }
+
// Always write the first row.
- while (rowIter.hasNext && (
-      // For maxBatchSize and maxRecordsPerBatch, respect whatever smaller.
+ while (rowIter.hasNext && isGlobalLimitNotReached && (
      // If the size in bytes is positive (set properly), always write the first row.
- rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
- // If the size in bytes of rows are 0 or negative, unlimit it.
- estimatedBatchSize <= 0 ||
- estimatedBatchSize < maxEstimatedBatchSize ||
- // If the size of rows are 0 or negative, unlimit it.
- maxRecordsPerBatch <= 0 ||
- rowCountInLastBatch < maxRecordsPerBatch ||
- rowCount < limit ||
- limit < 0)) {
+ rowCountInLastBatch == 0 ||
+      // If either limit is hit, create a batch. This implies that the limit that is hit
+      // first triggers the creation of a batch even if the other limit is not yet hit
+ // hence preferring the more restrictive limit.
+ (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
val row = rowIter.next()
arrowWriter.write(row)
estimatedBatchSize += (row match {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 73e7f77993..7d6e5f8fdb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -164,8 +164,9 @@ object SparkDatasetHelper extends Logging {
KyuubiSparkUtil.globalSparkContext
.getConf
.getOption("spark.connect.grpc.arrow.maxBatchSize")
- .orElse(Option("4m"))
- .map(JavaUtils.byteStringAs(_, ByteUnit.MiB))
+ // 4m
+ .orElse(Option("4194304b"))
+ .map(JavaUtils.byteStringAs(_, ByteUnit.BYTE))
.get
}