Varun Srinivas created SPARK-56145:
--------------------------------------

             Summary: AQE: CoalesceShufflePartitions can eliminate join parallelism after OptimizeSkewedJoin finds no skew; no post-coalesce re-check
                 Key: SPARK-56145
                 URL: https://issues.apache.org/jira/browse/SPARK-56145
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.0, 3.4.0
            Reporter: Varun Srinivas


CoalesceShufflePartitions can coalesce shuffle partitions on join stages down 
to 1, concentrating the entire shuffle dataset into a single reducer task. This 
happens _after_ OptimizeSkewedJoin has already run and determined no skew 
exists — a determination that becomes invalid once coalescing destroys the 
partition layout.

SPARK-35447 (fixed in 3.2.0) addressed a related interaction by ensuring 
OptimizeSkewedJoin runs before CoalesceShufflePartitions, preventing coalescing 
from inflating the median used for skew detection. However, the reverse 
interaction was not addressed: coalescing can _create_ a data concentration 
problem that did not exist at the time skew detection ran.
h2. Root Cause

In AdaptiveSparkPlanExec, the two rules run in separate rule sets:
 # OptimizeSkewedJoin runs in queryStagePreparationRules during the planning 
phase. It reads the raw MapOutputStatistics.bytesByPartitionId and computes the 
median partition size. If no individual partition exceeds 
max(skewedPartitionThresholdInBytes, skewedPartitionFactor * median), it 
correctly concludes that no skew exists.
 # CoalesceShufflePartitions runs in queryStageOptimizerRules during the 
per-stage optimization phase. It can merge many small partitions into very few, 
or even a single, partition.

There is no post-coalesce re-evaluation. The skew assessment from step 1 is 
treated as final, even though coalescing in step 2 fundamentally changes the 
data distribution across tasks.
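
The step-1 decision can be illustrated with a small sketch (illustrative Python, not Spark's actual implementation; the defaults shown are the documented 5.0 skew factor and 256 MB threshold):

```python
# Simplified sketch of the per-partition skew test OptimizeSkewedJoin applies.
# Names and structure are illustrative, not the exact Spark internals.

def median(sizes):
    s = sorted(sizes)
    return s[len(s) // 2]

def is_skewed(size, median_size,
              skewed_partition_factor=5.0,
              skewed_partition_threshold=256 * 1024 * 1024):
    # A partition counts as skewed only if it exceeds BOTH the absolute
    # threshold and factor * median, i.e. max(threshold, factor * median).
    return size > max(skewed_partition_threshold,
                      skewed_partition_factor * median_size)

# 200 uniform ~10 MB partitions: no partition trips the test.
sizes = [10 * 1024 * 1024] * 200
m = median(sizes)
assert not any(is_skewed(s, m) for s in sizes)
```

The same test never re-runs against the post-coalesce layout, which is the gap this issue describes.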
h2. Example

Consider a shuffle with 200 partitions, each ~10 MB (2 GB total). The advisory 
partition size is 64 MB (default).
 # OptimizeSkewedJoin runs: median = 10 MB, skew threshold = max(256 MB, 5 x 10 
MB) = 256 MB. No partition exceeds 256 MB. *No skew detected* — correct.
 # CoalesceShufflePartitions runs: 2 GB total / 64 MB advisory = ~31 
partitions. But with COALESCE_PARTITIONS_PARALLELISM_FIRST = true (default) and 
low defaultParallelism, it may coalesce further — in extreme cases, to {*}1 
partition{*}.
 # A single reducer task now processes 2 GB of join input. If the data contains 
a hot join key, this task hits cardinality explosion and becomes a 1,000x+ 
straggler. Even without hot keys, concentrating all data into one task 
eliminates parallelism.
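
The arithmetic in step 2 checks out (a sketch of the size-based target only; the real ShufflePartitionsUtil logic also folds in minPartitionSize and the parallelism-first adjustment):

```python
# Check of the example's size-based coalescing arithmetic. Illustrative only;
# the full ShufflePartitionsUtil logic also honors minPartitionSize and the
# parallelism-first target-size adjustment.
MB = 1024 * 1024
total_bytes = 200 * 10 * MB          # 200 partitions x ~10 MB = ~2 GB
advisory = 64 * MB                   # default advisoryPartitionSizeInBytes

# Size-based target from step 2: 2000 MB / 64 MB -> ~31 partitions.
size_based_target = max(1, total_bytes // advisory)
assert size_based_target == 31
```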

h2. Production Evidence

Analysis of production Spark 3.4 jobs showed:
 * Top 25 straggler jobs (1,000-3,000x task duration vs stage average) *all* 
had num_output_partitions = 1 after coalescing.
 * Shuffle read analysis confirmed data concentration, not compute skew:

|*Job*|*Straggler Shuffle Read*|*Normal Task Shuffle Read*|*Data Concentration Ratio*|
|Job A|0.44 GB (19M records)|~0 bytes (2.5K records)|*17,682x*|
|Job B|1.65 GB (5.4M records)|~0 bytes (9 records)|*552,948x*|
|Job C|1.43 GB (51.8M records)|0.4 MB (22.6K records)|*3,716x*|

The straggler task was the only task doing any real work — all other tasks in 
the stage read effectively zero data.
h2. Suggested Fix

*Option A (minimal):* Enforce a minimum partition count in 
CoalesceShufflePartitions when the stage feeds into a join. Coalescing to 1 
partition on a join stage is never beneficial. 
COALESCE_PARTITIONS_MIN_PARTITION_NUM exists but is optional and not join-aware.
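
As a stopgap along Option A's lines, affected jobs can pin a floor today via the existing (deprecated since 3.2) config; whether it is still honored should be verified against the version in use:

```
# Interim workaround sketch, not a fix: pin a floor on the
# post-coalesce partition count for affected jobs.
spark.sql.adaptive.coalescePartitions.minPartitionNum  200
```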

*Option B (more robust):* Add a lightweight post-coalesce skew check in 
queryStageOptimizerRules, after CoalesceShufflePartitions. This would evaluate 
the coalesced partition layout and split any partitions that now exceed the 
skew threshold.
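
A sketch of what Option B's re-check might do (hypothetical helper in plain Python; a real implementation would reuse OptimizeSkewedJoin's thresholds and ShufflePartitionsUtil's splitting):

```python
import math

def recheck_and_split(coalesced_sizes, threshold, advisory):
    """Hypothetical post-coalesce pass: re-test the *coalesced* partition
    sizes against the skew threshold and split any offenders back into
    roughly advisory-sized pieces."""
    out = []
    for size in coalesced_sizes:
        if size > threshold:
            pieces = math.ceil(size / advisory)
            out.extend([size // pieces] * pieces)
        else:
            out.append(size)
    return out

MB = 1024 * 1024
# One 2 GB coalesced partition, 256 MB threshold, 64 MB advisory:
result = recheck_and_split([2048 * MB], 256 * MB, 64 * MB)
assert len(result) == 32   # parallelism restored to ~32 tasks
```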

*Option C (targeted):* When CoalesceShufflePartitions would reduce a join stage 
below N partitions (e.g., spark.default.parallelism or a new config), cap the 
coalescing at that floor.
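
Option C's cap could look roughly like this (all names hypothetical; in practice the floor would also be capped at the pre-coalesce partition count):

```python
# Sketch of Option C: cap coalescing for join stages at a parallelism floor.
# Hypothetical names, illustrating the proposed rule only.

def capped_partition_count(proposed, feeds_join, parallelism_floor):
    """Never let a join-stage shuffle coalesce below the floor."""
    if feeds_join and proposed < parallelism_floor:
        return parallelism_floor
    return proposed

# A join stage proposed to coalesce to 1 is held at the floor (e.g. 8);
# non-join stages and already-parallel stages are untouched.
assert capped_partition_count(1, feeds_join=True, parallelism_floor=8) == 8
assert capped_partition_count(1, feeds_join=False, parallelism_floor=8) == 1
assert capped_partition_count(31, feeds_join=True, parallelism_floor=8) == 31
```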


h2. Related Issues
 * SPARK-35447 — Fixed the reverse interaction (coalescing inflating skew 
detection median). Resolved in 3.2.0.
 * SPARK-51872 — Open. CoalesceShufflePartitions creates excessively small 
partition counts during SMJ-to-BHJ conversion.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
