[ 
https://issues.apache.org/jira/browse/SPARK-57688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-57688:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add config to allow bypassing pre-shuffle partial aggregation
> -------------------------------------------------------------
>
>                 Key: SPARK-57688
>                 URL: https://issues.apache.org/jira/browse/SPARK-57688
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer
>    Affects Versions: 4.3.0
>            Reporter: James Xu
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Problem:
> The standard two-phase aggregation plan (Partial → shuffle → Final) assumes 
> that pre-shuffle partial aggregation meaningfully reduces data volume. This 
> assumption breaks down in two scenarios.
> h3. Scenario 1: High group cardinality.
> When group cardinality is high relative to partition size, nearly every 
> input row in a partition maps to a distinct key, so the partial aggregation 
> emits close to one output row per input row and adds CPU and memory overhead 
> with little or no shuffle benefit.
> {code:sql}
> SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
> FROM orders
> GROUP BY user_id   -- high-cardinality key: millions of distinct users{code}
> On a table with 500M rows and 200M distinct user_id values, the pre-shuffle 
> HashAggregateExec in Partial mode churns through the full dataset, spills 
> when the hash map overflows, and still emits at least ~200M rows into the 
> shuffle (one row per distinct key per partition, so closer to 500M when each 
> user's rows are scattered across partitions). The partial phase costs 
> wall-clock time and memory without meaningfully reducing shuffle write 
> volume.
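> To check whether a workload matches this pattern, the partial-phase 
> reduction ratio can be estimated per input partition. The query below is a 
> rough sketch, not part of the proposed change: it assumes the aggregation 
> reads orders directly, so that scan partitions approximate the partial 
> aggregation's input partitions, and the aliases (pid, rows_in, rows_out, 
> reduction_ratio) are illustrative names.
> {code:sql}
> -- rows_out approximates what a partial aggregate would emit per partition
> -- (one row per distinct key); approx_count_distinct keeps the check cheap.
> SELECT SUM(rows_out) / SUM(rows_in) AS reduction_ratio
> FROM (
>   SELECT pid,
>          COUNT(*) AS rows_in,
>          approx_count_distinct(user_id) AS rows_out
>   FROM (SELECT spark_partition_id() AS pid, user_id FROM orders) tagged
>   GROUP BY pid
> ) per_partition{code}
> A ratio close to 1.0 suggests the partial phase would pass through almost 
> as many rows as it reads.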
> h3. Scenario 2: Skewed input data.
> Even when partial aggregation can reduce data volume on average, skewed input 
> partitions can make it harmful. If one partition contains a disproportionate 
> share of rows for a small number of keys, the partial HashAggregateExec on 
> that partition must hold a large hash map in memory, triggering spills. The 
> skewed partition becomes the bottleneck and dominates wall-clock time, which 
> is worse than shuffling the data first and aggregating it once it is 
> partitioned and evenly distributed by grouping key.
> {code:sql}
> SELECT country_code, SUM(revenue)
> FROM orders
> GROUP BY country_code   -- a few dominant countries hold 80% of rows{code}
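> One way to spot this pattern before opting in is to look at how rows are 
> spread across input partitions. This is an illustrative sketch only, 
> assuming the scan partitions of orders are the ones the partial aggregation 
> would see; pid, rows_in and distinct_keys are made-up aliases.
> {code:sql}
> -- Largest input partitions and how many grouping keys each one holds
> SELECT pid,
>        COUNT(*) AS rows_in,
>        approx_count_distinct(country_code) AS distinct_keys
> FROM (SELECT spark_partition_id() AS pid, country_code FROM orders) tagged
> GROUP BY pid
> ORDER BY rows_in DESC
> LIMIT 10{code}
> A few partitions with far more rows than the rest indicate the kind of 
> input skew described above.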
> ----
> h2. Solution:
> Add a new boolean SQL config spark.sql.execution.bypassPartialAggregation 
> (default false). When set to true, the pre-shuffle partial aggregation phase 
> is skipped and a single Complete-mode aggregation runs after the shuffle 
> instead. Users who have identified either of the above patterns in their 
> workload can opt in to bypass the partial phase.
> The config has no effect on queries containing DISTINCT aggregate functions 
> or session_window grouping keys, where the partial aggregation phases are 
> required for correctness and are always applied.
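> A minimal usage sketch, assuming the config is merged under the name 
> proposed above (it is part of this proposal, not an existing Spark setting). 
> SET, RESET and EXPLAIN FORMATTED are existing Spark SQL statements; the 
> query is the one from Scenario 1.
> {code:sql}
> -- Opt in for the current session (proposed config, default false)
> SET spark.sql.execution.bypassPartialAggregation=true;
>
> -- Inspect the physical plan to see which aggregation phases are planned
> EXPLAIN FORMATTED
> SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
> FROM orders
> GROUP BY user_id;
>
> -- Return to the default two-phase plan
> RESET spark.sql.execution.bypassPartialAggregation;{code}
> Scoping the setting to a session, rather than setting it cluster-wide, 
> keeps the default behavior for queries where partial aggregation still 
> reduces shuffle volume.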
> ----
> h2. Expected Impact:
> For aggregations over high-cardinality keys where the pre-shuffle reduction 
> ratio is near 1.0:
>  - Eliminates one full scan of the partition data by the partial 
> HashAggregateExec
>  - Eliminates hash map construction and potential spill in the partial phase
>  - Wall-clock improvement is most significant in spill-heavy cases
> For aggregations over skewed input:
>  - Moves aggregation to post-shuffle, where data is evenly distributed by 
> grouping key
>  - Eliminates the partial-phase bottleneck on the skewed partition
> The config is opt-in (false by default) so existing query behavior is 
> unchanged.


