[
https://issues.apache.org/jira/browse/SPARK-57688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-57688:
-----------------------------------
Labels: pull-request-available (was: )
> Add config to allow bypassing pre-shuffle partial aggregation
> -------------------------------------------------------------
>
> Key: SPARK-57688
> URL: https://issues.apache.org/jira/browse/SPARK-57688
> Project: Spark
> Issue Type: Improvement
> Components: Optimizer
> Affects Versions: 4.3.0
> Reporter: James Xu
> Priority: Major
> Labels: pull-request-available
>
> h2. Problem:
> The standard two-phase aggregation plan (Partial → shuffle → Final) assumes
> that pre-shuffle partial aggregation meaningfully reduces data volume. This
> assumption breaks down in two scenarios.
> h3. Scenario 1: High group cardinality.
> When group cardinality is high relative to partition size, nearly every input
> row in a partition maps to a distinct key, so the partial aggregation produces
> close to one output row per input row and adds CPU and memory overhead with
> little or no shuffle benefit.
> {code:sql}
> SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
> FROM orders
> GROUP BY user_id -- high-cardinality key: millions of distinct users
> {code}
> On a table with 500M rows and 200M distinct user_id values, the pre-shuffle
> HashAggregateExec in Partial mode churns through the full dataset, spills
> when the hash map overflows, and still emits at least ~200M rows into the
> shuffle (one row per distinct key in each partition). The partial phase
> wastes wall-clock time and memory without meaningfully reducing shuffle
> write volume.
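> One way to check whether a query falls into this scenario is to measure the
> per-partition reduction ratio directly. The query below is a rough sketch and
> not part of the proposed change. It assumes the scan of orders is partitioned
> the same way in this diagnostic query as in the real aggregation, and it uses
> approx_count_distinct, so the counts are estimates.
> {code:sql}
> -- For each scan partition, compare input rows with the number of distinct
> -- keys, which approximates the rows a Partial-mode aggregate would emit.
> SELECT pid,
>        COUNT(*) AS input_rows,
>        approx_count_distinct(user_id) AS partial_output_rows,
>        approx_count_distinct(user_id) / COUNT(*) AS reduction_ratio
> FROM (SELECT spark_partition_id() AS pid, user_id FROM orders) t
> GROUP BY pid
> ORDER BY pid
> {code}
> A reduction_ratio close to 1.0 on most partitions suggests that the partial
> phase removes few rows before the shuffle.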
> h3. Scenario 2: Skewed input data.
> Even when partial aggregation can reduce data volume on average, skewed input
> partitions can make it harmful. If one partition contains a disproportionate
> share of rows for a small number of keys, the partial HashAggregateExec on
> that partition must hold a large hash map in memory, triggering spills. The
> skewed partition becomes the bottleneck and dominates wall-clock time, which
> is worse than if the data had been shuffled first and aggregated on
> already-partitioned, evenly distributed data.
> {code:sql}
> SELECT country_code, SUM(revenue)
> FROM orders
> GROUP BY country_code -- a few dominant countries hold 80% of rows
> {code}
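> To see how concentrated the rows are before deciding anything, a query along
> the following lines can list the largest keys and their share of the table.
> This is an illustrative sketch only. The row_share column name and the LIMIT
> of 10 are arbitrary choices, and the empty OVER () window gathers the per-key
> counts into a single partition, which is acceptable only when the number of
> distinct keys is small.
> {code:sql}
> -- Count rows per key, then express each count as a fraction of all rows.
> WITH key_counts AS (
>   SELECT country_code, COUNT(*) AS row_count
>   FROM orders
>   GROUP BY country_code
> )
> SELECT country_code,
>        row_count,
>        row_count / SUM(row_count) OVER () AS row_share
> FROM key_counts
> ORDER BY row_count DESC
> LIMIT 10
> {code}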
> ----
> h2. Solution:
> Add a new boolean SQL config spark.sql.execution.bypassPartialAggregation
> (default false). When set to true, the pre-shuffle partial aggregation phase
> is skipped and a single Complete-mode aggregation runs after the shuffle
> instead. Users who have identified either of the above patterns in their
> workload can opt in to bypass the partial phase.
> The config has no effect on queries containing DISTINCT aggregate functions
> or session_window grouping keys, where the partial aggregation phases are
> required for correctness and are always applied.
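> As a sketch of the intended usage, assuming the config is added under the
> name proposed above (it does not exist in released Spark versions), a session
> could opt in for a single query and then restore the default:
> {code:sql}
> -- Opt in to skipping the pre-shuffle partial aggregation (proposed config).
> SET spark.sql.execution.bypassPartialAggregation=true;
>
> -- Inspect the physical plan; per the description above, the expectation is
> -- one Complete-mode aggregate after the shuffle instead of Partial + Final.
> EXPLAIN
> SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
> FROM orders
> GROUP BY user_id;
>
> -- Return to the default for subsequent queries.
> RESET spark.sql.execution.bypassPartialAggregation;
> {code}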
> ----
> h2. Expected Impact:
> For aggregations over high-cardinality keys where the pre-shuffle reduction
> ratio is near 1.0:
> - Eliminates one full pass over each partition's data by the partial
> HashAggregateExec
> - Eliminates hash map construction and potential spill in the partial phase
> - Wall-clock improvement is most significant in spill-heavy cases
> For aggregations over skewed input:
> - Moves aggregation to post-shuffle, where data is evenly distributed by
> grouping key
> - Eliminates the partial-phase bottleneck on the skewed partition
> The config is opt-in (false by default) so existing query behavior is
> unchanged.