This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0caa27ba67ce [SPARK-55169][PYTHON] Use ArrowBatchTransformer.flatten_struct in ArrowStreamArrowUDTFSerializer
0caa27ba67ce is described below

commit 0caa27ba67ce4bdd14ecfbe7cbbdc7fabc489b99
Author: Yicong-Huang <[email protected]>
AuthorDate: Tue Jan 27 15:07:32 2026 +0800

    [SPARK-55169][PYTHON] Use ArrowBatchTransformer.flatten_struct in ArrowStreamArrowUDTFSerializer
    
    ### What changes were proposed in this pull request?
    
    This PR adds a `column_index` parameter to `ArrowBatchTransformer.flatten_struct` and refactors `ArrowStreamArrowUDTFSerializer.load_stream` to reuse it.
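
    For context, a minimal sketch of what the extended helper does when the struct column is not at index 0 (the batch contents below are illustrative, not taken from the test suite):

    ```python
    import pyarrow as pa

    # A two-column batch whose second column is a struct, mirroring a UDTF
    # table argument packed into a single struct column.
    batch = pa.RecordBatch.from_arrays(
        [
            pa.array([1, 2]),
            pa.array([{"a": 10, "b": "x"}, {"a": 20, "b": "y"}]),
        ],
        names=["id", "t"],
    )

    # flatten_struct(batch, column_index=1) expands the struct's fields into
    # a standalone RecordBatch with schema (a: int64, b: string).
    struct = batch.column(1)
    flat = pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
    assert flat.schema.names == ["a", "b"]
    ```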
    
    ### Why are the changes needed?
    
    This is a follow-up to [SPARK-55168](https://issues.apache.org/jira/browse/SPARK-55168) and part of [SPARK-55159](https://issues.apache.org/jira/browse/SPARK-55159) Phase 1: eliminating duplicated transformation logic across serializers.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests in `test_arrow_udtf.py`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53952 from Yicong-Huang/SPARK-55169/refactor/arrow-udtf-use-flatten-struct.
    
    Lead-authored-by: Yicong-Huang <[email protected]>
    Co-authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/conversion.py         |  7 ++++---
 python/pyspark/sql/pandas/serializers.py | 26 +++++++++-----------------
 2 files changed, 13 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index c580899647fa..31c43ddd2797 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -63,17 +63,18 @@ class ArrowBatchTransformer:
     """
 
     @staticmethod
-    def flatten_struct(batch: "pa.RecordBatch") -> "pa.RecordBatch":
+    def flatten_struct(batch: "pa.RecordBatch", column_index: int = 0) -> "pa.RecordBatch":
         """
-        Flatten a single struct column into a RecordBatch.
+        Flatten the struct column at the given index into a RecordBatch.
 
         Used by:
             - ArrowStreamUDFSerializer.load_stream
             - GroupArrowUDFSerializer.load_stream
+            - ArrowStreamArrowUDTFSerializer.load_stream
         """
         import pyarrow as pa
 
-        struct = batch.column(0)
+        struct = batch.column(column_index)
         return pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
 
     @staticmethod
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index b78ef786b2a9..2d8601473fc3 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -237,23 +237,15 @@ class ArrowStreamArrowUDTFSerializer(ArrowStreamUDTFSerializer):
         """
         Flatten the struct into Arrow's record batches.
         """
-        import pyarrow as pa
-
-        batches = super().load_stream(stream)
-        for batch in batches:
-            result_batches = []
-            for i in range(batch.num_columns):
-                if i in self.table_arg_offsets:
-                    struct = batch.column(i)
-                    # Flatten the struct and create a RecordBatch from it
-                    flattened_batch = pa.RecordBatch.from_arrays(
-                        struct.flatten(), schema=pa.schema(struct.type)
-                    )
-                    result_batches.append(flattened_batch)
-                else:
-                    # Keep the column as it is for non-table columns
-                    result_batches.append(batch.column(i))
-            yield result_batches
+        for batch in super().load_stream(stream):
+            # For each column: flatten struct columns at table_arg_offsets into RecordBatch,
+            # keep other columns as Array
+            yield [
+                ArrowBatchTransformer.flatten_struct(batch, column_index=i)
+                if i in self.table_arg_offsets
+                else batch.column(i)
+                for i in range(batch.num_columns)
+            ]
 
     def _create_array(self, arr, arrow_type):
         import pyarrow as pa
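
For reference (an illustrative sketch, not part of the commit): the refactored `load_stream` yields one list per input batch, mixing a flattened `RecordBatch` for each column listed in `table_arg_offsets` with a plain `Array` for every other column. The offsets and batch below are hypothetical, and `flatten_struct` is a local stand-in for `ArrowBatchTransformer.flatten_struct`:

```python
import pyarrow as pa

# Hypothetical: column 1 carries the struct-packed table argument.
table_arg_offsets = {1}

batch = pa.RecordBatch.from_arrays(
    [pa.array([7, 8]), pa.array([{"a": 1}, {"a": 2}])],
    names=["scalar_arg", "table_arg"],
)

def flatten_struct(b, column_index=0):
    # Same logic as ArrowBatchTransformer.flatten_struct in the diff above.
    struct = b.column(column_index)
    return pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))

# The per-batch shape yielded by the refactored load_stream:
row = [
    flatten_struct(batch, column_index=i) if i in table_arg_offsets else batch.column(i)
    for i in range(batch.num_columns)
]
assert isinstance(row[0], pa.Array)        # non-table column kept as an Array
assert isinstance(row[1], pa.RecordBatch)  # table column flattened to a RecordBatch
```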

