[
https://issues.apache.org/jira/browse/SPARK-56145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088664#comment-18088664
]
Anupam Yadav commented on SPARK-56145:
--------------------------------------
After investigation and review (PR https://github.com/apache/spark/pull/56243),
this is not actionable as proposed, and the proposed approach (a data-aware
minimum-partition-count floor for join input stages) does not address the
stated problem.
The coalesce rule is already data-aware:
ShufflePartitionsUtil.coalescePartitions computes targetSize =
min(ceil(totalSize/minNum), advisory). A shuffle group only coalesces to a
single partition when totalSize <= advisory. A join-input floor applied in the
totalSize > advisory branch is therefore vacuous for the 1-partition case
(which is explicitly skipped), and where it does engage the baseline already
yields >= ceil(totalSize/advisory) >= 2 partitions. The only residual effect is
shrinking targetSize below the advisory size, which merely produces more
sub-advisory partitions (added scheduling overhead, no benefit) and overrides
the user's advisory preference.
Additionally, input size (left + right) is the wrong cost model for a join:
per-partition work for SMJ/SHJ is already bounded by the advisory target, and
output blow-up for genuinely explosive join types is already handled by
shrinking the advisory target to MIN_PARTITION_SIZE (SMJ/SHJ are deliberately
excluded because equi-join output is not generally explosive). No
master-failing test could be produced that reflects the stated goal.
Resolving as Not A Problem to avoid duplicated effort. If there is a real
coalescing-related parallelism gap, the more promising area is the proportional
split across coalesced groups in ShufflePartitionsUtil, not a join-input count
floor.
> AQE: CoalesceShufflePartitions can eliminate join parallelism after
> OptimizeSkewedJoin finds no skew; no post-coalesce re-check
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-56145
> URL: https://issues.apache.org/jira/browse/SPARK-56145
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.0, 3.4.0
> Reporter: Varun Srinivas
> Priority: Major
> Labels: pull-request-available
>
> CoalesceShufflePartitions can coalesce shuffle partitions on join stages down
> to 1, concentrating the entire shuffle dataset into a single reducer task.
> This happens _after_ OptimizeSkewedJoin has already run and determined no
> skew exists — a determination that becomes invalid once coalescing destroys
> the partition layout.
> SPARK-35447 (fixed in 3.2.0) addressed a related interaction by ensuring
> OptimizeSkewedJoin runs before CoalesceShufflePartitions, preventing
> coalescing from inflating the median used for skew detection. However, the
> reverse interaction was not addressed: coalescing can _create_ a data
> concentration problem that didn't exist when skew detection checked.
> h2. Root Cause
> In AdaptiveSparkPlanExec, the two rules run in separate rule sets:
> # OptimizeSkewedJoin runs in queryStagePreparationRules during the planning
> phase. It reads raw MapOutputStatistics.bytesByPartitionId and computes
> median. If no individual partition exceeds
> max(skewedPartitionThresholdInBytes, skewedPartitionFactor * median), it
> correctly determines no skew exists.
> # CoalesceShufflePartitions runs in queryStageOptimizerRules during the
> per-stage optimization phase. It can merge many small partitions into very
> few — or even 1 — partition.
> There is no post-coalesce re-evaluation. The skew assessment from step 1 is
> treated as final, even though coalescing in step 2 fundamentally changes the
> data distribution across tasks.
> h2. Example
> Consider a shuffle with 200 partitions, each ~10 MB (2 GB total). The
> advisory partition size is 64 MB (default).
> # OptimizeSkewedJoin runs: median = 10 MB, skew threshold = max(256 MB, 5 x
> 10 MB) = 256 MB. No partition exceeds 256 MB. *No skew detected* — correct.
> # CoalesceShufflePartitions runs: 2 GB total / 64 MB advisory = ~31
> partitions. But with COALESCE_PARTITIONS_PARALLELISM_FIRST = true (default)
> and low defaultParallelism, it may coalesce further — in extreme cases, to
> {*}1 partition{*}.
> # A single reducer task now processes 2 GB of join input. If the data
> contains a hot join key, this task hits cardinality explosion and becomes a
> 1,000x+ straggler. Even without hot keys, concentrating all data into one
> task eliminates parallelism.
> h2. Production Evidence
> Analysis of production Spark 3.4 jobs showed:
> * Top 25 straggler jobs (1,000-3,000x task duration vs stage average) *all*
> had num_output_partitions = 1 after coalescing.
> * Shuffle read analysis confirmed data concentration, not compute skew:
>
> |*Job*|*Straggler Shuffle Read*|*Normal Task Shuffle Read*|*Data
> Concentration Ratio*|
> |Job A|0.44 GB (19M records)|~0 bytes (2.5K records)|*17,682x*|
> |Job B|1.65 GB (5.4M records)|~0 bytes (9 records)|*552,948x*|
> |Job C|1.43 GB (51.8M records)|0.4 MB (22.6K records)|*3,716x*|
> The straggler task was the only task doing any real work — all other tasks in
> the stage read effectively zero data.
> h2. Suggested Fix
> *Option A (minimal):* Enforce a minimum partition count in
> CoalesceShufflePartitions when the stage feeds into a join. Coalescing to 1
> partition on a join stage is never beneficial.
> COALESCE_PARTITIONS_MIN_PARTITION_NUM exists but is optional and not
> join-aware.
> *Option B (more robust):* Add a lightweight post-coalesce skew check in
> queryStageOptimizerRules, after CoalesceShufflePartitions. This would
> evaluate the coalesced partition layout and split any partitions that now
> exceed the skew threshold.
> *Option C (targeted):* When CoalesceShufflePartitions would reduce a join
> stage below N partitions (e.g., spark.default.parallelism or a new config),
> cap the coalescing at that floor.
> h2. Related Issues
> * SPARK-35447 — Fixed the reverse interaction (coalescing inflating skew
> detection median). Resolved in 3.2.0.
> * SPARK-51872 — Open. CoalesceShufflePartitions creates excessively small
> partition counts during SMJ-to-BHJ conversion.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]