This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 19a1da938a61 [SPARK-54639][PYTHON] Avoid unnecessary Table creation in Arrow serializers
19a1da938a61 is described below
commit 19a1da938a61bfaffd0ddef35e66a4c41f80d387
Author: Yicong-Huang <[email protected]>
AuthorDate: Thu Dec 11 08:58:56 2025 +0900
[SPARK-54639][PYTHON] Avoid unnecessary Table creation in Arrow serializers
### What changes were proposed in this pull request?
Optimize Arrow serializers (`ArrowStreamPandasSerializer` and
`GroupPandasUDFSerializer`) by avoiding unnecessary `pa.Table` creation when
processing single `RecordBatch` instances.
The optimization replaces `pa.Table.from_batches([batch]).itercolumns()`
with direct column access using `batch.column(i)` for single batches. This
eliminates unnecessary Table and iterator object creation, reducing function
call overhead and GC pressure.
**Changes:**
- `ArrowStreamPandasSerializer.load_stream()`: Direct column access instead
of creating Table wrapper
- `GroupPandasUDFSerializer.load_stream()`: Direct column access for each
batch
**Code example:**
```python
# Before (ArrowStreamPandasSerializer.load_stream)
for batch in batches:
    pandas_batches = [
        self.arrow_to_pandas(c, i)
        for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
    ]

# After
for batch in batches:
    pandas_batches = [
        self.arrow_to_pandas(batch.column(i), i)
        for i in range(batch.num_columns)
    ]
```
### Why are the changes needed?
Several serializers in `pyspark.sql.pandas.serializers` unnecessarily
create `pa.Table` objects when processing single `RecordBatch` instances. When
converting Arrow RecordBatches to pandas Series, the code creates a `pa.Table`
wrapper for each batch just to iterate over columns, which introduces:
- Unnecessary object creation (Table objects and iterators)
- Extra function call overhead
- Increased GC pressure
For a workload processing 1000 batches with 10 columns each, this avoids
creating 2000 temporary objects (1000 Table objects + 1000 iterators).
`RecordBatch.column(i)` directly returns the column array reference
(zero-copy), reducing function call overhead.
### Does this PR introduce _any_ user-facing change?
No. This is a performance optimization that maintains backward
compatibility. The serialization behavior remains the same; only the internal
implementation is optimized.
### How was this patch tested?
Existing tests pass without modification.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53387 from Yicong-Huang/SPARK-54639/feat/optimize-arrow-serializers.
Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/pandas/serializers.py | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/sql/pandas/serializers.py
b/python/pyspark/sql/pandas/serializers.py
index 43a42d7fc3b4..f7597c3bdafa 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -500,13 +500,11 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         Deserialize ArrowRecordBatches to an Arrow table and return as a list
         of pandas.Series.
         """
         batches = super().load_stream(stream)
-        import pyarrow as pa
         import pandas as pd
         for batch in batches:
             pandas_batches = [
-                self.arrow_to_pandas(c, i)
-                for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
             ]
             if len(pandas_batches) == 0:
                 yield [pd.Series([pyspark._NoValue] * batch.num_rows)]
@@ -1225,8 +1223,7 @@ class GroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
         for batch in batches:
             # The batch from ArrowStreamSerializer is already flattened (no struct wrapper)
             series = [
-                self.arrow_to_pandas(c, i)
-                for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
             ]
             yield series
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]