James Xu created SPARK-57688:
--------------------------------

             Summary: Add config to allow bypassing pre-shuffle partial aggregation
                 Key: SPARK-57688
                 URL: https://issues.apache.org/jira/browse/SPARK-57688
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer
    Affects Versions: 4.3.0
            Reporter: James Xu


Problem:

The standard two-phase aggregation plan (Partial → shuffle → Final) assumes 
that pre-shuffle partial aggregation meaningfully reduces data volume. This 
assumption breaks down in two scenarios.

Scenario 1: High group cardinality.
When group cardinality is high relative to partition size, nearly every input
row in a partition maps to a distinct key, so the partial aggregation produces
about one output row per input row and adds CPU and memory overhead with no
shuffle benefit.

SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
FROM orders
GROUP BY user_id   -- high-cardinality key: millions of distinct users

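On a table with 500M rows and 200M distinct user_id values, the pre-shuffle
HashAggregateExec in Partial mode churns through the full dataset and spills
when its hash map overflows. When rows for the same user_id are spread across
input partitions (as they are unless the table is clustered by user_id), each
partition sees few repeated keys, so the partial phase emits close to one row
per input row into the shuffle, far more than the ~200M final groups. The
partial phase costs wall-clock time and memory without meaningfully reducing
shuffle write volume.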
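To make the reduction ratio concrete, here is a toy simulation in plain Python
(not Spark code). partial_aggregate is a hypothetical stand-in for a
Partial-mode HashAggregateExec computing SUM within one partition, and
reduction_ratio divides the rows it emits by the rows it reads; the names and
data shapes are illustrative assumptions. The "spread" case mirrors the
user_id example in miniature: each key occurs several times overall but only
once per partition, so the final result has few groups while the partial phase
reduces nothing.

```python
from collections import defaultdict

def partial_aggregate(partition):
    """Toy stand-in for a Partial-mode HashAggregateExec: SUM(amount) per key within one partition."""
    buffers = defaultdict(int)
    for key, amount in partition:
        buffers[key] += amount
    return list(buffers.items())

def reduction_ratio(partitions):
    """Rows emitted by the partial phase divided by rows it consumed (1.0 means no reduction)."""
    rows_in = sum(len(p) for p in partitions)
    rows_out = sum(len(partial_aggregate(p)) for p in partitions)
    return rows_out / rows_in

# High cardinality: every key is unique across the whole dataset.
high_card = [[(p * 1000 + i, 1) for i in range(1000)] for p in range(4)]
# Spread: 1,000 distinct keys overall, each appearing once in each of 4 partitions.
spread = [[(i, 1) for i in range(1000)] for _ in range(4)]
# Low cardinality: 10 distinct keys repeated many times inside every partition.
low_card = [[(i % 10, 1) for i in range(1000)] for _ in range(4)]

print(reduction_ratio(high_card))  # 1.0
print(reduction_ratio(spread))     # 1.0, although only 1,000 groups exist overall
print(reduction_ratio(low_card))   # 0.01
```

In a real job, the analogous figures can be read from the row-count metrics of
the partial HashAggregate node and its child in the SQL tab of the Spark UI.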

Scenario 2: Skewed input data.
Even when partial aggregation reduces data volume on average, skewed input
partitions can make it harmful. If one input partition holds a disproportionate
share of the rows, the partial HashAggregateExec on that partition must process
all of them and, when they span many distinct keys, hold a large hash map in
memory, triggering spills. That partition becomes the straggler that dominates
wall-clock time, which can be worse than shuffling first and aggregating rows
that have already been hash-partitioned by grouping key across reducers.

SELECT country_code, SUM(revenue)
FROM orders
GROUP BY country_code   -- a few dominant countries hold 80% of rows
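The straggler effect can be illustrated with another toy model in plain
Python, under two assumptions: one input partition is much larger than the
rest, and its rows span many distinct keys. rows_per_reducer is a hypothetical
helper, and key % num_reducers is a simplified stand-in for Spark's hash
partitioning (which hashes the grouping key with Murmur3). The sketch compares
the rows handled by the largest partial-phase task with the rows handled by the
busiest reducer after the shuffle.

```python
from collections import Counter

def rows_per_reducer(partitions, num_reducers):
    """Count the rows each reducer receives after hash-partitioning by key.

    key % num_reducers is a simplified stand-in for Spark's Murmur3-based
    hash partitioning; it is an assumption made for illustration only.
    """
    counts = Counter()
    for partition in partitions:
        for key, _amount in partition:
            counts[key % num_reducers] += 1
    return [counts[r] for r in range(num_reducers)]

# One oversized input partition (70,000 rows) plus three small ones (10,000 rows each).
# All 100,000 keys are distinct, so hashing spreads them evenly.
skewed = [[(k, 1) for k in range(70_000)]] + [
    [(k, 1) for k in range(70_000 + p * 10_000, 70_000 + (p + 1) * 10_000)]
    for p in range(3)
]

partial_phase_max = max(len(p) for p in skewed)       # rows in the straggler partial task
post_shuffle_max = max(rows_per_reducer(skewed, 4))   # rows on the busiest reducer
print(partial_phase_max, post_shuffle_max)  # 70000 25000
```

This toy model assumes the grouping keys themselves are not heavily skewed.
When a few key values dominate, hash partitioning routes all rows for each such
value to one reducer, so the post-shuffle side can become the bottleneck
instead.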

---
Solution:

Add a new boolean SQL config spark.sql.execution.bypassPartialAggregation 
(default false). When set to true, the pre-shuffle partial aggregation phase is 
skipped and a single Complete-mode aggregation runs after the shuffle instead. 
Users who have identified either of the above patterns in their workload can 
opt in to bypass the partial phase.

The config has no effect on queries containing DISTINCT aggregate functions or 
session_window grouping keys, where the partial aggregation phases are required 
for correctness and are always applied.
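The proposed planning rule can be summarized as a small decision function.
This is a hypothetical Python model, not Spark's planner (which is written in
Scala): AggQuery and plan_agg_modes are illustrative names, the config value is
assumed to arrive as a string like other SQL confs, and plans for DISTINCT
aggregates, which in real Spark involve more than two phases, are collapsed
into the default Partial/Final shape.

```python
from dataclasses import dataclass

BYPASS_CONF = "spark.sql.execution.bypassPartialAggregation"  # proposed config, default false

@dataclass
class AggQuery:
    """Hypothetical summary of the properties that decide whether bypass is allowed."""
    has_distinct_agg: bool = False
    has_session_window: bool = False

def plan_agg_modes(query: AggQuery, conf: dict) -> list:
    """Return the aggregation stages in execution order; 'Shuffle' marks the exchange."""
    bypass = conf.get(BYPASS_CONF, "false").strip().lower() == "true"
    # DISTINCT aggregates and session_window keys always keep the partial phases.
    if bypass and not (query.has_distinct_agg or query.has_session_window):
        return ["Shuffle", "Complete"]
    # Default two-phase shape (real DISTINCT plans add further phases, not modeled here).
    return ["Partial", "Shuffle", "Final"]

enabled = {BYPASS_CONF: "true"}
print(plan_agg_modes(AggQuery(), enabled))                       # ['Shuffle', 'Complete']
print(plan_agg_modes(AggQuery(), {}))                            # ['Partial', 'Shuffle', 'Final']
print(plan_agg_modes(AggQuery(has_distinct_agg=True), enabled))  # ['Partial', 'Shuffle', 'Final']
```

In a SQL session the flag would presumably be toggled like any other SQL
config, e.g. SET spark.sql.execution.bypassPartialAggregation=true, using the
config name proposed here.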

---
Expected Impact:

For aggregations over high-cardinality keys where the pre-shuffle reduction
ratio (rows emitted by the partial phase divided by rows it reads) is near 1.0:
- Eliminates one full pass over the partition data by the partial
HashAggregateExec
- Eliminates hash map construction and potential spills in the partial phase
- The wall-clock improvement is expected to be largest in spill-heavy cases

For aggregations over skewed input:
- Moves aggregation to post-shuffle, where rows are hash-partitioned by grouping
key instead of concentrated in one oversized input partition
- Eliminates the partial-phase bottleneck on the skewed partition

The config is opt-in (false by default) so existing query behavior is unchanged.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
