[ 
https://issues.apache.org/jira/browse/SPARK-55159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yicong Huang updated SPARK-55159:
---------------------------------
    Issue Type: Umbrella  (was: Improvement)

> Extract Arrow batch transformers from serializers for better composability
> --------------------------------------------------------------------------
>
>                 Key: SPARK-55159
>                 URL: https://issues.apache.org/jira/browse/SPARK-55159
>             Project: Spark
>          Issue Type: Umbrella
>          Components: PySpark
>    Affects Versions: 4.2.0
>            Reporter: Yicong Huang
>            Priority: Major
>
> Currently, PySpark's Arrow serializers (e.g., {{ArrowStreamUDFSerializer}},
> {{ArrowStreamPandasSerializer}}) mix two concerns:
> 1. *Serialization*: Reading/writing Arrow IPC streams
> 2. *Data transformation*: Flattening structs, wrapping columns, converting to
> pandas, etc.
> For example, {{ArrowStreamUDFSerializer}} does both in one class:
> {code:python}
> import pyarrow as pa
>
> class ArrowStreamUDFSerializer(ArrowStreamSerializer):
>     def load_stream(self, stream):
>         batches = super().load_stream(stream)  # serialization
>         for batch in batches:
>             struct = batch.column(0)
>             yield [pa.RecordBatch.from_arrays(struct.flatten(), ...)]  # transformation
>
>     def dump_stream(self, iterator, stream):
>         # transformation: wrap columns into a struct
>         # serialization: write batches to the stream
>         # protocol: write the START_ARROW_STREAM marker
>         ...
> {code}
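> The coupling also shows up in testing: even the single line of flattening
> logic above can only be reached through a real Arrow IPC stream. A
> hypothetical scaffold (test data made up for illustration):
> {code:python}
> import io
> import pyarrow as pa
>
> # Fabricate a full IPC stream just to exercise the transformation step.
> struct = pa.StructArray.from_arrays([pa.array([1, 2])], names=["x"])
> batch = pa.RecordBatch.from_arrays([struct], ["_0"])
> buf = io.BytesIO()
> with pa.ipc.new_stream(buf, batch.schema) as writer:
>     writer.write_batch(batch)
> buf.seek(0)
> # ...then drive ArrowStreamUDFSerializer().load_stream(buf) end to end,
> # only to test one line of transformation logic.
> {code}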
> This proposal introduces *Arrow batch transformers*: pure callable classes
> that transform {{Iterator[RecordBatch] -> Iterator[RecordBatch]}} with no
> side effects:
> {code:python}
> import pyarrow as pa
>
> class FlattenStructTransformer:
>     """Iterator[RecordBatch] -> Iterator[RecordBatch]"""
>     def __call__(self, batches):
>         for batch in batches:
>             struct = batch.column(0)
>             yield pa.RecordBatch.from_arrays(struct.flatten(), ...)
>
> class WrapStructTransformer:
>     """Iterator[RecordBatch] -> Iterator[RecordBatch]"""
>     def __call__(self, batches):
>         for batch in batches:
>             struct = pa.StructArray.from_arrays(batch.columns, ...)
>             yield pa.RecordBatch.from_arrays([struct], ["_0"])
> {code}
> Serializers then compose these transformers:
> {code:python}
> class ArrowStreamUDFSerializer(ArrowStreamSerializer):
>     def __init__(self):
>         super().__init__()
>         self._flatten = FlattenStructTransformer()
>         self._wrap = WrapStructTransformer()
>
>     def load_stream(self, stream):
>         batches = super().load_stream(stream)
>         return self._flatten(batches)
>
>     def dump_stream(self, iterator, stream):
>         wrapped = self._wrap(iterator)
>         return super().dump_stream(wrapped, stream)
> {code}
> *Benefits:*
> - Clear separation of concerns (serialization vs transformation)
> - Transformers are reusable and testable in isolation (see the sketch after this list)
> - Easier to understand data flow as a pipeline
> - Transformers have no side effects (I/O stays in serializers)
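> As a sketch of the "testable in isolation" point, a transformer can be driven
> by a plain iterator of batches with no stream or serializer involved. This
> version fills in the {{names}} argument (elided as {{...}} above) so the
> example runs; the test data is made up:
> {code:python}
> import pyarrow as pa
>
> class FlattenStructTransformer:
>     """Iterator[RecordBatch] -> Iterator[RecordBatch] (names filled in for this sketch)."""
>     def __call__(self, batches):
>         for batch in batches:
>             struct = batch.column(0)
>             names = [struct.type.field(i).name for i in range(struct.type.num_fields)]
>             yield pa.RecordBatch.from_arrays(struct.flatten(), names)
>
> # Build one struct-wrapped batch by hand and flatten it; no I/O anywhere.
> struct = pa.StructArray.from_arrays(
>     [pa.array([1, 2]), pa.array(["a", "b"])], names=["x", "y"]
> )
> batch = pa.RecordBatch.from_arrays([struct], ["_0"])
> (flat,) = FlattenStructTransformer()(iter([batch]))
> assert flat.schema.names == ["x", "y"]
> {code}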
> *Design principles:*
> - Transformers: {{Iterator -> Iterator}}, pure, no side effects (composition sketch below)
> - Serializers: Handle I/O, protocol details (e.g., START_ARROW_STREAM marker)
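> Since each transformer is just an {{Iterator -> Iterator}} callable, pipelines
> fall out of ordinary function composition. A hypothetical helper (not part of
> this proposal) could chain them:
> {code:python}
> from functools import reduce
>
> def compose(*transformers):
>     """Chain Iterator[RecordBatch] -> Iterator[RecordBatch] callables, left to right."""
>     def pipeline(batches):
>         return reduce(lambda acc, t: t(acc), transformers, batches)
>     return pipeline
>
> # Usage sketch inside a serializer's load_stream:
> #     batches = super().load_stream(stream)
> #     return compose(self._flatten, some_other_transformer)(batches)
> {code}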



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
