This is an automated email from the ASF dual-hosted git repository.
zhengruifeng pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new a6fe45dcdca6 [SPARK-56084][PYTHON] Remove unused
ArrowStreamGroupUDFSerializer
a6fe45dcdca6 is described below
commit a6fe45dcdca6fa32326688e39110840b0f566cae
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 19 09:20:18 2026 +0800
[SPARK-56084][PYTHON] Remove unused ArrowStreamGroupUDFSerializer
### What changes were proposed in this pull request?
Delete `ArrowStreamGroupUDFSerializer` from
`python/pyspark/sql/pandas/serializers.py`.
### Why are the changes needed?
`ArrowStreamGroupUDFSerializer` is no longer used after SPARK-55608
refactored `SQL_GROUPED_MAP_ARROW_UDF` / `SQL_GROUPED_MAP_ARROW_ITER_UDF` to
use `ArrowStreamGroupSerializer` directly, and SPARK-56860 removed its only
subclass `CogroupArrowUDFSerializer`. This class can be safely deleted.
Part of SPARK-55384.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests: `pyspark.sql.tests.arrow.test_arrow_grouped_map`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #55963 from
Yicong-Huang/SPARK-56084/cleanup/grouped-arrow-udf-serializer.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 164dbfb88accdff9c4f5404e75e5c8049c08b6bc)
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/pandas/serializers.py | 62 --------------------------------
1 file changed, 62 deletions(-)
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 55d874aaa506..bcd40722636a 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -298,68 +298,6 @@ class ArrowStreamArrowUDTFSerializer(ArrowStreamUDTFSerializer):
return super().dump_stream(apply_type_coercion(), stream)
-class ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
- """
- Serializer for grouped Arrow UDFs.
-
- Deserializes:
- ``Iterator[Iterator[pa.RecordBatch]]`` - one inner iterator per group.
- Each batch contains a single struct column.
-
- Serializes:
- ``Iterator[Tuple[Iterator[pa.RecordBatch], pa.DataType]]``
- Each tuple contains iterator of flattened batches and their Arrow type.
-
- Used by:
- - SQL_GROUPED_MAP_ARROW_UDF
- - SQL_GROUPED_MAP_ARROW_ITER_UDF
-
- Parameters
- ----------
- assign_cols_by_name : bool
- If True, reorder serialized columns by schema name.
- """
-
- def __init__(self, *, assign_cols_by_name):
- super().__init__()
- self._assign_cols_by_name = assign_cols_by_name
-
- def load_stream(self, stream):
- """
- Load grouped Arrow record batches from stream.
- """
- for batches in ArrowStreamGroupSerializer.load_stream(self, stream):
- yield batches
- # Make sure the batches are fully iterated before getting the next group
- for _ in batches:
- pass
-
- def dump_stream(self, iterator, stream):
- import pyarrow as pa
-
- # flatten inner list [([pa.RecordBatch], arrow_type)] into [(pa.RecordBatch, arrow_type)]
- # so strip off inner iterator induced by ArrowStreamUDFSerializer.load_stream
- batch_iter = (
- (batch, arrow_type)
- for batches, arrow_type in iterator # tuple constructed in wrap_grouped_map_arrow_udf
- for batch in batches
- )
-
- if self._assign_cols_by_name:
- batch_iter = (
- (
- pa.RecordBatch.from_arrays(
- [batch.column(field.name) for field in arrow_type],
- names=[field.name for field in arrow_type],
- ),
- arrow_type,
- )
- for batch, arrow_type in batch_iter
- )
-
- super().dump_stream(batch_iter, stream)
-
-
class ArrowStreamPandasSerializer(ArrowStreamSerializer):
"""
Serializes pandas.Series as Arrow data with Arrow streaming format.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]