Yicong Huang created SPARK-55159:
------------------------------------

             Summary: Extract Arrow batch transformers from serializers for 
better composability
                 Key: SPARK-55159
                 URL: https://issues.apache.org/jira/browse/SPARK-55159
             Project: Spark
          Issue Type: Improvement
          Components: PySpark
    Affects Versions: 4.2.0
            Reporter: Yicong Huang


Currently, PySpark's Arrow serializers (e.g., {{ArrowStreamUDFSerializer}}, {{ArrowStreamPandasSerializer}}) mix two concerns:

1. *Serialization*: Reading/writing Arrow IPC streams
2. *Data transformation*: Flattening structs, wrapping columns, converting to 
pandas, etc.

For example, {{ArrowStreamUDFSerializer}} does both in one class:

{code:python}
class ArrowStreamUDFSerializer(ArrowStreamSerializer):
    def load_stream(self, stream):
        batches = super().load_stream(stream)  # serialization
        for batch in batches:
            struct = batch.column(0)
            yield [pa.RecordBatch.from_arrays(struct.flatten(), ...)]  # transformation

    def dump_stream(self, iterator, stream):
        # transformation: wrap into struct
        # serialization: write to stream
        # protocol: write START_ARROW_STREAM marker
{code}

This proposal introduces *Arrow batch transformers* - pure callable classes that transform {{Iterator[RecordBatch] -> Iterator[RecordBatch]}} with no side effects:

{code:python}
class FlattenStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""
    def __call__(self, batches):
        for batch in batches:
            struct = batch.column(0)
            yield pa.RecordBatch.from_arrays(struct.flatten(), ...)

class WrapStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""
    def __call__(self, batches):
        for batch in batches:
            struct = pa.StructArray.from_arrays(batch.columns, ...)
            yield pa.RecordBatch.from_arrays([struct], ["_0"])
{code}
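
To make the contract concrete, here is a minimal self-contained sketch of the two transformers above that runs against pyarrow alone, with no serializer or stream involved. The {{names}} arguments fill in the elisions above and are assumptions for illustration only:

{code:python}
import pyarrow as pa

class WrapStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""
    def __call__(self, batches):
        for batch in batches:
            # Pack all columns into a single struct column named "_0".
            struct = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names)
            yield pa.RecordBatch.from_arrays([struct], ["_0"])

class FlattenStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""
    def __call__(self, batches):
        for batch in batches:
            # Unpack the struct column back into top-level columns.
            struct = batch.column(0)
            yield pa.RecordBatch.from_arrays(
                struct.flatten(), names=[field.name for field in struct.type]
            )

# Pure functions over iterators: wrap then flatten round-trips a batch,
# and no I/O happens anywhere along the way.
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ["a"])
out = FlattenStructTransformer()(WrapStructTransformer()(iter([batch])))
assert next(out).equals(batch)
{code}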

Serializers then compose these transformers:

{code:python}
class ArrowStreamUDFSerializer(ArrowStreamSerializer):
    def __init__(self):
        self._flatten = FlattenStructTransformer()
        self._wrap = WrapStructTransformer()

    def load_stream(self, stream):
        batches = super().load_stream(stream)
        return self._flatten(batches)

    def dump_stream(self, iterator, stream):
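        # Protocol details (e.g., the START_ARROW_STREAM marker) stay here
        # in the serializer; the transformer itself performs no I/O.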
        wrapped = self._wrap(iterator)
        return super().dump_stream(wrapped, stream)
{code}

*Benefits:*
- Clear separation of concerns (serialization vs transformation)
- Transformers are reusable and testable in isolation
- Easier to understand data flow as a pipeline (see the composition sketch after this list)
- Transformers have no side effects (I/O stays in serializers)
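
For instance, a small composition helper would make that pipeline structure explicit. The {{compose}} helper below is a hypothetical illustration, not part of this proposal's API:

{code:python}
from functools import reduce

def compose(*transformers):
    """Chain Iterator[RecordBatch] -> Iterator[RecordBatch] transformers
    left to right into a single pipeline (hypothetical helper)."""
    def pipeline(batches):
        return reduce(lambda acc, transform: transform(acc), transformers, batches)
    return pipeline

# A serializer's read path then reads declaratively, e.g.:
#     self._load_pipeline = compose(FlattenStructTransformer())
#     return self._load_pipeline(super().load_stream(stream))
{code}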

*Design principles:*
- Transformers: {{Iterator -> Iterator}}, pure, no side effects
- Serializers: Handle I/O, protocol details (e.g., START_ARROW_STREAM marker)


