Yicong Huang created SPARK-55175:
------------------------------------
Summary: Extract arrow_to_pandas transformer from ArrowStreamPandasSerializer
Key: SPARK-55175
URL: https://issues.apache.org/jira/browse/SPARK-55175
Project: Spark
Issue Type: Sub-task
Components: PySpark
Affects Versions: 4.2.0
Reporter: Yicong Huang
{{ArrowStreamPandasSerializer.arrow_to_pandas}} and its override in
{{ArrowStreamPandasUDFSerializer}} mix conversion logic with serializer
instance state. The same per-column conversion loop is repeated across many serializers:
{code:python}
# Current usage in ArrowStreamPandasSerializer.load_stream (lines 470-477)
for batch in batches:
    pandas_batches = [
        self.arrow_to_pandas(batch.column(i), i)
        for i in range(batch.num_columns)
    ]
    yield pandas_batches

# Similar patterns appear in:
# - GroupArrowUDFSerializer.load_stream (line 1109)
# - ArrowStreamPandasUDFSerializer.load_stream (lines 1157-1160)
# - CogroupArrowUDFSerializer.load_stream (lines 1247-1255)
# - ApplyInPandasWithStateSerializer._load_stream (lines 1420-1455)
# - TransformWithStateInPandasSerializer (lines 1734-1738, 1864-1875)
{code}
This transformation could be extracted into {{ArrowBatchTransformer}} as a
higher-order function that returns a batch-level transformer, enabling
cleaner pipeline-style usage:
{code:python}
# Proposed usage
to_pandas = ArrowBatchTransformer.to_pandas(timezone=self._timezone)
yield from map(to_pandas, batches)
{code}
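A minimal sketch of what such a higher-order function might look like, assuming pyarrow and pandas are available. The class body, the nested helper names and the timezone handling below are hypothetical illustrations, not Spark's implementation: the returned closure captures {{timezone}} instead of reading serializer state, and tz-aware timestamps are simply converted to that timezone and made tz-naive. Spark's actual {{arrow_to_pandas}} performs more type-specific handling and also receives the column index, both of which this sketch omits.

{code:python}
from typing import Callable, List, Optional

import pandas as pd
import pyarrow as pa


class ArrowBatchTransformer:
    """Hypothetical sketch; not Spark's actual ArrowBatchTransformer."""

    @staticmethod
    def to_pandas(
        timezone: Optional[str] = None,
    ) -> Callable[[pa.RecordBatch], List[pd.Series]]:
        """Return a function converting one RecordBatch into a list of pandas Series.

        All configuration is captured in the closure, so the returned function
        needs no serializer instance state.
        """

        def convert_column(column: pa.Array) -> pd.Series:
            series = column.to_pandas()
            # Simplified (assumed) timestamp handling: convert tz-aware values
            # to the given timezone, then drop the tz information.
            if timezone is not None and isinstance(series.dtype, pd.DatetimeTZDtype):
                series = series.dt.tz_convert(timezone).dt.tz_localize(None)
            return series

        def transform(batch: pa.RecordBatch) -> List[pd.Series]:
            # One pandas Series per Arrow column, in column order.
            return [convert_column(batch.column(i)) for i in range(batch.num_columns)]

        return transform
{code}

Because the returned function depends only on the batch it is given, it can be applied with {{map}} inside a serializer's {{load_stream}} and unit-tested directly on a hand-built {{pa.RecordBatch}}, without constructing a serializer.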
This continues Phase 1 of SPARK-55159 by extracting transformation logic from
serializers into reusable, testable components.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)