This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 238efa134ceb [SPARK-55459][PYTHON] Fix 3x performance regression in 
applyInPandas for large groups
238efa134ceb is described below

commit 238efa134cebccc143e3558087225653ad7934e0
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Feb 10 10:45:26 2026 +0800

    [SPARK-55459][PYTHON] Fix 3x performance regression in applyInPandas for 
large groups
    
    ### What changes were proposed in this pull request?
    
    This PR optimizes the `wrap_grouped_map_pandas_udf` function in 
`python/pyspark/worker.py` to fix a 3x performance regression in 
`applyInPandas` for workloads with large groups and few columns.
    
    The optimization changes the double concat pattern (concat within batch + 
concat across batches) to a single concat per column approach, avoiding the 
expensive `pd.concat(axis=0)` across hundreds of intermediate DataFrames.
    
    ### Why are the changes needed?
    
    After SPARK-54316 consolidated `GroupPandasIterUDFSerializer` with 
`GroupPandasUDFSerializer`, the double concat pattern was introduced:
    1. First concat: `pd.concat(value_series, axis=1)` for each batch
    2. Second concat: `pd.concat(value_dataframes, axis=0)` across all batches
    
    For large groups (millions of rows), the second concat becomes extremely 
expensive, causing 73% performance regression (4.38s → 7.57s) in production 
workloads and 3x slowdown in benchmarks.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a performance optimization with no API or behavior changes.
    
    ### How was this patch tested?
    
    Unit tests with synthetic data (5M rows, 3 columns, 500 batches):
    - Before: 0.226s
    - After: 0.075s
    - Improvement: 3x faster, 25% less memory
    
    Existing PySpark tests pass without modification, confirming functional 
correctness.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54239 from Yicong-Huang/SPARK-55459/fix/applyInPandas-performance.
    
    Lead-authored-by: Yicong Huang 
<[email protected]>
    Co-authored-by: Yicong-Huang 
<[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/worker.py | 29 +++++++++++++++++++++++------
 1 file changed, 23 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 59d4434ab815..ef2e830216ca 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -812,13 +812,30 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, 
runner_conf):
         import pandas as pd
 
         # Convert value_batches (Iterator[list[pd.Series]]) to a single 
DataFrame
-        # Each value_series is a list of Series (one per column) for one batch
-        # Concatenate Series within each batch (axis=1), then concatenate 
batches (axis=0)
-        value_dataframes = []
-        for value_series in value_batches:
-            value_dataframes.append(pd.concat(value_series, axis=1))
+        # Optimized: Collect all Series by column, then concat once per column
+        # This avoids the expensive pd.concat(axis=0) across many DataFrames
+        all_series_by_col = {}
 
-        value_df = pd.concat(value_dataframes, axis=0) if value_dataframes 
else pd.DataFrame()
+        for value_series in value_batches:
+            for col_idx, series in enumerate(value_series):
+                if col_idx not in all_series_by_col:
+                    all_series_by_col[col_idx] = []
+                all_series_by_col[col_idx].append(series)
+
+        # Concatenate each column separately (single concat per column)
+        if all_series_by_col:
+            columns = {}
+            for col_idx, series_list in all_series_by_col.items():
+                # Use the original series name if available
+                col_name = (
+                    series_list[0].name
+                    if hasattr(series_list[0], "name") and series_list[0].name
+                    else f"col{col_idx}"
+                )
+                columns[col_name] = pd.concat(series_list, ignore_index=True)
+            value_df = pd.DataFrame(columns)
+        else:
+            value_df = pd.DataFrame()
 
         if len(argspec.args) == 1:
             result = f(value_df)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to