gaoyajun02 created SPARK-57491:
----------------------------------
Summary: Push-based shuffle may serve stale/incorrect data from
duplicate task attempts in indeterminate stages
Key: SPARK-57491
URL: https://issues.apache.org/jira/browse/SPARK-57491
Project: Spark
Issue Type: Improvement
Components: Shuffle
Affects Versions: 4.0.0, 4.1.0, 3.5.0, 3.4.0, 3.3.0, 3.2.0, 4.2.0, 4.3.0
Reporter: gaoyajun02
h3. Problem
When push-based shuffle (PSS/Celeborn) is enabled, *indeterminate stages*
(stages whose output may differ from one task attempt to another) can expose
*incorrect or duplicated data* to downstream reduce tasks. This happens when
multiple attempts of the same map task
{*}both successfully push their shuffle data to the merger service{*}, but the
driver commits only one attempt as the canonical result.
The root cause is a gap between when a map task writes its shuffle data locally
and when that data is pushed to the external shuffle service:
# A ShuffleMapTask finishes writing its shuffle data to local disk.
# Before the driver can commit or kill this attempt, another attempt for the
same partition (e.g., from speculation, executor preemption recovery, or a
retry after failure) also completes and writes its data.
# *Both attempts push their data to the push-based shuffle merger*: the merger
accepts data from both because it has no knowledge of which attempt the driver
will ultimately accept.
# The driver commits one attempt's {{MapStatus}} and kills the other, but
*the merged block on the server now contains a mixture of data from both attempts*.
# Downstream reduce tasks read the merged block and silently consume
*stale or corrupted data*.
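A minimal Java sketch of the race in the steps above, under simplifying assumptions: the merger is modeled as appending every pushed block without any notion of which attempt the driver accepted, and the driver keeps only the first successful attempt per map partition. {{PushRaceSketch}}, its methods, and the attempt ids 7 and 8 are hypothetical illustrations, not Spark APIs.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the race (not Spark code).
class PushRaceSketch {
    // reduce partition -> attempt ids whose blocks were appended to the merged block
    final Map<Integer, List<Long>> mergedBlocks = new HashMap<>();
    // map partition -> attempt id the driver accepted (first success wins)
    final Map<Integer, Long> committed = new HashMap<>();

    // The merger cannot see driver decisions, so it accepts every push.
    void push(long attemptId, int reducePartition) {
        mergedBlocks.computeIfAbsent(reducePartition, p -> new ArrayList<>()).add(attemptId);
    }

    // The driver records the first successful attempt per map partition; later ones are ignored.
    boolean reportSuccess(int mapPartition, long attemptId) {
        return committed.putIfAbsent(mapPartition, attemptId) == null;
    }

    static PushRaceSketch simulate() {
        PushRaceSketch s = new PushRaceSketch();
        // Steps 1-3: attempts 7 and 8 of map partition 0 both finish writing
        // and both push their block for reduce partition 0 before the driver acts.
        s.push(7L, 0);
        s.push(8L, 0);
        // Step 4: the driver accepts attempt 7; attempt 8's success is dropped.
        s.reportSuccess(0, 7L);
        s.reportSuccess(0, 8L);
        return s;
    }
}
{code}
After {{simulate()}}, the driver has committed only attempt 7, yet the merged block for reduce partition 0 still lists data from both attempt 7 and attempt 8.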
The problem is especially severe for *indeterminate* stages, whose output can
change when a task is re-run, for example when:
* The output depends on non-deterministic functions (e.g., {{rand()}}, {{UUID()}},
{{monotonically_increasing_id()}})
* Input row ordering differs between attempts (e.g., HashJoin vs. SortMergeJoin
upstream)
* The partitioning logic uses such non-deterministic values as keys
In these cases, two attempts of the same map task may produce
{*}different shuffle partition assignments{*}: the same logical row can be
hashed to a different reduce partition in each attempt. When both attempts'
data end up in the merged blocks, reducers receive
{*}duplicated or misrouted rows{*}, leading to incorrect query results. One
visible symptom is shuffle metrics reporting more bytes read than written.
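The misrouting can be illustrated with a hypothetical Java sketch in which a fixed array of draws stands in for a per-task {{rand()}}: the i-th row an attempt processes receives the i-th draw, and that draw selects the reduce partition. {{MisroutingSketch}}, the draw values, and the partition count are illustrative assumptions, not Spark code.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: the partition key comes from a non-deterministic
// value handed out in row-processing order, as a per-task rand() would.
class MisroutingSketch {
    // Fixed stand-in for the sequence a per-task rand() would produce.
    static final double[] DRAWS = {0.05, 0.72, 0.31, 0.93};
    static final int NUM_PARTITIONS = 4;

    // The i-th processed row gets the i-th draw, which picks its reduce partition.
    static Map<String, Integer> assign(List<String> rowsInProcessingOrder) {
        Map<String, Integer> partitionOf = new HashMap<>();
        for (int i = 0; i < rowsInProcessingOrder.size(); i++) {
            int partition = (int) (DRAWS[i] * NUM_PARTITIONS);
            partitionOf.put(rowsInProcessingOrder.get(i), partition);
        }
        return partitionOf;
    }

    // Rows a reducer would see in one partition if both attempts' output were merged.
    static List<String> mergedPartition(int partition,
                                        Map<String, Integer> attempt1,
                                        Map<String, Integer> attempt2) {
        List<String> rows = new ArrayList<>();
        attempt1.forEach((row, p) -> { if (p == partition) rows.add(row); });
        attempt2.forEach((row, p) -> { if (p == partition) rows.add(row); });
        return rows;
    }
}
{code}
Processing rows {{a, b, c, d}} in that order sends row {{a}} to partition 0, while the reverse order sends it to partition 3. If both attempts' output were merged, partition 0 would contain rows {{a}} and {{d}}, and every row would appear twice across the reduce partitions.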
h3. Example Scenario
Consider a join with {{rand()}} in the join condition:
{code:sql}
SELECT * FROM t1 JOIN t2 ON t1.key = t2.key AND rand() < 0.1
{code}
If this stage runs with:
* {*}Speculation enabled{*}: Two attempts of the same map task run concurrently
* {*}HashJoin as upstream{*}: the order in which rows reach {{rand()}} is non-deterministic
* {*}Push-based shuffle enabled{*}: Both attempts push data to Celeborn/PSS
Then:
* Attempt 1 produces partition lengths: {{(27847, 24393, 26942, ...)}}
* Attempt 2 produces *different* partition lengths: {{(28453, 27119, 30778, ...)}}
because {{rand()}} received rows in a different order
* Both push to the merger; the merged block contains interleaved data from both
* The reducer reads *more data than was written by the committed attempt alone*
* Query results contain *duplicate rows* or *data corruption*
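As a worked example, assume the merger naively appends both attempts' output for each reduce partition. The hypothetical Java sketch below uses only the first three partition lengths quoted above (the remaining partitions are elided in the source) to compute what a reducer would read; {{MergedLengthSketch}} is illustrative, not Spark code.
{code:java}
// Hypothetical worked example: assumes the merger appends both attempts' data
// for each reduce partition. Only the first three quoted lengths are used.
class MergedLengthSketch {
    static final long[] COMMITTED = {27847, 24393, 26942}; // attempt 1 (committed)
    static final long[] DUPLICATE = {28453, 27119, 30778}; // attempt 2 (not committed)

    // Per-partition length a reducer would read from the merged block.
    static long[] mergedLengths() {
        long[] merged = new long[COMMITTED.length];
        for (int p = 0; p < merged.length; p++) {
            merged[p] = COMMITTED[p] + DUPLICATE[p];
        }
        return merged;
    }

    static long sum(long[] xs) {
        long total = 0;
        for (long x : xs) {
            total += x;
        }
        return total;
    }
}
{code}
{{mergedLengths()}} returns 56300, 51512 and 57720 (165532 in total), while the committed attempt wrote only 79182 for those three partitions.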
h3. Solution
This fix introduces a three-layer defense mechanism:
* Layer 1 -- Deferred push via {{TaskCompletionListener}} in
{{ShuffleWriteProcessor}}: the shuffle block push is deferred until the task
succeeds, by registering it through a {{TaskCompletionListener}}. If the task is
killed or fails before completion, the push is skipped entirely. This stops most
redundant pushes at the source, because only attempts that complete successfully
ever push data.
* Layer 2 -- Stale mapId tracking on the driver side in {{TaskSetManager}} and
{{MapOutputTracker}}: for cases where deferred push alone is insufficient (e.g.,
two attempts both succeed within a race window), the driver tracks which mapIds
correspond to stale or duplicate pushes. In {{TaskSetManager.handleSuccessfulTask}},
when a late-arriving or killed attempt's result arrives for an already-completed
partition, its mapId is registered as stale via
{{shuffleStatus.markDuplicatePushedMap()}}. The stale mapId set is serialized
alongside {{MapStatus}}/{{MergeStatus}} in {{MapOutputTracker}} and propagated to
all executors. New helper methods {{serializeStaleMapIds()}} and
{{deserializeStaleMapIds()}} serialize this set in a compact binary form.
* Layer 3 -- Chunk-level stale detection on the reducer side in
{{PushBasedFetchHelper}}: before consuming any push-merged block, each reducer
checks whether the server-side chunk bitmaps contain any tracked stale mapId.
The check merges all chunk bitmaps into a single {{RoaringBitmap}} and tests its
intersection with the stale mapId set using {{exists}} + {{contains}}. If any
stale mapId is found, the reducer logs a warning and falls back to fetching the
individual unmerged blocks from the original map outputs, the same fallback
path used for merge failures and corrupted blocks. This check is applied to both
remote merged blocks (in {{MergedBlocksMetaListener.onSuccess}}) and local
merged blocks (in {{fetchPushMergedLocalBlock}}).
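A minimal Java sketch of the Layer 1 idea, assuming a simplified task context. Spark's {{TaskContext.addTaskCompletionListener}} runs listeners on success, failure and cancellation alike, so a deferred push has to check the task outcome itself; here the outcome is handed to the listener as a boolean. {{MiniTaskContext}}, {{DeferredPushSketch}} and the block id strings are hypothetical and do not mirror the actual {{ShuffleWriteProcessor}} change.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a task context: listeners run once when the task
// ends and receive whether the task body finished successfully.
class MiniTaskContext {
    private final List<Consumer<Boolean>> completionListeners = new ArrayList<>();

    void addCompletionListener(Consumer<Boolean> listener) {
        completionListeners.add(listener);
    }

    // Runs the task body, then fires every listener with the outcome.
    boolean run(Runnable body) {
        boolean succeeded;
        try {
            body.run();
            succeeded = true;
        } catch (RuntimeException e) {
            succeeded = false; // a failure or a simulated kill
        }
        for (Consumer<Boolean> listener : completionListeners) {
            listener.accept(succeeded);
        }
        return succeeded;
    }
}

class DeferredPushSketch {
    final List<String> pushed = new ArrayList<>();

    // Instead of pushing right after the local write, register the push for task completion.
    void writeAndDeferPush(MiniTaskContext ctx, String blockId) {
        ctx.addCompletionListener(succeeded -> {
            if (succeeded) {
                pushed.add(blockId); // only attempts that completed successfully push
            }
            // failed or killed attempts skip the push entirely
        });
    }
}
{code}
A task body that calls {{writeAndDeferPush}} and returns normally leaves one pushed block; a body that throws after the same call (a simulated kill) leaves none.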
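A sketch of the driver-side bookkeeping described in Layer 2. The issue does not specify the binary format used by {{serializeStaleMapIds()}}, so the count-plus-longs layout below is an assumption, as are the names {{StaleMapIdTracker}} and {{onSuccessfulTask}}; {{onSuccessfulTask}} only plays the role described for {{markDuplicatePushedMap()}}.
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified driver-side bookkeeping for one shuffle.
class StaleMapIdTracker {
    private final Set<Long> staleMapIds = new TreeSet<>(); // sorted for stable output

    // Called when a successful result arrives; if the partition already has a
    // committed attempt, this attempt's pushed data must not be trusted.
    void onSuccessfulTask(boolean partitionAlreadyCommitted, long mapId) {
        if (partitionAlreadyCommitted) {
            staleMapIds.add(mapId);
        }
    }

    Set<Long> staleMapIds() {
        return staleMapIds;
    }

    // Assumed wire format: a 4-byte count followed by each mapId as an 8-byte long.
    static byte[] serialize(Set<Long> ids) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(ids.size());
            for (long id : ids) {
                out.writeLong(id);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Set<Long> deserialize(byte[] data) {
        Set<Long> ids = new TreeSet<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                ids.add(in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ids;
    }
}
{code}
Reporting mapId 8 as a late success for an already committed partition leaves a stale set containing only 8; serializing it yields 12 bytes (a 4-byte count plus one 8-byte id), and deserializing those bytes gives back the same set.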
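A sketch of the reducer-side check in Layer 3. {{java.util.BitSet}} stands in for {{RoaringBitmap}} so the example needs only the JDK, and ids are {{int}} because both bitmap types index by {{int}}. {{StaleChunkCheck}} and the {{READ_MERGED}}/{{FALLBACK_TO_UNMERGED}} labels are hypothetical; the real change wires the decision into the existing fallback path in {{PushBasedFetchHelper}}.
{code:java}
import java.util.BitSet;
import java.util.List;
import java.util.Set;

// Hypothetical reducer-side check; BitSet is a JDK stand-in for RoaringBitmap.
class StaleChunkCheck {
    // OR all chunk bitmaps into one, then test whether any stale id is present.
    static boolean containsStale(List<BitSet> chunkBitmaps, Set<Integer> staleIds) {
        if (staleIds.isEmpty()) {
            return false; // nothing to check
        }
        BitSet merged = new BitSet();
        for (BitSet chunk : chunkBitmaps) {
            merged.or(chunk);
        }
        // "exists + contains": does any stale id appear in the merged bitmap?
        return staleIds.stream().anyMatch(merged::get);
    }

    // Decide whether to read the push-merged block or fall back to unmerged blocks.
    static String plan(List<BitSet> chunkBitmaps, Set<Integer> staleIds) {
        return containsStale(chunkBitmaps, staleIds) ? "FALLBACK_TO_UNMERGED" : "READ_MERGED";
    }

    static BitSet bitmapOf(int... ids) {
        BitSet bitmap = new BitSet();
        for (int id : ids) {
            bitmap.set(id);
        }
        return bitmap;
    }
}
{code}
With chunk bitmaps covering ids 0, 1, 2 and 5, a stale set containing 5 yields {{FALLBACK_TO_UNMERGED}}, while a stale set containing only 3, or an empty stale set, yields {{READ_MERGED}}.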
--
This message was sent by Atlassian Jira
(v8.20.10#820010)