This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new df195ac59de5 [SPARK-55802][SQL] Fix integer overflow when computing 
Arrow batch bytes
df195ac59de5 is described below

commit df195ac59de5bd896cd70699cfe96ebf78bf2976
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Mar 4 08:21:38 2026 -0800

    [SPARK-55802][SQL] Fix integer overflow when computing Arrow batch bytes
    
    ### What changes were proposed in this pull request?
    
    `ArrowWriter.sizeInBytes()` and
`SliceBytesArrowOutputProcessorImpl.getBatchBytes()` both accumulated
per-column buffer sizes (each an `Int`) into
an `Int` accumulator. When the total exceeds 2 GB the sum silently wraps 
negative, causing the byte-limit checks controlled by 
`spark.sql.execution.arrow.maxBytesPerBatch` and
    `spark.sql.execution.arrow.maxBytesPerOutputBatch` to behave incorrectly 
and potentially allow oversized batches through.
    
    Fix by changing both accumulators and return types to `Long`.
    
    ### Why are the changes needed?
    
    Fix possible overflow when calculating Arrow batch bytes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Sonnet 4.6 <noreply@anthropic.com>
    
    Closes #54584 from viirya/fix-arrow-batch-bytes-overflow.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 .../main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala | 4 ++--
 .../org/apache/spark/sql/execution/python/PythonArrowOutput.scala     | 4 ++--
 .../execution/python/streaming/BaseStreamingArrowWriterSuite.scala    | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index 8d68e74dbf87..b5269da035f3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -124,9 +124,9 @@ class ArrowWriter(val root: VectorSchemaRoot, fields: 
Array[ArrowFieldWriter]) {
     count += 1
   }
 
-  def sizeInBytes(): Int = {
+  def sizeInBytes(): Long = {
     var i = 0
-    var bytes = 0
+    var bytes = 0L
     while (i < fields.size) {
       bytes += fields(i).getSizeInBytes()
       i += 1
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
index 38fd081a104f..bae1a2aa0d5d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
@@ -297,8 +297,8 @@ class SliceBytesArrowOutputProcessorImpl(
     }
   }
 
-  private def getBatchBytes(root: VectorSchemaRoot): Int = {
-    var batchBytes = 0
+  private def getBatchBytes(root: VectorSchemaRoot): Long = {
+    var batchBytes = 0L
     root.getFieldVectors.asScala.foreach { vector =>
       batchBytes += vector.getBufferSize
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
index aa6aca507624..bbd2420c588f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
@@ -75,7 +75,7 @@ class BaseStreamingArrowWriterSuite extends SparkFunSuite {
       ()
     }
 
-    when(arrowWriter.sizeInBytes()).thenAnswer { _ => sizeCounter }
+    when(arrowWriter.sizeInBytes()).thenAnswer { _ => sizeCounter.toLong }
 
     // Set arrowMaxBytesPerBatch to 1
     transformWithStateInPySparkWriter = new BaseStreamingArrowWriter(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to