[ https://issues.apache.org/jira/browse/SPARK-57688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Xu updated SPARK-57688:
-----------------------------
    Description: 
*Problem:*

The standard two-phase aggregation plan (Partial → shuffle → Final) assumes 
that pre-shuffle partial aggregation meaningfully reduces data volume. This 
assumption breaks down in two scenarios.

Scenario 1: High group cardinality.
When group cardinality is high relative to partition size, nearly every input 
row in a partition maps to a distinct key. The partial aggregation then produces 
roughly one output row per input row, adding CPU and memory overhead with little 
or no shuffle benefit.

SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
FROM orders
GROUP BY user_id   -- high-cardinality key: millions of distinct users

On a table with 500M rows and 200M distinct user_id values, the pre-shuffle 
HashAggregateExec in Partial mode churns through the full dataset, spills when 
the hash map overflows, and still emits ~200M rows into the shuffle. The 
partial phase wastes wall-clock time and memory without reducing shuffle write 
volume.
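The reduction ratio this scenario depends on can be illustrated with a small simulation. This is a minimal sketch under stated assumptions, not Spark code: `partial_aggregate`, `make_partitions` and `reduction_ratio` are hypothetical helpers, the data is synthetic (row i gets key i % num_keys and is placed round-robin into partitions), and a Partial-mode aggregate is modeled as emitting one (key, sum, count) buffer per distinct key per partition.

```python
from collections import defaultdict


def partial_aggregate(partition):
    """Model of a Partial-mode SUM/COUNT over one input partition.

    Emits one (key, sum, count) buffer row per distinct key in the
    partition, roughly what a pre-shuffle partial aggregate writes out.
    """
    buffers = defaultdict(lambda: [0, 0])  # key -> [running sum, row count]
    for key, amount in partition:
        buffers[key][0] += amount
        buffers[key][1] += 1
    return [(key, s, c) for key, (s, c) in buffers.items()]


def make_partitions(num_rows, num_keys, num_partitions):
    """Hypothetical synthetic input: row i has key i % num_keys and
    amount 1, assigned round-robin to partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for i in range(num_rows):
        partitions[i % num_partitions].append((i % num_keys, 1))
    return partitions


def reduction_ratio(partitions):
    """Rows emitted by the partial phase divided by rows it read."""
    rows_in = sum(len(p) for p in partitions)
    rows_out = sum(len(partial_aggregate(p)) for p in partitions)
    return rows_out / rows_in


if __name__ == "__main__":
    # High cardinality: every key is unique, so nothing collapses.
    print(reduction_ratio(make_partitions(10_000, 10_000, 4)))  # 1.0
    # Low cardinality: 10 keys, each partition collapses to 5 buffer rows.
    print(reduction_ratio(make_partitions(10_000, 10, 4)))      # 0.002
```

A ratio near 1.0 corresponds to the high-cardinality case above: the partial phase hashes every row and forwards almost all of them to the shuffle.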

Scenario 2: Skewed input data.
Even when partial aggregation can reduce data volume on average, skewed input 
partitions can make it harmful. If one partition contains a disproportionate 
share of rows for a small number of keys, the partial HashAggregateExec on that 
partition must hold a large hash map in memory, triggering spills. The skewed 
partition becomes the bottleneck and dominates wall-clock time — worse than if 
the data had been shuffled first and aggregated on already-partitioned, evenly 
distributed data.

SELECT country_code, SUM(revenue)
FROM orders
GROUP BY country_code   -- a few dominant countries hold 80% of rows
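The straggler effect in this scenario can be quantified by comparing the largest input partition with a typical one. This is a hedged sketch: `skew_report` is a hypothetical helper, and it assumes, for illustration only, that each Partial task's runtime grows with the rows it reads and that tasks run in parallel. Under that assumption the stage cannot finish before the task reading the largest partition does. The partition sizes in the example are made up so that one partition holds 80% of the rows, mirroring the query above.

```python
from statistics import median


def skew_report(partition_rows):
    """Summarize how uneven the partial-aggregation tasks' inputs are.

    Assumption (illustrative): task runtime grows with rows read, and
    tasks run in parallel, so the largest partition bounds stage time.
    """
    largest = max(partition_rows)
    typical = median(partition_rows)
    return {
        "largest_rows": largest,
        "median_rows": typical,
        "skew_factor": largest / typical,  # straggler vs. typical task
        "largest_share": largest / sum(partition_rows),
    }


if __name__ == "__main__":
    # Hypothetical sizes: the rows of a few dominant keys sit in one partition.
    report = skew_report([8_000_000, 500_000, 500_000, 500_000, 500_000])
    print(report["skew_factor"], report["largest_share"])  # 16.0 0.8
```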

----
*Solution:*

Add a new boolean SQL config spark.sql.execution.bypassPartialAggregation 
(default false). When set to true, the pre-shuffle partial aggregation phase is 
skipped and a single Complete-mode aggregation runs after the shuffle instead. 
Users who have identified either of the above patterns in their workload can 
opt in to bypass the partial phase.

The config has no effect on queries containing DISTINCT aggregate functions or 
session_window grouping keys, where the partial aggregation phases are required 
for correctness and are always applied.
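Once the proposed config exists, a user would presumably enable it per session with Spark SQL's `SET` command, e.g. `SET spark.sql.execution.bypassPartialAggregation=true;`. The planning decision described above can be modeled as a small function. This is a sketch of the proposal's stated behavior, not Spark's planner code: `AggregateQuery` and `plan_shape` are hypothetical names, and `None` stands for "the existing multi-phase plan is used unchanged".

```python
from dataclasses import dataclass
from typing import Optional, Tuple

TWO_PHASE = ("Partial", "Exchange", "Final")   # default plan shape
SINGLE_PHASE = ("Exchange", "Complete")        # shape with the bypass enabled


@dataclass(frozen=True)
class AggregateQuery:
    """Hypothetical summary of the query properties the proposal checks."""
    has_distinct_aggregate: bool = False
    has_session_window_key: bool = False


def plan_shape(query: AggregateQuery,
               bypass_partial_aggregation: bool) -> Optional[Tuple[str, ...]]:
    """Aggregate plan shape under the proposed config.

    Returns None when the existing multi-phase planning path must be used
    (DISTINCT aggregates or session_window keys); the flag is ignored then.
    """
    if query.has_distinct_aggregate or query.has_session_window_key:
        return None
    return SINGLE_PHASE if bypass_partial_aggregation else TWO_PHASE


if __name__ == "__main__":
    plain = AggregateQuery()
    print(plan_shape(plain, False))  # ('Partial', 'Exchange', 'Final')
    print(plan_shape(plain, True))   # ('Exchange', 'Complete')
    print(plan_shape(AggregateQuery(has_distinct_aggregate=True), True))  # None
```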

----
*Expected Impact:*

For aggregations over high-cardinality keys where the pre-shuffle reduction 
ratio is near 1.0:
 - Eliminates the partial HashAggregateExec pass over every input row of the 
partition
 - Eliminates hash map construction and potential spill in the partial phase
 - Wall-clock improvement is most significant in spill-heavy cases
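The trade-off behind these bullets can be made concrete with a crude row-count model. This is an illustrative sketch, not a Spark cost model. `aggregation_work` is a hypothetical helper that counts rows pushed through a hash aggregate and rows written to the shuffle. It ignores row width (raw rows and partial buffers differ), spills, and codegen.

```python
def aggregation_work(rows_in, partial_rows_out, bypass_partial):
    """Crude row counts for the two plan shapes.

    rows_in: rows read by the map-side tasks.
    partial_rows_out: rows the Partial phase would emit (rows_in * ratio).
    """
    if bypass_partial:
        # Raw rows are shuffled and aggregated once, in Complete mode.
        return {"hash_agg_rows": rows_in, "shuffle_rows": rows_in}
    # Partial hashes every input row; Final hashes the partial output again.
    return {"hash_agg_rows": rows_in + partial_rows_out,
            "shuffle_rows": partial_rows_out}


if __name__ == "__main__":
    n = 1_000_000
    for ratio in (1.0, 0.01):
        m = int(n * ratio)
        print(ratio, aggregation_work(n, m, False), aggregation_work(n, m, True))
```

At a ratio of 1.0, the two-phase plan hashes each row twice while shuffling just as many rows as the bypass plan. At a ratio of 0.01, the partial phase cuts shuffle rows by 100x, which is why the bypass only pays off for workloads like the ones described above and is kept opt-in.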

For aggregations over skewed input:
 - Moves aggregation to post-shuffle, where data is evenly distributed by 
grouping key
 - Eliminates the partial-phase bottleneck on the skewed partition

The config is opt-in (false by default) so existing query behavior is unchanged.


> Add config to allow bypassing pre-shuffle partial aggregation
> -------------------------------------------------------------
>
>                 Key: SPARK-57688
>                 URL: https://issues.apache.org/jira/browse/SPARK-57688
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer
>    Affects Versions: 4.3.0
>            Reporter: James Xu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
