Yicong Huang created SPARK-55159:
------------------------------------
Summary: Extract Arrow batch transformers from serializers for better composability
Key: SPARK-55159
URL: https://issues.apache.org/jira/browse/SPARK-55159
Project: Spark
Issue Type: Improvement
Components: PySpark
Affects Versions: 4.2.0
Reporter: Yicong Huang
Currently, PySpark's Arrow serializers (e.g., {{ArrowStreamUDFSerializer}},
{{ArrowStreamPandasSerializer}}) mix two concerns:
1. *Serialization*: Reading/writing Arrow IPC streams
2. *Data transformation*: Flattening structs, wrapping columns, converting to
pandas, etc.
For example, {{ArrowStreamUDFSerializer}} does both in one class:
{code:python}
class ArrowStreamUDFSerializer(ArrowStreamSerializer):
    def load_stream(self, stream):
        batches = super().load_stream(stream)  # serialization
        for batch in batches:
            struct = batch.column(0)
            # transformation
            yield [pa.RecordBatch.from_arrays(struct.flatten(), ...)]

    def dump_stream(self, iterator, stream):
        # transformation: wrap into struct
        # serialization: write to stream
        # protocol: write START_ARROW_STREAM marker
        ...
{code}
This proposal introduces *Arrow batch transformers* - pure callable classes
that transform {{Iterator[RecordBatch] -> Iterator[RecordBatch]}} with no side
effects:
{code:python}
class FlattenStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""

    def __call__(self, batches):
        for batch in batches:
            struct = batch.column(0)
            yield pa.RecordBatch.from_arrays(struct.flatten(), ...)


class WrapStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""

    def __call__(self, batches):
        for batch in batches:
            struct = pa.StructArray.from_arrays(batch.columns, ...)
            yield pa.RecordBatch.from_arrays([struct], ["_0"])
{code}
Serializers then compose these transformers:
{code:python}
class ArrowStreamUDFSerializer(ArrowStreamSerializer):
    def __init__(self):
        self._flatten = FlattenStructTransformer()
        self._wrap = WrapStructTransformer()

    def load_stream(self, stream):
        batches = super().load_stream(stream)
        return self._flatten(batches)

    def dump_stream(self, iterator, stream):
        wrapped = self._wrap(iterator)
        return super().dump_stream(wrapped, stream)
{code}
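Because a transformer is just a callable over batch iterators, it can be
exercised with plain in-memory batches, with no Arrow IPC stream or worker
plumbing involved. Below is a minimal round-trip sketch; the concrete classes
are hypothetical stand-ins that fill in the field-name arguments elided with
{{...}} above (the actual naming scheme is an open implementation detail):
{code:python}
import pyarrow as pa


class FlattenStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""

    def __call__(self, batches):
        for batch in batches:
            struct = batch.column(0)
            # Assumption: output fields reuse the struct's child field names.
            names = [field.name for field in struct.type]
            yield pa.RecordBatch.from_arrays(struct.flatten(), names)


class WrapStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""

    def __call__(self, batches):
        for batch in batches:
            struct = pa.StructArray.from_arrays(
                batch.columns, names=batch.schema.names
            )
            yield pa.RecordBatch.from_arrays([struct], ["_0"])


# Unit-test the pair with in-memory batches; no serializer or stream needed.
original = pa.RecordBatch.from_arrays(
    [pa.array([1, 2]), pa.array(["a", "b"])], ["x", "y"]
)
wrapped = list(WrapStructTransformer()(iter([original])))
restored = list(FlattenStructTransformer()(iter(wrapped)))
assert restored[0].equals(original)
{code}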
*Benefits:*
- Clear separation of concerns (serialization vs transformation)
- Transformers are reusable and testable in isolation
- Easier to understand data flow as a pipeline
- Transformers have no side effects (I/O stays in serializers)
*Design principles:*
- Transformers: {{Iterator -> Iterator}}, pure, no side effects
- Serializers: Handle I/O, protocol details (e.g., START_ARROW_STREAM marker)
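Because every transformer shares the {{Iterator[RecordBatch] ->
Iterator[RecordBatch]}} shape, pipelines reduce to plain function composition.
The {{compose}} helper below is a hypothetical illustration of that reading,
not part of the proposed API:
{code:python}
from functools import reduce
from typing import Callable, Iterator

import pyarrow as pa

BatchTransformer = Callable[[Iterator[pa.RecordBatch]], Iterator[pa.RecordBatch]]


def compose(*transformers: BatchTransformer) -> BatchTransformer:
    """Chain transformers left to right into a single batch pipeline."""

    def pipeline(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
        return reduce(lambda acc, transform: transform(acc), transformers, batches)

    return pipeline


# A serializer's load path then reads as a declarative pipeline, e.g.:
#   load_pipeline = compose(FlattenStructTransformer(), ...)
#   return load_pipeline(super().load_stream(stream))
{code}
Keeping I/O inside the serializers is what lets such a pipeline be tested end
to end without ever touching a stream.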