[ https://issues.apache.org/jira/browse/SPARK-54657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18062579#comment-18062579 ]
Yicong Huang commented on SPARK-54657:
--------------------------------------
We’re actively working on refactoring serializers, but we’ve decided to do it
surgically and incrementally to avoid introducing breaking changes or
regressions. The plan is to reduce the number of serializers by extracting
transformers and moving data-transformation logic out of serializers.
Closing SPARK-54657 in favor of the following umbrella tickets tracking the
work end-to-end:
• SPARK-55388: https://issues.apache.org/jira/browse/SPARK-55388
• SPARK-55384: https://issues.apache.org/jira/browse/SPARK-55384
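
As a rough illustration of the direction described above (not the actual design in the umbrella tickets), the split between a serializer and its transformers could look like the sketch below. Every name in it (FramedSerializer, compose, decode_utf8, upper) is hypothetical, and plain byte strings stand in for Arrow batches so the example stays self-contained.

```python
import io
import struct
from typing import BinaryIO, Callable, Iterable, Iterator

# A transformer maps one iterator to another; it knows nothing about the wire format.
Transformer = Callable[[Iterator], Iterator]


def compose(*steps: Transformer) -> Transformer:
    """Chain transformers left to right into a single transformer."""
    def run(items: Iterator) -> Iterator:
        for step in steps:
            items = step(items)
        return items
    return run


class FramedSerializer:
    """Hypothetical serializer that only frames bytes (4-byte length + payload)."""

    def dump_stream(self, payloads: Iterable[bytes], stream: BinaryIO) -> None:
        for payload in payloads:
            stream.write(struct.pack("!i", len(payload)))
            stream.write(payload)

    def load_stream(self, stream: BinaryIO) -> Iterator[bytes]:
        while True:
            header = stream.read(4)
            if len(header) < 4:  # end of input
                return
            (length,) = struct.unpack("!i", header)
            yield stream.read(length)


# Data-transformation logic lives outside the serializer (illustrative only).
def decode_utf8(items: Iterator[bytes]) -> Iterator[str]:
    return (b.decode("utf-8") for b in items)


def upper(items: Iterator[str]) -> Iterator[str]:
    return (s.upper() for s in items)


buf = io.BytesIO()
serializer = FramedSerializer()
serializer.dump_stream([b"a", b"bc"], buf)
buf.seek(0)
result = list(compose(decode_utf8, upper)(serializer.load_stream(buf)))
```

With this shape, one serializer can serve several UDF flavors, each supplying its own transformer chain instead of its own serializer subclass.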
> Refactor pyspark.sql.pandas.serializers for improved maintainability
> --------------------------------------------------------------------
>
> Key: SPARK-54657
> URL: https://issues.apache.org/jira/browse/SPARK-54657
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 4.2.0
> Reporter: Yicong Huang
> Priority: Major
> Labels: pull-request-available
>
> The {{serializers.py}} file has grown to ~2200 lines with 25+ serializer
> classes. Many share duplicated patterns that could be consolidated.
> The main issues:
> 1. *Duplicated load_stream patterns* - The "dataframes_in_group" reading loop
> is repeated in 6+ classes:
> {code:python}
> # This pattern appears in GroupArrowUDFSerializer,
> # ArrowStreamAggArrowUDFSerializer, ArrowStreamAggPandasUDFSerializer,
> # GroupPandasUDFSerializer, CogroupArrowUDFSerializer, etc.
> dataframes_in_group = None
> while dataframes_in_group is None or dataframes_in_group > 0:
>     dataframes_in_group = read_int(stream)
>     if dataframes_in_group == 1:
>         # process batches...
>     elif dataframes_in_group != 0:
>         raise PySparkValueError(...)
> {code}
> 2. *Duplicated dump_stream patterns* - The START_ARROW_STREAM writing appears
> in 4+ classes:
> {code:python}
> # Repeated in ArrowStreamUDFSerializer, ArrowStreamPandasUDFSerializer,
> # ArrowStreamArrowUDFSerializer, ApplyInPandasWithStateSerializer, etc.
> should_write_start_length = True
> for batch in iterator:
>     if should_write_start_length:
>         write_int(SpecialLengths.START_ARROW_STREAM, stream)
>         should_write_start_length = False
>     yield batch
> {code}
> 3. *Cogroup and single group handling are separate* -
> {{GroupArrowUDFSerializer}} and {{CogroupArrowUDFSerializer}} have nearly
> identical logic except one reads 1 dataframe per group, the other reads 2.
> 4. *File is too large* to navigate easily.
> Proposed refactoring:
> - Extract common patterns into mixins ({{GroupedLoadStreamMixin}},
> {{StartArrowStreamDumpMixin}})
> - Unify cogroup/single group handling logic
> - Split into submodules
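
The two duplicated patterns in the description can each be factored into a small helper, with the single-group and cogroup cases differing only in the expected dataframe count. The following is a minimal sketch under stated assumptions, not the refactoring that was actually merged: iter_groups and with_start_marker are hypothetical names, read_int and write_int are local stand-ins for the PySpark helpers, START_ARROW_STREAM is a local stand-in for SpecialLengths.START_ARROW_STREAM, ValueError replaces PySparkValueError, and a "dataframe" is modeled by whatever the read_dataframe callback returns.

```python
import io
import struct
from typing import Any, BinaryIO, Callable, Iterable, Iterator, Tuple

START_ARROW_STREAM = -6  # stand-in for SpecialLengths.START_ARROW_STREAM


def read_int(stream: BinaryIO) -> int:
    """Read one 4-byte big-endian signed int (stand-in for PySpark's read_int)."""
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError
    return struct.unpack("!i", data)[0]


def write_int(value: int, stream: BinaryIO) -> None:
    """Write one 4-byte big-endian signed int (stand-in for PySpark's write_int)."""
    stream.write(struct.pack("!i", value))


def iter_groups(
    stream: BinaryIO,
    read_dataframe: Callable[[BinaryIO], Any],
    expected: int,
) -> Iterator[Tuple[Any, ...]]:
    """Yield one tuple of `expected` dataframes per group until a 0 marker.

    expected=1 covers the single-group case, expected=2 the cogroup case.
    """
    while True:
        count = read_int(stream)
        if count == 0:  # 0 marks the end of all groups
            return
        if count != expected:
            raise ValueError(f"expected {expected} dataframes in group, got {count}")
        yield tuple(read_dataframe(stream) for _ in range(count))


def with_start_marker(batches: Iterable[Any], stream: BinaryIO) -> Iterator[Any]:
    """Pass batches through, writing START_ARROW_STREAM once before the first one."""
    first = True
    for batch in batches:
        if first:
            write_int(START_ARROW_STREAM, stream)
            first = False
        yield batch
```

Because the marker is written lazily inside the loop, an empty iterator writes nothing, which matches the behavior of the should_write_start_length flag in the snippet above.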