This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 238efa134ceb [SPARK-55459][PYTHON] Fix 3x performance regression in applyInPandas for large groups
238efa134ceb is described below
commit 238efa134cebccc143e3558087225653ad7934e0
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Feb 10 10:45:26 2026 +0800
[SPARK-55459][PYTHON] Fix 3x performance regression in applyInPandas for large groups
### What changes were proposed in this pull request?
This PR optimizes the `wrap_grouped_map_pandas_udf` function in
`python/pyspark/worker.py` to fix a 3x performance regression in
`applyInPandas` for workloads with large groups and few columns.
The optimization replaces the double-concat pattern (concat within each batch, then concat across batches) with a single concat per column, avoiding the expensive `pd.concat(axis=0)` across hundreds of intermediate DataFrames.
### Why are the changes needed?
After SPARK-54316 consolidated `GroupPandasIterUDFSerializer` with
`GroupPandasUDFSerializer`, the double concat pattern was introduced:
1. First concat: `pd.concat(value_series, axis=1)` for each batch
2. Second concat: `pd.concat(value_dataframes, axis=0)` across all batches
For large groups (millions of rows), the second concat becomes extremely expensive, causing a 73% performance regression (4.38s → 7.57s) in production workloads and a 3x slowdown in benchmarks.
### Does this PR introduce _any_ user-facing change?
No. This is a performance optimization with no API or behavior changes.
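For context, a minimal `applyInPandas` example that exercises this code path (standard PySpark grouped-map API; the grouping column, data, and `subtract_mean` function are illustrative only):

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy grouped-map workload; the regression shows up with millions of rows per group.
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
)

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # Each group arrives as one pandas DataFrame assembled by
    # wrap_grouped_map_pandas_udf on the Python worker.
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupBy("id").applyInPandas(subtract_mean, schema="id long, v double").show()
```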
### How was this patch tested?
Unit tests with synthetic data (5M rows, 3 columns, 500 batches):
- Before: 0.226s
- After: 0.075s
- Improvement: 3x faster, 25% less memory
Existing PySpark tests pass without modification, confirming functional
correctness.
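For reference, a rough way to reproduce the shape of this benchmark outside Spark (pure pandas; timings are machine-dependent, and this is not the exact harness used for the numbers above):

```python
import time

import numpy as np
import pandas as pd

# 500 batches x 10,000 rows x 3 columns = 5M rows, matching the sizes above.
n_batches, rows_per_batch, n_cols = 500, 10_000, 3
batches = [
    [pd.Series(np.random.rand(rows_per_batch), name=f"c{i}") for i in range(n_cols)]
    for _ in range(n_batches)
]

start = time.perf_counter()
old = pd.concat([pd.concat(b, axis=1) for b in batches], axis=0)
print(f"double concat:     {time.perf_counter() - start:.3f}s")

start = time.perf_counter()
by_col = {}
for b in batches:
    for s in b:
        by_col.setdefault(s.name, []).append(s)
new = pd.DataFrame(
    {name: pd.concat(parts, ignore_index=True) for name, parts in by_col.items()}
)
print(f"per-column concat: {time.perf_counter() - start:.3f}s")
```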
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54239 from Yicong-Huang/SPARK-55459/fix/applyInPandas-performance.
Lead-authored-by: Yicong Huang <[email protected]>
Co-authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/worker.py | 29 +++++++++++++++++++++++------
1 file changed, 23 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 59d4434ab815..ef2e830216ca 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -812,13 +812,30 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
     import pandas as pd
 
     # Convert value_batches (Iterator[list[pd.Series]]) to a single DataFrame
-    # Each value_series is a list of Series (one per column) for one batch
-    # Concatenate Series within each batch (axis=1), then concatenate batches (axis=0)
-    value_dataframes = []
-    for value_series in value_batches:
-        value_dataframes.append(pd.concat(value_series, axis=1))
+    # Optimized: Collect all Series by column, then concat once per column
+    # This avoids the expensive pd.concat(axis=0) across many DataFrames
+    all_series_by_col = {}
 
-    value_df = pd.concat(value_dataframes, axis=0) if value_dataframes else pd.DataFrame()
+    for value_series in value_batches:
+        for col_idx, series in enumerate(value_series):
+            if col_idx not in all_series_by_col:
+                all_series_by_col[col_idx] = []
+            all_series_by_col[col_idx].append(series)
+
+    # Concatenate each column separately (single concat per column)
+    if all_series_by_col:
+        columns = {}
+        for col_idx, series_list in all_series_by_col.items():
+            # Use the original series name if available
+            col_name = (
+                series_list[0].name
+                if hasattr(series_list[0], "name") and series_list[0].name
+                else f"col{col_idx}"
+            )
+            columns[col_name] = pd.concat(series_list, ignore_index=True)
+        value_df = pd.DataFrame(columns)
+    else:
+        value_df = pd.DataFrame()
 
     if len(argspec.args) == 1:
         result = f(value_df)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]