Yicong Huang created SPARK-55175:
------------------------------------

             Summary: Extract arrow_to_pandas transformer from ArrowStreamPandasSerializer
                 Key: SPARK-55175
                 URL: https://issues.apache.org/jira/browse/SPARK-55175
             Project: Spark
          Issue Type: Sub-task
          Components: PySpark
    Affects Versions: 4.2.0
            Reporter: Yicong Huang


{{ArrowStreamPandasSerializer.arrow_to_pandas}} and its override in 
{{ArrowStreamPandasUDFSerializer}} mix conversion logic with serializer 
instance state. The same per-column conversion loop is repeated across many 
serializers:

{code:python}
# Current usage in ArrowStreamPandasSerializer.load_stream (line 470-477)
for batch in batches:
    pandas_batches = [
        self.arrow_to_pandas(batch.column(i), i)
        for i in range(batch.num_columns)
    ]
    yield pandas_batches

# Similar patterns appear in:
# - GroupArrowUDFSerializer.load_stream (line 1109)
# - ArrowStreamPandasUDFSerializer.load_stream (line 1157-1160)  
# - CogroupArrowUDFSerializer.load_stream (line 1247-1255)
# - ApplyInPandasWithStateSerializer._load_stream (line 1420-1455)
# - TransformWithStateInPandasSerializer (line 1734-1738, 1864-1875)
{code}

This transformation could be extracted into {{ArrowBatchTransformer}} as a 
higher-order function that returns a batch-level transformer.

This enables cleaner pipeline-style usage:

{code:python}
# Proposed usage
to_pandas = ArrowBatchTransformer.to_pandas(timezone=self._timezone)
yield from map(to_pandas, batches)
{code}
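
To make the higher-order-function idea concrete, here is a minimal sketch. It is not the actual {{ArrowBatchTransformer}} API: {{make_to_pandas}}, {{convert_column}}, {{load_stream}} as a free function, and {{StubBatch}} are hypothetical names chosen for illustration. The sketch assumes only the batch interface already used above ({{num_columns}} and {{column(i)}}).

```python
from typing import Any, Callable, Iterable, Iterator, List

# Hypothetical names throughout: this is an illustration of the pattern,
# not the real ArrowBatchTransformer implementation.
ColumnConverter = Callable[[Any, int], Any]
BatchTransformer = Callable[[Any], List[Any]]


def make_to_pandas(convert_column: ColumnConverter) -> BatchTransformer:
    """Return a function that converts one batch into a list of columns.

    All configuration lives in the closure, so the returned transformer
    does not depend on any serializer instance.
    """

    def transform(batch: Any) -> List[Any]:
        # Assumes a RecordBatch-like object exposing num_columns and column(i).
        return [
            convert_column(batch.column(i), i)
            for i in range(batch.num_columns)
        ]

    return transform


def load_stream(
    batches: Iterable[Any], to_pandas: BatchTransformer
) -> Iterator[List[Any]]:
    # Pipeline-style usage, mirroring the proposed snippet.
    yield from map(to_pandas, batches)


class StubBatch:
    """Minimal stand-in for an Arrow record batch, for demonstration only."""

    def __init__(self, columns: List[List[Any]]) -> None:
        self._columns = columns
        self.num_columns = len(columns)

    def column(self, i: int) -> List[Any]:
        return self._columns[i]


# Tag each column with its index so the per-column call is visible.
to_pandas = make_to_pandas(lambda col, i: (i, list(col)))
result = list(load_stream([StubBatch([[1, 2], ["a", "b"]])], to_pandas))
```

With real Arrow data, the converter passed in would presumably wrap a per-column conversion such as {{pyarrow.Array.to_pandas()}}, with options like the timezone captured when the transformer is built. Because the transformer is a plain function, it can be unit-tested without constructing a serializer.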

This continues Phase 1 of SPARK-55159 by extracting transformation logic from 
serializers into reusable, testable components.



