This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new baaa62b086d1 [SPARK-55168][PYTHON] Use ArrowBatchTransformer.flatten_struct in GroupArrowUDFSerializer
baaa62b086d1 is described below

commit baaa62b086d1fe915f31b60f6d403889a6a10dca
Author: Yicong-Huang <[email protected]>
AuthorDate: Mon Jan 26 14:32:29 2026 +0800

    [SPARK-55168][PYTHON] Use ArrowBatchTransformer.flatten_struct in GroupArrowUDFSerializer
    
    ### What changes were proposed in this pull request?
    
    This PR refactors `GroupArrowUDFSerializer.load_stream` to reuse `ArrowBatchTransformer.flatten_struct` instead of an inline `process_group` function that duplicates the same logic.
    
    Before:
    ```python
    def process_group(batches):
        for batch in batches:
            struct = batch.column(0)
            yield pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
    
    batch_iter = process_group(ArrowStreamSerializer.load_stream(self, stream))
    ```
    
    After:
    ```python
    batch_iter = map(
        ArrowBatchTransformer.flatten_struct,
        ArrowStreamSerializer.load_stream(self, stream),
    )
    ```
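    
    For reference, `ArrowBatchTransformer.flatten_struct` (defined in `python/pyspark/sql/conversion.py`) performs the same transformation the removed helper did: it takes a record batch whose single column is a struct and returns a batch whose columns are the struct's fields. A minimal sketch, assuming it mirrors the inline logic above:
    ```python
    import pyarrow as pa

    def flatten_struct(batch: "pa.RecordBatch") -> "pa.RecordBatch":
        # The batch carries one struct column: split it into its child arrays
        # and rebuild a RecordBatch whose schema is the struct's field list.
        struct = batch.column(0)
        return pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
    ```
    For example, a batch with a single struct column of type `struct<a: int64, b: string>` comes back as a two-column batch with columns `a` and `b`.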
    
    ### Why are the changes needed?
    
    This is a follow-up to [SPARK-55162](https://issues.apache.org/jira/browse/SPARK-55162) and part of [SPARK-55159](https://issues.apache.org/jira/browse/SPARK-55159) Phase 1: eliminating duplicated transformation logic across serializers.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests in `test_arrow_grouped_map.py`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53951 from Yicong-Huang/SPARK-55168/refactor/group-arrow-udf-use-transformer.
    
    Lead-authored-by: Yicong-Huang <[email protected]>
    Co-authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/conversion.py         |  4 +++-
 python/pyspark/sql/pandas/serializers.py | 13 +++----------
 2 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index 81ceea857e19..c580899647fa 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -67,7 +67,9 @@ class ArrowBatchTransformer:
         """
         Flatten a single struct column into a RecordBatch.
 
-        Used by: ArrowStreamUDFSerializer.load_stream
+        Used by:
+            - ArrowStreamUDFSerializer.load_stream
+            - GroupArrowUDFSerializer.load_stream
         """
         import pyarrow as pa
 
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index a8dba37b7da3..b78ef786b2a9 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -1063,18 +1063,11 @@ class GroupArrowUDFSerializer(ArrowStreamGroupUDFSerializer):
         """
         Flatten the struct into Arrow's record batches.
         """
-        import pyarrow as pa
-
-        def process_group(batches: "Iterator[pa.RecordBatch]"):
-            for batch in batches:
-                struct = batch.column(0)
-                yield pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
-
         for (batches,) in self._load_group_dataframes(stream, num_dfs=1):
-            processed_iter = process_group(batches)
-            yield processed_iter
+            batch_iter = map(ArrowBatchTransformer.flatten_struct, batches)
+            yield batch_iter
             # Make sure the batches are fully iterated before getting the next group
-            for _ in processed_iter:
+            for _ in batch_iter:
                 pass
 
     def __repr__(self):
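
A note on the drain loop kept by this refactor: all groups share one underlying Arrow stream, so each group's iterator must be fully consumed before the serializer moves on, otherwise the next group would start mid-stream. A standalone sketch of the pattern (hypothetical data, not Spark code):

```python
def load_groups(flat):
    # Two "groups" of sizes 2 and 3 share one underlying iterator.
    it = iter(flat)
    for size in (2, 3):
        group = (next(it) for _ in range(size))
        yield group
        # Drain whatever the consumer left unread so the shared iterator
        # is positioned at the start of the next group.
        for _ in group:
            pass

for g in load_groups([1, 2, 10, 20, 30]):
    print(next(g))  # prints 1, then 10; without the drain it would print 1, then 2
```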


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
