[
https://issues.apache.org/jira/browse/SPARK-55459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ruifeng Zheng reassigned SPARK-55459:
-------------------------------------
Assignee: Yicong Huang
> Fix 3x performance regression in applyInPandas for large groups
> ---------------------------------------------------------------
>
> Key: SPARK-55459
> URL: https://issues.apache.org/jira/browse/SPARK-55459
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 4.2.0
> Reporter: Yicong Huang
> Assignee: Yicong Huang
> Priority: Major
> Labels: pull-request-available
>
> Description:
> After SPARK-54316 consolidated GroupPandasIterUDFSerializer with
> GroupPandasUDFSerializer, applyInPandas performance degraded significantly
> for workloads with large groups and few columns. Benchmarks show a 73%
> regression (4.38s -> 7.57s) in production and a 3x slowdown in unit tests.
> The root cause is the double pandas concat pattern introduced in
> wrap_grouped_map_pandas_udf:
> {code:python}
> # Current implementation (SLOW)
> value_dataframes = []
> for value_series in value_batches:
>     value_dataframes.append(pd.concat(value_series, axis=1))  # First concat
> value_df = pd.concat(value_dataframes, axis=0)  # Second concat - expensive for large groups!
> {code}
> For large groups (millions of rows), the second concat across hundreds of
> DataFrames becomes extremely expensive. The fix is to collect Series by
> column and concat once per column instead:
> {code:python}
> # Optimized implementation (FAST)
> all_series_by_col = {}
> for value_series in value_batches:
>     for col_idx, series in enumerate(value_series):
>         if col_idx not in all_series_by_col:
>             all_series_by_col[col_idx] = []
>         all_series_by_col[col_idx].append(series)
>
> # Single concat per column
> columns = {}
> for col_idx, series_list in all_series_by_col.items():
>     col_name = series_list[0].name if hasattr(series_list[0], 'name') else f"col{col_idx}"
>     columns[col_name] = pd.concat(series_list, ignore_index=True)
> value_df = pd.DataFrame(columns)
> {code}
> Benchmark results (5M rows, 3 columns):
> - Before: 0.226s
> - After: 0.075s
> - Improvement: 3x faster, 25% less memory
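> A minimal standalone micro-benchmark along these lines reproduces the gap
> (a hypothetical harness, not the actual Spark benchmark; NUM_BATCHES,
> ROWS_PER_BATCH, and NUM_COLS are illustrative values chosen to match the
> 5M-row, 3-column shape above):
> {code:python}
> import time
> import numpy as np
> import pandas as pd
>
> # Illustrative sizes (assumption): 500 batches x 10k rows = 5M rows, 3 columns.
> NUM_BATCHES = 500
> ROWS_PER_BATCH = 10_000
> NUM_COLS = 3
>
> # Simulate value_batches: a list of per-column Series for each batch.
> value_batches = [
>     [pd.Series(np.random.rand(ROWS_PER_BATCH), name=f"col{i}") for i in range(NUM_COLS)]
>     for _ in range(NUM_BATCHES)
> ]
>
> # Double-concat pattern (the slow path above).
> start = time.perf_counter()
> slow_df = pd.concat([pd.concat(vs, axis=1) for vs in value_batches], axis=0)
> print(f"double concat:     {time.perf_counter() - start:.3f}s")
>
> # Per-column concat pattern (the fast path above).
> start = time.perf_counter()
> fast_df = pd.DataFrame({
>     f"col{i}": pd.concat([vs[i] for vs in value_batches], ignore_index=True)
>     for i in range(NUM_COLS)
> })
> print(f"per-column concat: {time.perf_counter() - start:.3f}s")
> {code}
> The per-column pattern also avoids materializing the intermediate per-batch
> DataFrames, which is consistent with the lower memory usage reported above.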
> The issue became more visible after the pandas 2.2.0 upgrade (SPARK-50711),
> where the performance characteristics of concat(axis=0) changed for large
> datasets.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]