This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new baaa62b086d1 [SPARK-55168][PYTHON] Use ArrowBatchTransformer.flatten_struct in GroupArrowUDFSerializer
baaa62b086d1 is described below
commit baaa62b086d1fe915f31b60f6d403889a6a10dca
Author: Yicong-Huang <[email protected]>
AuthorDate: Mon Jan 26 14:32:29 2026 +0800
[SPARK-55168][PYTHON] Use ArrowBatchTransformer.flatten_struct in GroupArrowUDFSerializer
### What changes were proposed in this pull request?
This PR refactors `GroupArrowUDFSerializer.load_stream` to reuse
`ArrowBatchTransformer.flatten_struct` instead of an inline `process_group`
function that duplicated the same struct-flattening logic.
Before:
```python
def process_group(batches):
    for batch in batches:
        struct = batch.column(0)
        yield pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))

batch_iter = process_group(ArrowStreamSerializer.load_stream(self, stream))
```
After:
```python
batch_iter = map(
    ArrowBatchTransformer.flatten_struct,
    ArrowStreamSerializer.load_stream(self, stream),
)
```
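For context, a minimal standalone pyarrow sketch of the flattening both versions perform (not part of this patch; the column name `_0` and the toy schema are illustrative):
```python
import pyarrow as pa

# A batch whose only column is a struct, as the serializer receives it.
struct_array = pa.array(
    [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}],
    type=pa.struct([("id", pa.int64()), ("v", pa.string())]),
)
batch = pa.RecordBatch.from_arrays([struct_array], names=["_0"])

# The flattening step shared by both code paths: unpack the struct's
# children into top-level columns of a new RecordBatch.
struct = batch.column(0)
flat = pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
print(flat.schema)  # id: int64, v: string
```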
### Why are the changes needed?
This is a follow-up to
[SPARK-55162](https://issues.apache.org/jira/browse/SPARK-55162) and part of
[SPARK-55159](https://issues.apache.org/jira/browse/SPARK-55159) Phase 1:
eliminating duplicated transformation logic across serializers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests in `test_arrow_grouped_map.py`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53951 from Yicong-Huang/SPARK-55168/refactor/group-arrow-udf-use-transformer.
Lead-authored-by: Yicong-Huang <[email protected]>
Co-authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/conversion.py | 4 +++-
python/pyspark/sql/pandas/serializers.py | 13 +++----------
2 files changed, 6 insertions(+), 11 deletions(-)
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index 81ceea857e19..c580899647fa 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -67,7 +67,9 @@ class ArrowBatchTransformer:
         """
         Flatten a single struct column into a RecordBatch.
 
-        Used by: ArrowStreamUDFSerializer.load_stream
+        Used by:
+        - ArrowStreamUDFSerializer.load_stream
+        - GroupArrowUDFSerializer.load_stream
         """
 
         import pyarrow as pa
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index a8dba37b7da3..b78ef786b2a9 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -1063,18 +1063,11 @@ class GroupArrowUDFSerializer(ArrowStreamGroupUDFSerializer):
         """
         Flatten the struct into Arrow's record batches.
         """
-        import pyarrow as pa
-
-        def process_group(batches: "Iterator[pa.RecordBatch]"):
-            for batch in batches:
-                struct = batch.column(0)
-                yield pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
-
         for (batches,) in self._load_group_dataframes(stream, num_dfs=1):
-            processed_iter = process_group(batches)
-            yield processed_iter
+            batch_iter = map(ArrowBatchTransformer.flatten_struct, batches)
+            yield batch_iter
             # Make sure the batches are fully iterated before getting the next group
-            for _ in processed_iter:
+            for _ in batch_iter:
                 pass
 
     def __repr__(self):
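A side note on the drain loop retained above: `map` is lazy, so each group's `batch_iter` must be exhausted before `_load_group_dataframes` can advance to the next group in the stream. A minimal sketch of that pattern (toy generators standing in for the Arrow stream; not from this patch):
```python
def groups():
    # Toy stand-in for self._load_group_dataframes: one iterator per group.
    yield iter(["g1-b1", "g1-b2"])
    yield iter(["g2-b1"])

def load_stream():
    for batch_iter in groups():
        yield batch_iter
        # Drain anything the consumer left unread, mirroring
        # `for _ in batch_iter: pass` in GroupArrowUDFSerializer.load_stream.
        for _ in batch_iter:
            pass

for g in load_stream():
    print(next(g))  # the consumer reads only the first batch of each group
```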
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]