[ 
https://issues.apache.org/jira/browse/SPARK-55459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-55459:
-----------------------------------
    Labels: pull-request-available  (was: )

> Fix 3x performance regression in applyInPandas for large groups
> ---------------------------------------------------------------
>
>                 Key: SPARK-55459
>                 URL: https://issues.apache.org/jira/browse/SPARK-55459
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.2.0
>            Reporter: Yicong Huang
>            Priority: Major
>              Labels: pull-request-available
>
>   After SPARK-54316 consolidated GroupPandasIterUDFSerializer with
> GroupPandasUDFSerializer, applyInPandas performance degraded significantly
> for workloads with large groups and few columns. Benchmarks show a 73%
> regression (4.38s -> 7.57s) in production and a 3x slowdown in unit tests.
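>   A minimal sketch of a workload shape that hits this path (the sizes, column
> names, and identity function below are illustrative, not from the original
> report):
>   {code:python}
>   import pandas as pd
>   from pyspark.sql import SparkSession
>
>   spark = SparkSession.builder.master("local[4]").getOrCreate()
>
>   # Few groups with millions of rows each and only a few columns
>   # (the shape that regresses).
>   df = spark.range(5_000_000).selectExpr(
>       "id % 4 AS key", "CAST(id AS DOUBLE) AS v1", "CAST(id AS DOUBLE) AS v2"
>   )
>
>   def identity(pdf: pd.DataFrame) -> pd.DataFrame:
>       return pdf
>
>   # groupBy().applyInPandas() runs through wrap_grouped_map_pandas_udf
>   # on the Python worker.
>   df.groupBy("key").applyInPandas(identity, schema=df.schema).count()
>   {code}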
>   The root cause is the double pandas concat pattern introduced in 
> wrap_grouped_map_pandas_udf:
>   {code:python}
>   # Current implementation (SLOW)
>   # value_batches yields, for each Arrow batch, a list of pandas Series
>   # (one Series per column).
>   value_dataframes = []
>   for value_series in value_batches:
>       value_dataframes.append(pd.concat(value_series, axis=1))  # First concat
>   # Second concat - expensive for large groups!
>   value_df = pd.concat(value_dataframes, axis=0)
>   {code}
>   For large groups (millions of rows), the second concat across hundreds of 
> DataFrames becomes extremely expensive. The fix is to collect Series by 
> column and concat once per column instead:
>   {code:python}
>   # Optimized implementation (FAST)
>   all_series_by_col = {}
>   for value_series in value_batches:
>       for col_idx, series in enumerate(value_series):
>           all_series_by_col.setdefault(col_idx, []).append(series)
>   # Single concat per column
>   columns = {}
>   for col_idx, series_list in all_series_by_col.items():
>       # Series.name always exists but may be None; fall back to a
>       # positional name in that case.
>       name = series_list[0].name
>       col_name = name if name is not None else f"col{col_idx}"
>       columns[col_name] = pd.concat(series_list, ignore_index=True)
>   value_df = pd.DataFrame(columns)
>   {code}
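>   The per-column approach builds each output column with a single Series
> concat and assembles the final DataFrame in one constructor call, instead of
> aligning indexes and columns across hundreds of intermediate DataFrames.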
>   Benchmark results (5M rows, 3 columns):
>   - Before: 0.226s
>   - After: 0.075s
>   - Improvement: 3x faster, 25% less memory
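>   These numbers can be approximated outside Spark with a pure-pandas
> micro-benchmark of the two strategies (the batch count and sizes below are
> illustrative, not the exact harness used):
>   {code:python}
>   import time
>
>   import numpy as np
>   import pandas as pd
>
>   # Simulate ~5M rows arriving as 500 batches of 3 column Series each,
>   # roughly what the serializer sees for one large group.
>   n_batches, batch_rows, n_cols = 500, 10_000, 3
>   value_batches = [
>       [pd.Series(np.random.rand(batch_rows), name=f"c{i}") for i in range(n_cols)]
>       for _ in range(n_batches)
>   ]
>
>   start = time.perf_counter()
>   # Double concat: one DataFrame per batch, then concat across batches.
>   slow = pd.concat([pd.concat(s, axis=1) for s in value_batches], axis=0)
>   print(f"double concat:     {time.perf_counter() - start:.3f}s")
>
>   start = time.perf_counter()
>   # Single concat per column, then one DataFrame constructor call.
>   fast = pd.DataFrame({
>       f"c{i}": pd.concat([b[i] for b in value_batches], ignore_index=True)
>       for i in range(n_cols)
>   })
>   print(f"per-column concat: {time.perf_counter() - start:.3f}s")
>
>   assert slow.reset_index(drop=True).equals(fast)
>   {code}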
>   The issue became more visible after the pandas 2.2.0 upgrade (SPARK-50711),
> where concat(axis=0) performance characteristics changed for large datasets.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]