This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 19a1da938a61 [SPARK-54639][PYTHON] Avoid unnecessary Table creation in Arrow serializers
19a1da938a61 is described below

commit 19a1da938a61bfaffd0ddef35e66a4c41f80d387
Author: Yicong-Huang <[email protected]>
AuthorDate: Thu Dec 11 08:58:56 2025 +0900

    [SPARK-54639][PYTHON] Avoid unnecessary Table creation in Arrow serializers
    
    ### What changes were proposed in this pull request?
    
    Optimize Arrow serializers (`ArrowStreamPandasSerializer` and `GroupPandasUDFSerializer`) by avoiding unnecessary `pa.Table` creation when processing single `RecordBatch` instances.
    
    The optimization replaces `pa.Table.from_batches([batch]).itercolumns()` with direct column access using `batch.column(i)` for single batches. This eliminates unnecessary Table and iterator object creation, reducing function call overhead and GC pressure.
    
    **Changes:**
    - `ArrowStreamPandasSerializer.load_stream()`: Direct column access instead of creating Table wrapper
    - `GroupPandasUDFSerializer.load_stream()`: Direct column access for each batch
    
    **Code example:**
    
    ```python
    # Before (ArrowStreamPandasSerializer.load_stream)
    for batch in batches:
        pandas_batches = [
            self.arrow_to_pandas(c, i)
            for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
        ]
    
    # After
    for batch in batches:
        pandas_batches = [
            self.arrow_to_pandas(batch.column(i), i)
            for i in range(batch.num_columns)
        ]
    ```
    
    ### Why are the changes needed?
    
    Several serializers in `pyspark.sql.pandas.serializers` unnecessarily create `pa.Table` objects when processing single `RecordBatch` instances. When converting Arrow RecordBatches to pandas Series, the code creates a `pa.Table` wrapper for each batch just to iterate over columns, which introduces:
    - Unnecessary object creation (Table objects and iterators)
    - Extra function call overhead
    - Increased GC pressure
    
    For a workload processing 1000 batches with 10 columns each, this avoids creating 2000 temporary objects (1000 Table objects + 1000 iterators). `RecordBatch.column(i)` directly returns the column array reference (zero-copy), reducing function call overhead.
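
The object-count arithmetic above can be illustrated without Arrow at all. The following is a toy model under stated assumptions: `ToyBatch` and `ToyTable` are hypothetical stand-ins (not PyArrow classes) that only count how many wrapper and iterator objects each access pattern allocates.

```python
class ToyBatch:
    """Hypothetical stand-in for a record batch: a fixed list of columns."""

    def __init__(self, columns):
        self._columns = columns
        self.num_columns = len(columns)

    def column(self, i):
        # Returns a reference to the stored column, no copy.
        return self._columns[i]


class ToyTable:
    """Hypothetical stand-in for a table wrapper; counts its allocations."""

    tables_created = 0
    iterators_created = 0

    def __init__(self, batches):
        ToyTable.tables_created += 1
        self._batch = batches[0]

    def itercolumns(self):
        ToyTable.iterators_created += 1
        return iter(self._batch._columns)


# 1000 batches with 10 columns each, as in the scenario described above.
batches = [ToyBatch([[0] * 4 for _ in range(10)]) for _ in range(1000)]

# Old pattern: one wrapper and one iterator per batch.
old = [list(ToyTable([b]).itercolumns()) for b in batches]
temporaries = ToyTable.tables_created + ToyTable.iterators_created

# New pattern: direct indexed access, no wrapper objects.
new = [[b.column(i) for i in range(b.num_columns)] for b in batches]

assert old == new
print(temporaries)  # 2000
```

The count depends only on the number of batches, not on the number of columns, which is why the figure is 2000 rather than a multiple of 10.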
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a performance optimization that maintains backward compatibility. The serialization behavior remains the same; only the internal implementation is optimized.
    
    ### How was this patch tested?
    
    Existing tests pass without modification.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53387 from Yicong-Huang/SPARK-54639/feat/optimize-arrow-serializers.
    
    Authored-by: Yicong-Huang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/pandas/serializers.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 43a42d7fc3b4..f7597c3bdafa 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -500,13 +500,11 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
         """
         batches = super().load_stream(stream)
-        import pyarrow as pa
         import pandas as pd
 
         for batch in batches:
             pandas_batches = [
-                self.arrow_to_pandas(c, i)
-                for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
             ]
             if len(pandas_batches) == 0:
                 yield [pd.Series([pyspark._NoValue] * batch.num_rows)]
@@ -1225,8 +1223,7 @@ class GroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
             for batch in batches:
                 # The batch from ArrowStreamSerializer is already flattened (no struct wrapper)
                 series = [
-                    self.arrow_to_pandas(c, i)
-                    for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                    self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
                 ]
                 yield series
 

