Yicong Huang created SPARK-55459:
------------------------------------

             Summary: Fix 3x performance regression in applyInPandas for large groups
                 Key: SPARK-55459
                 URL: https://issues.apache.org/jira/browse/SPARK-55459
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 4.2.0
            Reporter: Yicong Huang



  Description:
  After SPARK-54316 consolidated GroupPandasIterUDFSerializer with
  GroupPandasUDFSerializer, applyInPandas performance degraded significantly for
  workloads with large groups and few columns. Benchmarks show a 73% regression
  (4.38s -> 7.57s) in a production workload and a 3x slowdown in unit tests.
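
  A minimal reproduction sketch of this shape (illustrative sizes and a
  hypothetical local session, not the exact production workload; the identity
  UDF keeps the measured cost in the serializer's concat path rather than in
  user code):

  {code:python}
  import pandas as pd
  from pyspark.sql import SparkSession
  from pyspark.sql import functions as F

  spark = SparkSession.builder.master("local[4]").getOrCreate()

  # Few groups, millions of rows each, few columns: the shape that regressed.
  df = spark.range(5_000_000).withColumn("key", F.col("id") % 4)

  def identity(pdf: pd.DataFrame) -> pd.DataFrame:
      # Trivial UDF: the time under test is spent rebuilding each group's
      # DataFrame from Arrow batches on the worker, not in user code.
      return pdf

  df.groupBy("key").applyInPandas(identity, schema=df.schema) \
      .write.format("noop").mode("overwrite").save()
  {code}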

  The root cause is the double pandas concat pattern introduced in 
wrap_grouped_map_pandas_udf:

  {code:python}
  # Current implementation (SLOW)
  value_dataframes = []
  for value_series in value_batches:
      value_dataframes.append(pd.concat(value_series, axis=1))  # First concat

  value_df = pd.concat(value_dataframes, axis=0)  # Second concat: expensive for large groups!
  {code}

  For large groups (millions of rows), the second concat across hundreds of
  DataFrames becomes extremely expensive: pd.concat copies its inputs, so every
  row is materialized once into the per-batch frames and then copied again in
  the cross-batch concat. The fix is to collect the Series by column and concat
  once per column instead:

  {code:python}
  # Optimized implementation (FAST)
  all_series_by_col = {}
  for value_series in value_batches:
      for col_idx, series in enumerate(value_series):
          if col_idx not in all_series_by_col:
              all_series_by_col[col_idx] = []
          all_series_by_col[col_idx].append(series)

  # Single concat per column
  columns = {}
  for col_idx, series_list in all_series_by_col.items():
      # Series.name always exists (it may be None), so test the value itself.
      col_name = series_list[0].name if series_list[0].name is not None else f"col{col_idx}"
      columns[col_name] = pd.concat(series_list, ignore_index=True)
  value_df = pd.DataFrame(columns)
  {code}

  Benchmark results (5M rows, 3 columns; see the sketch after this list):
  - Before: 0.226s
  - After: 0.075s
  - Improvement: 3x faster, 25% less memory
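
  The two assembly patterns can be timed outside Spark as well. A standalone
  sketch with synthetic batches (sizes mirror the 5M-row shape above but are
  illustrative; this is not the exact harness behind the numbers, and absolute
  times vary by machine and pandas version):

  {code:python}
  import time
  import numpy as np
  import pandas as pd

  # 500 batches x 3 columns x 10_000 rows = 5M rows.
  rng = np.random.default_rng(0)
  value_batches = [
      [pd.Series(rng.random(10_000), name=f"c{j}") for j in range(3)]
      for _ in range(500)
  ]

  t0 = time.perf_counter()
  slow = pd.concat([pd.concat(vs, axis=1) for vs in value_batches], axis=0)
  t1 = time.perf_counter()

  by_col = {}
  for vs in value_batches:
      for j, s in enumerate(vs):
          by_col.setdefault(j, []).append(s)
  fast = pd.DataFrame(
      {sl[0].name: pd.concat(sl, ignore_index=True) for sl in by_col.values()}
  )
  t2 = time.perf_counter()

  # Both paths assemble the same data; only the copy pattern differs.
  pd.testing.assert_frame_equal(slow.reset_index(drop=True), fast)
  print(f"double concat:     {t1 - t0:.3f}s")
  print(f"per-column concat: {t2 - t1:.3f}s")
  {code}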

  The issue became more visible after the pandas 2.2.0 upgrade (SPARK-50711),
  where the performance characteristics of concat(axis=0) changed for large
  datasets.


