[
https://issues.apache.org/jira/browse/SPARK-55159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yicong Huang updated SPARK-55159:
---------------------------------
Description:
Currently, PySpark's Arrow serializers (e.g., {{ArrowStreamUDFSerializer}},
{{ArrowStreamPandasSerializer}}) mix two concerns:
1. *Serialization*: Reading/writing Arrow IPC streams
2. *Data transformation*: Flattening structs, wrapping columns, converting to
pandas, etc.
*Proposed approach (3 phases):*
*Phase 1: Extract transformers to conversion.py*
Extract transformation logic into {{ArrowBatchTransformer}} class with static
methods in {{pyspark.sql.conversion}}. Serializers call these transformers
internally.
{code:python}
import pyarrow as pa


class ArrowBatchTransformer:
    @staticmethod
    def flatten_struct(batch: pa.RecordBatch) -> pa.RecordBatch:
        """Flatten a single struct column into a RecordBatch."""
        struct = batch.column(0)
        return pa.RecordBatch.from_arrays(struct.flatten(),
                                          schema=pa.schema(struct.type))

    @staticmethod
    def wrap_struct(batch: pa.RecordBatch) -> pa.RecordBatch:
        """Wrap a RecordBatch's columns into a single struct column."""
        if batch.num_columns == 0:
            # Preserve the row count even when the batch has no columns.
            struct = pa.array([{}] * batch.num_rows)
        else:
            struct = pa.StructArray.from_arrays(
                batch.columns, fields=pa.struct(list(batch.schema)))
        return pa.RecordBatch.from_arrays([struct], ["_0"])
{code}
Serializers use these via {{map()}}:
{code:python}
class ArrowStreamUDFSerializer(ArrowStreamSerializer):
    def load_stream(self, stream):
        batches = super().load_stream(stream)
        # Wrap each flattened batch in a one-element list to keep the
        # existing serializer contract.
        return map(lambda batch: [batch],
                   map(ArrowBatchTransformer.flatten_struct, batches))

    def dump_stream(self, iterator, stream):
        # Each element's first item is the batch to wrap.
        batches = map(lambda x: ArrowBatchTransformer.wrap_struct(x[0]), iterator)
        ...
{code}
*Phase 2: Reduce serializer complexity*
- Reduce inheritance depth in serializer hierarchy
- Simplify serializer implementations using extracted transformers
- Remove duplicated transformation logic across serializers
*Phase 3: Make transformers usable outside serializers*
- Enable direct use of transformers for custom Arrow processing pipelines
- Support chaining transformers for complex transformations
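For illustration, a minimal sketch of what direct, chained use could look like once Phase 1 lands (the {{pyspark.sql.conversion}} import reflects the proposed location; the {{process}} helper is only a hypothetical example, and any {{Iterator[RecordBatch]}} source would work):
{code:python}
import pyarrow as pa

# Assumes the Phase 1 transformer lands in pyspark.sql.conversion as proposed.
from pyspark.sql.conversion import ArrowBatchTransformer


def process(batches):
    """Chain pure RecordBatch -> RecordBatch transformers over any batch iterator."""
    for batch in batches:
        flattened = ArrowBatchTransformer.flatten_struct(batch)
        # ... arbitrary per-batch processing could be inserted here ...
        yield ArrowBatchTransformer.wrap_struct(flattened)


# Works on any Iterator[RecordBatch]; no serializer or stream is involved.
source = iter([pa.RecordBatch.from_arrays([pa.array([{"a": 1}, {"a": 2}])], ["_0"])])
result = list(process(source))
{code}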
*Benefits:*
- Clear separation of concerns (serialization vs transformation)
- Transformers are reusable and testable in isolation (see the test sketch after this list)
- Easier to understand data flow as a pipeline
- Transformers have no side effects (I/O stays in serializers)
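As a concrete (hypothetical) example of the testability point above, a round-trip unit test needs no stream or serializer at all; the import again assumes the proposed Phase 1 location:
{code:python}
import pyarrow as pa

# Assumes the proposed Phase 1 location of the transformer.
from pyspark.sql.conversion import ArrowBatchTransformer


def test_wrap_then_flatten_roundtrip():
    batch = pa.RecordBatch.from_pydict({"a": [1, 2], "b": ["x", "y"]})
    wrapped = ArrowBatchTransformer.wrap_struct(batch)
    assert wrapped.schema.names == ["_0"]  # single struct column
    restored = ArrowBatchTransformer.flatten_struct(wrapped)
    assert restored.equals(batch)  # pure, lossless round trip
{code}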
*Design principles:*
- Transformers: Pure functions {{RecordBatch -> RecordBatch}}, no side effects
- Serializers: Handle I/O, protocol details (e.g., START_ARROW_STREAM marker)
was:
Currently, PySpark's Arrow serializers (e.g., {{ArrowStreamUDFSerializer}},
{{ArrowStreamPandasSerializer}}) mix two concerns:
1. *Serialization*: Reading/writing Arrow IPC streams
2. *Data transformation*: Flattening structs, wrapping columns, converting to
pandas, etc.
For example, {{ArrowStreamUDFSerializer}} does both in one class:
{code:python}
class ArrowStreamUDFSerializer(ArrowStreamSerializer):
    def load_stream(self, stream):
        batches = super().load_stream(stream)  # serialization
        for batch in batches:
            struct = batch.column(0)
            yield [pa.RecordBatch.from_arrays(struct.flatten(), ...)]  # transformation

    def dump_stream(self, iterator, stream):
        # transformation: wrap into struct
        # serialization: write to stream
        # protocol: write START_ARROW_STREAM marker
{code}
This proposal introduces *Arrow batch transformers* - pure callable classes
that transform {{Iterator[RecordBatch] -> Iterator[RecordBatch]}} with no side
effects:
{code:python}
class FlattenStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""
    def __call__(self, batches):
        for batch in batches:
            struct = batch.column(0)
            yield pa.RecordBatch.from_arrays(struct.flatten(), ...)


class WrapStructTransformer:
    """Iterator[RecordBatch] -> Iterator[RecordBatch]"""
    def __call__(self, batches):
        for batch in batches:
            struct = pa.StructArray.from_arrays(batch.columns, ...)
            yield pa.RecordBatch.from_arrays([struct], ["_0"])
{code}
Serializers then compose these transformers:
{code:python}
class ArrowStreamUDFSerializer(ArrowStreamSerializer):
    def __init__(self):
        self._flatten = FlattenStructTransformer()
        self._wrap = WrapStructTransformer()

    def load_stream(self, stream):
        batches = super().load_stream(stream)
        return self._flatten(batches)

    def dump_stream(self, iterator, stream):
        wrapped = self._wrap(iterator)
        return super().dump_stream(wrapped, stream)
{code}
*Benefits:*
- Clear separation of concerns (serialization vs transformation)
- Transformers are reusable and testable in isolation
- Easier to understand data flow as a pipeline
- Transformers have no side effects (I/O stays in serializers)
*Design principles:*
- Transformers: {{Iterator -> Iterator}}, pure, no side effects
- Serializers: Handle I/O, protocol details (e.g., START_ARROW_STREAM marker)
> Extract Arrow batch transformers from serializers for better composability
> --------------------------------------------------------------------------
>
> Key: SPARK-55159
> URL: https://issues.apache.org/jira/browse/SPARK-55159
> Project: Spark
> Issue Type: Umbrella
> Components: PySpark
> Affects Versions: 4.2.0
> Reporter: Yicong Huang
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]