[
https://issues.apache.org/jira/browse/SPARK-56145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Varun Srinivas updated SPARK-56145:
-----------------------------------
Summary: AQE: CoalesceShufflePartitions can eliminate join parallelism
after OptimizeSkewedJoin finds no skew; no post-coalesce re-check (was: AQE:
CoalesceShufflePartitions can eliminate join parallelism after
OptimizeSkewedJoin finds no skew — no post-coalesce re-check)
> AQE: CoalesceShufflePartitions can eliminate join parallelism after
> OptimizeSkewedJoin finds no skew; no post-coalesce re-check
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-56145
> URL: https://issues.apache.org/jira/browse/SPARK-56145
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.0, 3.4.0
> Reporter: Varun Srinivas
> Priority: Major
>
> CoalesceShufflePartitions can coalesce shuffle partitions on join stages down
> to 1, concentrating the entire shuffle dataset into a single reducer task.
> This happens _after_ OptimizeSkewedJoin has already run and determined no
> skew exists — a determination that becomes invalid once coalescing destroys
> the partition layout.
> SPARK-35447 (fixed in 3.2.0) addressed a related interaction by ensuring
> OptimizeSkewedJoin runs before CoalesceShufflePartitions, preventing
> coalescing from inflating the median used for skew detection. However, the
> reverse interaction was not addressed: coalescing can _create_ a data
> concentration problem that didn't exist when skew detection checked.
> h2. Root Cause
> In AdaptiveSparkPlanExec, the two rules run in separate rule sets:
> # OptimizeSkewedJoin runs in queryStagePreparationRules during the planning
> phase. It reads raw MapOutputStatistics.bytesByPartitionId and computes
> median. If no individual partition exceeds
> max(skewedPartitionThresholdInBytes, skewedPartitionFactor * median), it
> correctly determines no skew exists.
> # CoalesceShufflePartitions runs in queryStageOptimizerRules during the
> per-stage optimization phase. It can merge many small partitions into very
> few — or even 1 — partition.
> There is no post-coalesce re-evaluation. The skew assessment from step 1 is
> treated as final, even though coalescing in step 2 fundamentally changes the
> data distribution across tasks.
> h2. Example
> Consider a shuffle with 200 partitions, each ~10 MB (2 GB total). The
> advisory partition size is 64 MB (default).
> # OptimizeSkewedJoin runs: median = 10 MB, skew threshold = max(256 MB, 5 x
> 10 MB) = 256 MB. No partition exceeds 256 MB. *No skew detected* — correct.
> # CoalesceShufflePartitions runs: 2 GB total / 64 MB advisory = ~31
> partitions. But with COALESCE_PARTITIONS_PARALLELISM_FIRST = true (default)
> and low defaultParallelism, it may coalesce further — in extreme cases, to
> {*}1 partition{*}.
> # A single reducer task now processes 2 GB of join input. If the data
> contains a hot join key, this task hits cardinality explosion and becomes a
> 1,000x+ straggler. Even without hot keys, concentrating all data into one
> task eliminates parallelism.
> h2. Production Evidence
> Analysis of production Spark 3.4 jobs showed:
> * Top 25 straggler jobs (1,000-3,000x task duration vs stage average) *all*
> had num_output_partitions = 1 after coalescing.
> * Shuffle read analysis confirmed data concentration, not compute skew:
>
> |*Job*|*Straggler Shuffle Read*|*Normal Task Shuffle Read*|*Data
> Concentration Ratio*|
> |Job A|0.44 GB (19M records)|~0 bytes (2.5K records)|*17,682x*|
> |Job B|1.65 GB (5.4M records)|~0 bytes (9 records)|*552,948x*|
> |Job C|1.43 GB (51.8M records)|0.4 MB (22.6K records)|*3,716x*|
> The straggler task was the only task doing any real work — all other tasks in
> the stage read effectively zero data.
> h2. Suggested Fix
> *Option A (minimal):* Enforce a minimum partition count in
> CoalesceShufflePartitions when the stage feeds into a join. Coalescing to 1
> partition on a join stage is never beneficial.
> COALESCE_PARTITIONS_MIN_PARTITION_NUM exists but is optional and not
> join-aware.
> *Option B (more robust):* Add a lightweight post-coalesce skew check in
> queryStageOptimizerRules, after CoalesceShufflePartitions. This would
> evaluate the coalesced partition layout and split any partitions that now
> exceed the skew threshold.
> *Option C (targeted):* When CoalesceShufflePartitions would reduce a join
> stage below N partitions (e.g., spark.default.parallelism or a new config),
> cap the coalescing at that floor.
> h2. Related Issues
> * SPARK-35447 — Fixed the reverse interaction (coalescing inflating skew
> detection median). Resolved in 3.2.0.
> * SPARK-51872 — Open. CoalesceShufflePartitions creates excessively small
> partition counts during SMJ-to-BHJ conversion.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]