[ https://issues.apache.org/jira/browse/SPARK-57491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-57491:
-----------------------------------
    Labels: pull-request-available  (was: )

> Push-based shuffle may serve stale/incorrect data from duplicate task 
> attempts in indeterminate stages
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-57491
>                 URL: https://issues.apache.org/jira/browse/SPARK-57491
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Shuffle
>    Affects Versions: 3.2.0, 3.3.0, 3.4.0, 3.5.0, 4.1.0, 4.0.0, 4.2.0, 4.3.0
>            Reporter: gaoyajun02
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Problem
> When push-based shuffle (PSS/Celeborn) is enabled, *indeterminate stages* 
> (stages that produce different output across task retries) can expose 
> *incorrect or duplicated data* to downstream reduce tasks. This happens when 
> multiple task attempts for the same map partition {*}both successfully push 
> their shuffle data to the merger service{*}, but only one attempt is 
> committed as the canonical result on the driver.
>
> The root cause is a gap between when a map task writes its shuffle data
> locally and when that data is pushed to the external shuffle service:
>  # A ShuffleMapTask completes writing shuffle data to local disk
>  # Before the driver can commit/kill this attempt, another attempt for the 
> same partition (e.g., from speculation, executor preemption recovery, or 
> retry after failure) also completes and writes its data
>  # *Both attempts push their data to the push-based shuffle merger* — the 
> merger accepts data from both since it has no knowledge of which attempt the 
> driver will ultimately accept
>  # The driver commits one attempt's {{MapStatus}} and kills the other, but 
> *the merged block on the server now contains a mixture of data from both 
> attempts*
>  # Downstream reduce tasks read the merged block and silently consume 
> *stale/corrupted data*
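
The five steps above can be reproduced with a toy model. This is a hypothetical
sketch, not Spark or Celeborn code: {{ToyMerger}} and {{RaceDemo}} are made-up
names, rows are plain strings, and there is a single reduce partition. It shows
that a merger which appends every pushed block, without knowing which attempt
the driver will commit, hands the reducer rows from both attempts.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy merger: accepts pushed blocks from any attempt, in arrival order.
class ToyMerger {
    // reduceId -> merged rows
    private final Map<Integer, List<String>> merged = new HashMap<>();

    // No notion of attempts: every push is appended to the merged block.
    void push(int reduceId, List<String> rows) {
        merged.computeIfAbsent(reduceId, k -> new ArrayList<>()).addAll(rows);
    }

    List<String> read(int reduceId) {
        return merged.getOrDefault(reduceId, List.of());
    }
}

class RaceDemo {
    // Two attempts of one map partition both push; the driver commits attempt 1.
    static List<String> reducerView() {
        ToyMerger merger = new ToyMerger();
        List<String> attempt1 = List.of("a", "b"); // attempt the driver commits
        List<String> attempt2 = List.of("a", "c"); // duplicate attempt, different output
        merger.push(0, attempt1);
        merger.push(0, attempt2);                  // step 3: also accepted
        // Step 4: the driver keeps attempt 1's MapStatus, but the block is already mixed.
        return merger.read(0);
    }

    public static void main(String[] args) {
        System.out.println(reducerView()); // [a, b, a, c]
    }
}
{code}

The committed attempt produced two rows, yet the reducer sees four: a duplicate
{{a}} and a row {{c}} that only the discarded attempt produced.
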
>
> This is especially problematic for *indeterminate* stages, for example when:
>  * The output depends on non-deterministic functions (e.g., {{rand()}},
> {{UUID()}}, {{monotonically_increasing_id()}})
>  * Input row ordering differs between attempts (e.g., HashJoin vs 
> SortMergeJoin upstream)
>  * The partitioning logic uses such non-deterministic values as keys
>
> In these cases, two attempts for the same map partition may produce
> {*}different shuffle partition assignments{*}, meaning the same logical row
> gets hashed to different reduce partitions in each attempt. When data from
> both attempts ends up in the merged blocks, reducers receive {*}duplicated or
> misrouted rows{*}, leading to incorrect query results. A visible symptom is
> shuffle metrics reporting more bytes read than written.
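
Why a different row order changes partition assignments can be sketched as
follows. Assumptions: {{IndeterminatePartitioner}} is hypothetical,
{{java.util.Random}} with a fixed seed stands in for a seeded {{rand()}}, and
each row takes the next draw in arrival order, so the value a row receives
depends on its position rather than its content.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical indeterminate map task: each row's partition comes from a seeded
// random draw (standing in for rand()), consumed in row arrival order.
class IndeterminatePartitioner {
    static Map<String, Integer> assign(List<String> rowsInArrivalOrder, long seed, int numPartitions) {
        Random rng = new Random(seed); // same seed in every attempt
        Map<String, Integer> partitionOf = new LinkedHashMap<>();
        for (String row : rowsInArrivalOrder) {
            // The i-th draw goes to whichever row happens to arrive i-th.
            partitionOf.put(row, rng.nextInt(numPartitions));
        }
        return partitionOf;
    }

    public static void main(String[] args) {
        List<String> attempt1Order = List.of("r0", "r1", "r2", "r3", "r4", "r5");
        List<String> attempt2Order = new ArrayList<>(attempt1Order);
        Collections.reverse(attempt2Order); // e.g. a different upstream join emits rows in another order
        System.out.println("attempt 1: " + assign(attempt1Order, 42L, 3));
        System.out.println("attempt 2: " + assign(attempt2Order, 42L, 3));
    }
}
{code}

The i-th row of either attempt receives the same draw, so any row whose position
changed can land in a different reduce partition; which rows actually move
depends on the seed.
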
>
> h3. Example Scenario
> Consider a join with {{rand()}} in the join condition:
> {code:sql}
> SELECT * FROM t1 JOIN t2 ON t1.key = t2.key AND rand() < 0.1
> {code}
> If this stage runs with:
>  * {*}Speculation enabled{*}: Two attempts of the same map task run 
> concurrently
>  * {*}HashJoin as upstream{*}: Row ordering into {{rand()}} is 
> non-deterministic
>  * {*}Push-based shuffle enabled{*}: Both attempts push data to Celeborn/PSS
>
> Then:
>  * Attempt 1 produces partition lengths: {{(27847, 24393, 26942, ...)}}
>  * Attempt 2 produces *different* partition lengths: {{(28453, 27119, 30778, 
> ...)}} because {{rand()}} received rows in a different order
>  * Both push to the merger; the merged block contains interleaved data from 
> both
>  * The reducer reads *more data than was written by the committed attempt 
> alone*
>  * Query results contain *duplicate rows* or *data corruption*
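
The read-versus-written mismatch in this example can be worked through using
only the first three partition lengths quoted above (the remaining values are
elided in the report). Assumptions: the lengths are treated as byte counts, and
the merger is assumed to have accepted every block from both attempts, which is
the worst case. {{ReadVsWritten}} is a hypothetical name.

{code:java}
import java.util.Arrays;

// Worked check on the first three partition lengths from the example.
// Assumption: the merger accepted every block from both attempts (worst case).
class ReadVsWritten {
    static final long[] ATTEMPT1 = {27847, 24393, 26942}; // committed attempt
    static final long[] ATTEMPT2 = {28453, 27119, 30778}; // duplicate attempt

    // Bytes a reducer would read from each merged partition if both attempts were merged.
    static long[] mergedLengths() {
        long[] merged = new long[ATTEMPT1.length];
        for (int i = 0; i < merged.length; i++) {
            merged[i] = ATTEMPT1[i] + ATTEMPT2[i];
        }
        return merged;
    }

    public static void main(String[] args) {
        long written = Arrays.stream(ATTEMPT1).sum();     // what the committed MapStatus reports
        long read = Arrays.stream(mergedLengths()).sum(); // what the reducers fetch
        System.out.println(Arrays.toString(mergedLengths())); // [56300, 51512, 57720]
        System.out.println("written=" + written + " read=" + read + " extra=" + (read - written));
        // written=79182 read=165532 extra=86350
    }
}
{code}

Under this assumption the reducers read more than twice what the committed
attempt wrote for these three partitions, matching the "more bytes read than
written" symptom.
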
>
> h3. Solution
> This fix introduces a three-layer defense mechanism:
>  * Layer 1 -- Deferred push via TaskCompletionListener in
> ShuffleWriteProcessor: Shuffle block push is deferred until the task has
> succeeded (the push is registered as a TaskCompletionListener). If the task is
> killed or fails, the push is skipped entirely. This stops most redundant
> pushes at the source, since killed and failed attempts never push; on its own
> it cannot guarantee that only the committed attempt pushes, because two
> attempts may both succeed.
>  * Layer 2 -- Stale mapId tracking on the driver side in TaskSetManager and 
> MapOutputTracker: For cases where deferred push alone is insufficient (e.g., 
> two attempts both succeed within a race window), the driver tracks which 
> mapIds represent stale or duplicate pushes. In 
> TaskSetManager.handleSuccessfulTask, when a late-arriving or killed attempt's 
> result arrives for an already-completed partition, its mapId is registered as 
> stale via shuffleStatus.markDuplicatePushedMap(). The stale mapId set is 
> serialized alongside MapStatus/MergeStatus in MapOutputTracker and propagated 
> to all executors. New helper methods serializeStaleMapIds() and 
> deserializeStaleMapIds() handle efficient binary serialization of this set.
>  * Layer 3 -- Chunk-level stale detection on the reducer side in 
> PushBasedFetchHelper: Before consuming any push-merged block, each reducer 
> checks whether the server-side chunk bitmap contains any tracked stale mapId. 
> The check merges all chunk bitmaps into a single RoaringBitmap and tests 
> intersection with the stale mapId set using exists + contains. If any stale 
> mapId is found, the reducer logs a warning and falls back to fetching 
> individual unmerged blocks from the original map outputs -- the same fallback 
> path used for merge failures or corrupted blocks. This check is applied to 
> both remote merged blocks (in MergedBlocksMetaListener.onSuccess) and local 
> merged blocks (in fetchPushMergedLocalBlock).
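
Layer 1 can be illustrated with a self-contained sketch. {{ToyTaskContext}} and
{{DeferredPushDemo}} are hypothetical stand-ins, not Spark's {{TaskContext}} or
{{ShuffleWriteProcessor}}. The model assumes completion listeners fire on both
success and failure, so the deferred push checks the task outcome first.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical task context, NOT Spark's TaskContext: completion listeners fire
// after the task body ends, whether it succeeded or was killed/failed.
class ToyTaskContext {
    private final List<Consumer<ToyTaskContext>> completionListeners = new ArrayList<>();
    private boolean failedOrKilled = false;

    void addCompletionListener(Consumer<ToyTaskContext> listener) {
        completionListeners.add(listener);
    }

    boolean succeeded() {
        return !failedOrKilled;
    }

    void run(Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            failedOrKilled = true; // a kill or failure is modelled as an exception
        } finally {
            for (Consumer<ToyTaskContext> listener : completionListeners) {
                listener.accept(this);
            }
        }
    }
}

class DeferredPushDemo {
    // Runs one map attempt and returns what it pushed to the (imaginary) merger.
    static List<String> runAttempt(boolean killedAfterWrite) {
        List<String> pushed = new ArrayList<>();
        ToyTaskContext ctx = new ToyTaskContext();
        ctx.run(() -> {
            // Shuffle data is written to local disk here (omitted).
            // The push is only registered now; it runs at task completion.
            ctx.addCompletionListener(c -> {
                if (c.succeeded()) {
                    pushed.add("shuffle blocks");
                }
            });
            if (killedAfterWrite) {
                throw new RuntimeException("attempt killed by the driver");
            }
        });
        return pushed;
    }

    public static void main(String[] args) {
        System.out.println("successful attempt pushed: " + runAttempt(false)); // [shuffle blocks]
        System.out.println("killed attempt pushed: " + runAttempt(true));      // []
    }
}
{code}

A killed attempt never pushes, but if two attempts both reach success, both
listeners push; that remaining race window is what Layer 2 covers.
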
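
The driver-side bookkeeping in Layer 2 might look roughly like this. Everything
here is an assumption for illustration: {{ToyShuffleStatus}} and
{{StaleMapIdCodec}} are hypothetical, the "first successful attempt wins" rule
simplifies what {{TaskSetManager.handleSuccessfulTask}} does, and the wire
format (an int count followed by 64-bit mapIds) is not necessarily the encoding
used by the real {{serializeStaleMapIds()}} and {{deserializeStaleMapIds()}}.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical driver-side state; names echo the issue, signatures are assumptions.
class ToyShuffleStatus {
    private final Map<Integer, Long> committedMapIdByPartition = new HashMap<>();
    private final Set<Long> staleMapIds = new TreeSet<>();

    // First successful attempt for a map partition wins; later ones become stale.
    void onSuccessfulAttempt(int mapPartition, long mapId) {
        Long existing = committedMapIdByPartition.putIfAbsent(mapPartition, mapId);
        if (existing != null && existing != mapId) {
            markDuplicatePushedMap(mapId);
        }
    }

    void markDuplicatePushedMap(long mapId) {
        staleMapIds.add(mapId);
    }

    Set<Long> staleMapIds() {
        return staleMapIds;
    }
}

class StaleMapIdCodec {
    // Assumed encoding: int count, then each mapId as a big-endian long.
    static byte[] serializeStaleMapIds(Set<Long> mapIds) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(mapIds.size());
            for (long mapId : mapIds) {
                out.writeLong(mapId);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    static Set<Long> deserializeStaleMapIds(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            Set<Long> mapIds = new TreeSet<>();
            for (int i = 0; i < count; i++) {
                mapIds.add(in.readLong());
            }
            return mapIds;
        } catch (IOException e) {
            throw new UncheckedIOException(e); // e.g. truncated input
        }
    }

    public static void main(String[] args) {
        ToyShuffleStatus status = new ToyShuffleStatus();
        status.onSuccessfulAttempt(7, 100L); // first attempt for map partition 7 is committed
        status.onSuccessfulAttempt(7, 101L); // late duplicate for partition 7 becomes stale
        byte[] wire = serializeStaleMapIds(status.staleMapIds());
        System.out.println(wire.length + " bytes -> " + deserializeStaleMapIds(wire)); // 12 bytes -> [101]
    }
}
{code}

In this encoding one stale mapId costs 12 bytes (4 for the count, 8 for the id);
a compressed bitmap could be more compact when the set is large.
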
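
The reducer-side check in Layer 3 can be sketched with the JDK alone.
{{java.util.BitSet}} is swapped in for RoaringBitmap, mapIds are assumed to fit
in a non-negative {{int}}, and {{StaleChunkCheck}} with its string results is
hypothetical; the real logic lives in {{PushBasedFetchHelper}}.

{code:java}
import java.util.BitSet;
import java.util.List;
import java.util.Set;

// Reducer-side check sketch. BitSet stands in for RoaringBitmap.
class StaleChunkCheck {
    // OR all chunk bitmaps of one push-merged block into a single bitmap.
    static BitSet mergeChunkBitmaps(List<BitSet> chunkBitmaps) {
        BitSet merged = new BitSet();
        for (BitSet chunk : chunkBitmaps) {
            merged.or(chunk);
        }
        return merged;
    }

    // True if any stale mapId contributed to the merged block (exists + contains).
    static boolean containsStaleMap(List<BitSet> chunkBitmaps, Set<Integer> staleMapIds) {
        if (staleMapIds.isEmpty()) {
            return false; // nothing tracked, so no need to build the merged bitmap
        }
        BitSet merged = mergeChunkBitmaps(chunkBitmaps);
        return staleMapIds.stream().anyMatch(id -> merged.get(id));
    }

    // Falls back to the original unmerged blocks if the merged block is tainted.
    static String choosePath(List<BitSet> chunkBitmaps, Set<Integer> staleMapIds) {
        return containsStaleMap(chunkBitmaps, staleMapIds) ? "fallback-to-unmerged" : "read-merged";
    }

    static BitSet bits(int... ids) {
        BitSet b = new BitSet();
        for (int id : ids) {
            b.set(id);
        }
        return b;
    }

    public static void main(String[] args) {
        List<BitSet> chunks = List.of(bits(1, 2), bits(3, 5));
        System.out.println(choosePath(chunks, Set.of(5))); // fallback-to-unmerged
        System.out.println(choosePath(chunks, Set.of(4))); // read-merged
    }
}
{code}

OR-ing the chunk bitmaps first means the stale set is scanned once per merged
block rather than once per chunk.
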



--
This message was sent by Atlassian Jira
(v8.20.10#820010)