This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 0caa27ba67ce [SPARK-55169][PYTHON] Use ArrowBatchTransformer.flatten_struct in ArrowStreamArrowUDTFSerializer
0caa27ba67ce is described below
commit 0caa27ba67ce4bdd14ecfbe7cbbdc7fabc489b99
Author: Yicong-Huang <[email protected]>
AuthorDate: Tue Jan 27 15:07:32 2026 +0800
    [SPARK-55169][PYTHON] Use ArrowBatchTransformer.flatten_struct in ArrowStreamArrowUDTFSerializer
### What changes were proposed in this pull request?
This PR adds a `column_index` parameter to
`ArrowBatchTransformer.flatten_struct` and refactors
`ArrowStreamArrowUDTFSerializer.load_stream` to reuse it.
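    For illustration, a minimal pyarrow sketch of the flattening that `flatten_struct` performs; the batch, column name, and field types below are hypothetical, not part of the patch:

    ```python
    import pyarrow as pa

    # Hypothetical batch whose column 0 is a struct<a: int64, b: string>.
    struct_arr = pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}])
    batch = pa.RecordBatch.from_arrays([struct_arr], names=["s"])

    # What flatten_struct(batch, column_index=0) does under the hood:
    struct = batch.column(0)
    flat = pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))

    print(flat.schema)  # a: int64, b: string -- the struct fields become top-level columns
    ```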
### Why are the changes needed?
This is a follow-up to
[SPARK-55168](https://issues.apache.org/jira/browse/SPARK-55168) and part of
[SPARK-55159](https://issues.apache.org/jira/browse/SPARK-55159) Phase 1:
eliminating duplicated transformation logic across serializers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests in `test_arrow_udtf.py`.
### Was this patch authored or co-authored using generative AI tooling?
No.
    Closes #53952 from Yicong-Huang/SPARK-55169/refactor/arrow-udtf-use-flatten-struct.
    Lead-authored-by: Yicong-Huang <[email protected]>
    Co-authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/conversion.py         |  7 ++++---
 python/pyspark/sql/pandas/serializers.py | 26 +++++++++-----------------
 2 files changed, 13 insertions(+), 20 deletions(-)
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index c580899647fa..31c43ddd2797 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -63,17 +63,18 @@ class ArrowBatchTransformer:
     """
 
     @staticmethod
-    def flatten_struct(batch: "pa.RecordBatch") -> "pa.RecordBatch":
+    def flatten_struct(batch: "pa.RecordBatch", column_index: int = 0) -> "pa.RecordBatch":
         """
-        Flatten a single struct column into a RecordBatch.
+        Flatten the struct column at the given index into a RecordBatch.
 
         Used by:
         - ArrowStreamUDFSerializer.load_stream
         - GroupArrowUDFSerializer.load_stream
+        - ArrowStreamArrowUDTFSerializer.load_stream
         """
         import pyarrow as pa
 
-        struct = batch.column(0)
+        struct = batch.column(column_index)
         return pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
 
     @staticmethod
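As a sketch of the call-site impact, assuming a PySpark build that includes this change (the two-struct batch below is hypothetical): the `column_index=0` default keeps existing single-struct callers source-compatible.

```python
import pyarrow as pa
from pyspark.sql.conversion import ArrowBatchTransformer

s0 = pa.array([{"a": 1}, {"a": 2}])
s1 = pa.array([{"b": "x"}, {"b": "y"}])
batch = pa.RecordBatch.from_arrays([s0, s1], names=["s0", "s1"])

# Existing callers are unchanged: column_index defaults to 0.
flat0 = ArrowBatchTransformer.flatten_struct(batch)
# New callers (e.g. the UDTF serializer) can target any struct column.
flat1 = ArrowBatchTransformer.flatten_struct(batch, column_index=1)
```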
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index b78ef786b2a9..2d8601473fc3 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -237,23 +237,15 @@ class ArrowStreamArrowUDTFSerializer(ArrowStreamUDTFSerializer):
         """
         Flatten the struct into Arrow's record batches.
         """
-        import pyarrow as pa
-
-        batches = super().load_stream(stream)
-        for batch in batches:
-            result_batches = []
-            for i in range(batch.num_columns):
-                if i in self.table_arg_offsets:
-                    struct = batch.column(i)
-                    # Flatten the struct and create a RecordBatch from it
-                    flattened_batch = pa.RecordBatch.from_arrays(
-                        struct.flatten(), schema=pa.schema(struct.type)
-                    )
-                    result_batches.append(flattened_batch)
-                else:
-                    # Keep the column as it is for non-table columns
-                    result_batches.append(batch.column(i))
-            yield result_batches
+        for batch in super().load_stream(stream):
+            # For each column: flatten struct columns at table_arg_offsets into a RecordBatch;
+            # keep other columns as Arrays
+            yield [
+                ArrowBatchTransformer.flatten_struct(batch, column_index=i)
+                if i in self.table_arg_offsets
+                else batch.column(i)
+                for i in range(batch.num_columns)
+            ]
 
     def _create_array(self, arr, arrow_type):
         import pyarrow as pa
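For context, a sketch of the shape `load_stream` now yields per batch, again assuming this patched build; the offsets and data are hypothetical. Each list element is either a flattened `RecordBatch` (TABLE argument) or the original `Array` (scalar argument).

```python
import pyarrow as pa
from pyspark.sql.conversion import ArrowBatchTransformer

table_arg_offsets = {0}  # hypothetical: column 0 carries the TABLE argument
batch = pa.RecordBatch.from_arrays(
    [pa.array([{"a": 1}, {"a": 2}]), pa.array([10, 20])],
    names=["t", "x"],
)

out = [
    ArrowBatchTransformer.flatten_struct(batch, column_index=i)
    if i in table_arg_offsets
    else batch.column(i)
    for i in range(batch.num_columns)
]
# out[0] is a RecordBatch with schema a: int64; out[1] is the untouched Int64Array.
```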