gaoyajun02 created SPARK-57491:
----------------------------------

             Summary: Push-based shuffle may serve stale/incorrect data from duplicate task attempts in indeterminate stages
                 Key: SPARK-57491
                 URL: https://issues.apache.org/jira/browse/SPARK-57491
             Project: Spark
          Issue Type: Improvement
          Components: Shuffle
    Affects Versions: 4.0.0, 4.1.0, 3.5.0, 3.4.0, 3.3.0, 3.2.0, 4.2.0, 4.3.0
            Reporter: gaoyajun02


h3. Problem

When push-based shuffle (PSS/Celeborn) is enabled, *indeterminate stages* (stages that can produce different output across task retries) can expose *incorrect or duplicated data* to downstream reduce tasks. This happens when multiple task attempts for the same map partition {*}both successfully push their shuffle data to the merger service{*}, but the driver commits only one attempt as the canonical result.

The root cause is a gap between when a map task writes its shuffle data locally and when that data is pushed to the external shuffle service:
 # A ShuffleMapTask completes writing shuffle data to local disk
 # Before the driver can commit/kill this attempt, another attempt for the same 
partition (e.g., from speculation, executor preemption recovery, or retry after 
failure) also completes and writes its data
 # *Both attempts push their data to the push-based shuffle merger*: the merger accepts data from both, since it has no knowledge of which attempt the driver will ultimately accept
 # The driver commits one attempt's {{MapStatus}} and kills the other, but *the 
merged block on the server now contains a mixture of data from both attempts*
 # Downstream reduce tasks read the merged block and silently consume 
*stale/corrupted data*

This is especially problematic for *indeterminate* (nondeterministic) stages, where:
 * The output depends on nondeterministic functions (e.g., {{rand()}}, {{UUID()}}, {{monotonically_increasing_id()}})
 * Input row ordering differs between attempts (e.g., HashJoin vs SortMergeJoin upstream)
 * The partitioning logic uses such nondeterministic values as keys

In these cases, two attempts for the same map partition may produce {*}different shuffle partition assignments{*}: the same logical row gets hashed to different reduce partitions in each attempt. When both sets of data end up in the merged blocks, reducers receive {*}duplicated or misrouted rows{*}, which leads to incorrect query results. One visible symptom is shuffle metrics reporting more bytes read than written.
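
The misrouting effect can be reproduced without Spark. The sketch below is a minimal, hypothetical illustration: {{java.util.Random}} with a fixed seed stands in for a seeded {{rand()}}, and {{Math.floorMod}} stands in for Spark's hash partitioning. The class and method names are invented for this example. Both attempts use the same seed and see the same rows, only in a different order.

{code:java}
import java.util.Random;

// Hypothetical illustration, not Spark code: java.util.Random stands in for a
// seeded rand(), and Math.floorMod stands in for hash partitioning.
final class NondeterministicPartitioning {

    // Draws one rand()-style key per row, in the order the rows arrive, and maps
    // the key to a reduce partition. Returns the partition chosen for each row id.
    // Assumes arrivalOrder is a permutation of 0..n-1.
    static int[] assign(int[] arrivalOrder, long seed, int numPartitions) {
        Random rand = new Random(seed);
        int[] partitionOfRow = new int[arrivalOrder.length];
        for (int row : arrivalOrder) {
            int key = rand.nextInt();
            partitionOfRow[row] = Math.floorMod(key, numPartitions);
        }
        return partitionOfRow;
    }

    // Number of rows that the two attempts send to different reduce partitions.
    static int countMisrouted(int[] attemptA, int[] attemptB) {
        int misrouted = 0;
        for (int row = 0; row < attemptA.length; row++) {
            if (attemptA[row] != attemptB[row]) {
                misrouted++;
            }
        }
        return misrouted;
    }

    static int[] identity(int n) {
        int[] order = new int[n];
        for (int i = 0; i < n; i++) {
            order[i] = i;
        }
        return order;
    }

    static int[] reversed(int n) {
        int[] order = new int[n];
        for (int i = 0; i < n; i++) {
            order[i] = n - 1 - i;
        }
        return order;
    }

    public static void main(String[] args) {
        int n = 1000;
        int[] attempt1 = assign(identity(n), 42L, 4);
        int[] attempt2 = assign(reversed(n), 42L, 4); // same seed, different row order
        System.out.println("rows routed differently: " + countMisrouted(attempt1, attempt2));
    }
}
{code}

Both attempts draw the same sequence of random values; only the row each value is attached to changes, and that alone is enough to move rows between reduce partitions.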
h3. Example Scenario

Consider a join with {{rand()}} in the join condition:
{code:sql}
SELECT * FROM t1 JOIN t2 ON t1.key = t2.key AND rand() < 0.1
{code}
If this stage runs with:
 * {*}Speculation enabled{*}: Two attempts of the same map task run concurrently
 * {*}HashJoin as upstream{*}: Row ordering into {{rand()}} is non-deterministic
 * {*}Push-based shuffle enabled{*}: Both attempts push data to Celeborn/PSS

Then:
 * Attempt 1 produces partition lengths: {{(27847, 24393, 26942, ...)}}
 * Attempt 2 produces *different* partition lengths: {{(28453, 27119, 30778, 
...)}} because {{rand()}} received rows in a different order
 * Both push to the merger; the merged block contains interleaved data from both
 * The reducer reads *more data than was written by the committed attempt alone*
 * Query results contain *duplicate rows* or *data corruption*
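
The byte counts in this scenario can be checked with a toy model of the merger. This is a minimal sketch under two assumptions not stated in the report: attempt 1 is the committed attempt, and the merged block for each reduce partition holds the full blocks of both attempts. {{PushedBlock}} and {{ToyMerger}} are hypothetical names, and only the first three partition lengths from the report are used.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one pushed block for a single map partition.
final class PushedBlock {
    final int attempt;
    final long bytes;

    PushedBlock(int attempt, long bytes) {
        this.attempt = attempt;
        this.bytes = bytes;
    }
}

// Hypothetical merger: appends every block pushed for a reduce partition and
// has no knowledge of which attempt the driver will commit.
final class ToyMerger {
    private final Map<Integer, List<PushedBlock>> mergedByReduceId = new HashMap<>();

    // Pushes one attempt's output: partitionLengths[r] bytes for reduce partition r.
    void push(int attempt, long[] partitionLengths) {
        for (int reduceId = 0; reduceId < partitionLengths.length; reduceId++) {
            mergedByReduceId.computeIfAbsent(reduceId, k -> new ArrayList<>())
                .add(new PushedBlock(attempt, partitionLengths[reduceId]));
        }
    }

    // Bytes a reducer reads from the merged block of one reduce partition.
    long mergedBytes(int reduceId) {
        return mergedByReduceId.getOrDefault(reduceId, List.of()).stream()
            .mapToLong(b -> b.bytes)
            .sum();
    }

    // Bytes written for that reduce partition by the committed attempt alone.
    long committedBytes(int reduceId, int committedAttempt) {
        return mergedByReduceId.getOrDefault(reduceId, List.of()).stream()
            .filter(b -> b.attempt == committedAttempt)
            .mapToLong(b -> b.bytes)
            .sum();
    }

    public static void main(String[] args) {
        ToyMerger merger = new ToyMerger();
        // First three reduce partitions from the scenario; the rest are elided there.
        merger.push(1, new long[] {27847, 24393, 26942});
        merger.push(2, new long[] {28453, 27119, 30778});
        for (int r = 0; r < 3; r++) {
            System.out.printf("reduce %d: committed %d bytes, read %d bytes%n",
                r, merger.committedBytes(r, 1), merger.mergedBytes(r));
        }
    }
}
{code}

Under these assumptions the reducer of partition 0 reads 27847 + 28453 = 56300 bytes, of which only 27847 come from the committed attempt.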

h3. Solution

This fix introduces a three-layer defense mechanism:
 * Layer 1 -- Deferred push via {{TaskCompletionListener}} in {{ShuffleWriteProcessor}}: the shuffle block push is deferred until after the task succeeds (it is registered via a {{TaskCompletionListener}}). If the task is killed or fails before completion, the push is skipped entirely. This stops most redundant pushes at the source, because only attempts that finish successfully ever push data.
 * Layer 2 -- Stale mapId tracking on the driver side in {{TaskSetManager}} and {{MapOutputTracker}}: deferred push alone is insufficient when, for example, two attempts both succeed within a race window, so the driver also tracks which mapIds represent stale or duplicate pushes. In {{TaskSetManager.handleSuccessfulTask}}, when the result of a late-arriving or killed attempt arrives for an already-completed partition, its mapId is registered as stale via {{shuffleStatus.markDuplicatePushedMap()}}. The stale mapId set is serialized alongside {{MapStatus}}/{{MergeStatus}} in {{MapOutputTracker}} and propagated to all executors. New helper methods {{serializeStaleMapIds()}} and {{deserializeStaleMapIds()}} handle the binary serialization of this set.
 * Layer 3 -- Chunk-level stale detection on the reducer side in {{PushBasedFetchHelper}}: before consuming any push-merged block, each reducer checks whether the server-side chunk bitmaps contain any tracked stale mapId. The check merges all chunk bitmaps into a single {{RoaringBitmap}} and tests it against the stale mapId set (an {{exists}} over the set, with a {{contains}} lookup in the bitmap). If any stale mapId is found, the reducer logs a warning and falls back to fetching the individual unmerged blocks from the original map outputs, the same fallback path used for merge failures and corrupted blocks. The check applies to both remote merged blocks (in {{MergedBlocksMetaListener.onSuccess}}) and local merged blocks (in {{fetchPushMergedLocalBlock}}).
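
The deferred push of Layer 1 can be sketched without Spark. {{ToyTaskContext}} and {{DeferredPushWriter}} are hypothetical classes that only model the behavior described above; they are not Spark's {{TaskContext}} API. One detail carried over from Spark is that task completion listeners run on success, failure and cancellation alike, so the listener has to check the outcome before pushing.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a task context: every completion listener runs
// when the task ends, whatever the outcome.
final class ToyTaskContext {
    private final List<Consumer<ToyTaskContext>> listeners = new ArrayList<>();
    private boolean succeeded;

    void addCompletionListener(Consumer<ToyTaskContext> listener) {
        listeners.add(listener);
    }

    boolean succeeded() {
        return succeeded;
    }

    // Ends the task with the given outcome and runs the completion listeners.
    void finish(boolean success) {
        succeeded = success;
        for (Consumer<ToyTaskContext> listener : listeners) {
            listener.accept(this);
        }
    }
}

// Hypothetical writer: the local shuffle write happens immediately, but the
// push to the merger is deferred to a completion listener.
final class DeferredPushWriter {
    final List<String> pushedAttempts = new ArrayList<>();

    void write(ToyTaskContext context, String attemptId) {
        // (the local shuffle write would happen here)
        context.addCompletionListener(ctx -> {
            // The listener also runs for failed or killed tasks, so check the outcome.
            if (ctx.succeeded()) {
                pushedAttempts.add(attemptId);
            }
        });
    }

    public static void main(String[] args) {
        DeferredPushWriter writer = new DeferredPushWriter();
        ToyTaskContext attempt0 = new ToyTaskContext();
        ToyTaskContext attempt1 = new ToyTaskContext();
        writer.write(attempt0, "attempt-0");
        writer.write(attempt1, "attempt-1");
        attempt0.finish(false); // e.g. killed as the losing speculative copy
        attempt1.finish(true);
        System.out.println("pushed: " + writer.pushedAttempts);
    }
}
{code}

If both contexts call {{finish(true)}}, both attempts still push; that race is what Layer 2 addresses.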

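The driver-side bookkeeping of Layer 2 can be sketched as follows, assuming the first successful attempt for a map partition is the committed one. {{ToyShuffleStatus}}, {{onSuccessfulTask}}, {{serializeStale}} and {{deserializeStale}} are hypothetical names, and the wire format (an int count followed by the sorted ids as longs) is an assumption, not the format used by the proposed {{serializeStaleMapIds()}}.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical driver-side state for one shuffle: the first successful attempt
// per map partition is committed, later ones are recorded as stale.
final class ToyShuffleStatus {
    private final Map<Integer, Long> committedMapIds = new HashMap<>();
    private final TreeSet<Long> staleMapIds = new TreeSet<>();

    // Returns true if this mapId is the committed one for the partition.
    boolean onSuccessfulTask(int partition, long mapId) {
        Long committed = committedMapIds.putIfAbsent(partition, mapId);
        if (committed == null || committed == mapId) {
            return true;
        }
        staleMapIds.add(mapId); // late duplicate for an already-completed partition
        return false;
    }

    // Hypothetical wire format: an int count followed by the ids as sorted longs.
    byte[] serializeStale() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(staleMapIds.size());
            for (long id : staleMapIds) {
                out.writeLong(id);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static TreeSet<Long> deserializeStale(byte[] data) {
        TreeSet<Long> ids = new TreeSet<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                ids.add(in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ids;
    }

    public static void main(String[] args) {
        ToyShuffleStatus status = new ToyShuffleStatus();
        status.onSuccessfulTask(0, 100L); // committed
        status.onSuccessfulTask(0, 101L); // late duplicate for partition 0: stale
        status.onSuccessfulTask(1, 102L); // committed
        byte[] wire = status.serializeStale(); // would travel with the map statuses
        System.out.println("stale on executor: " + deserializeStale(wire));
    }
}
{code}

A sorted set keeps the serialized form deterministic; a real implementation might prefer a compressed bitmap encoding if the stale set can grow large.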

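The reducer-side check of Layer 3 can be sketched with {{java.util.BitSet}} standing in for {{RoaringBitmap}}, which assumes every id fits in a non-negative {{int}}. {{StaleChunkCheck}}, {{containsStale}} and {{choosePath}} are hypothetical names; the sketch only mirrors the described logic: union the chunk bitmaps, look for any stale id, and fall back to unmerged blocks on a hit.

{code:java}
import java.util.BitSet;
import java.util.List;
import java.util.Set;

// Hypothetical reducer-side check; BitSet stands in for RoaringBitmap.
final class StaleChunkCheck {

    static BitSet bitmapOf(int... ids) {
        BitSet bits = new BitSet();
        for (int id : ids) {
            bits.set(id);
        }
        return bits;
    }

    // Union all chunk bitmaps of one merged block, then look for any stale id in it.
    static boolean containsStale(List<BitSet> chunkBitmaps, Set<Integer> staleIds) {
        if (staleIds.isEmpty()) {
            return false; // nothing to check, keep the fast path
        }
        BitSet union = new BitSet();
        for (BitSet chunk : chunkBitmaps) {
            union.or(chunk);
        }
        return staleIds.stream().anyMatch(union::get);
    }

    // Read the merged block only if it is clean; otherwise fall back to unmerged blocks.
    static String choosePath(List<BitSet> chunkBitmaps, Set<Integer> staleIds) {
        return containsStale(chunkBitmaps, staleIds) ? "fallback-to-unmerged" : "read-merged";
    }

    public static void main(String[] args) {
        List<BitSet> chunks = List.of(bitmapOf(0, 1), bitmapOf(2, 3));
        System.out.println(choosePath(chunks, Set.of(2))); // id 2 is in the second chunk
        System.out.println(choosePath(chunks, Set.of(7)));
    }
}
{code}

Unioning first means each stale id costs a single bitmap lookup regardless of the number of chunks; if the stale ids were also held in a bitmap, {{BitSet.intersects}} would be a natural alternative.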
