[
https://issues.apache.org/jira/browse/SPARK-57491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-57491:
-----------------------------------
Labels: pull-request-available (was: )
> Push-based shuffle may serve stale/incorrect data from duplicate task
> attempts in indeterminate stages
> ------------------------------------------------------------------------------------------------------
>
> Key: SPARK-57491
> URL: https://issues.apache.org/jira/browse/SPARK-57491
> Project: Spark
> Issue Type: Sub-task
> Components: Shuffle
> Affects Versions: 3.2.0, 3.3.0, 3.4.0, 3.5.0, 4.1.0, 4.0.0, 4.2.0, 4.3.0
> Reporter: gaoyajun02
> Priority: Major
> Labels: pull-request-available
>
> h3. Problem
> When push-based shuffle (PSS/Celeborn) is enabled, *indeterminate stages*
> (stages that produce different output across task retries) can expose
> *incorrect or duplicated data* to downstream reduce tasks. This happens when
> multiple task attempts for the same map partition {*}both successfully push
> their shuffle data to the merger service{*}, but only one attempt is
> committed as the canonical result on the driver.
>
> The root cause is a gap between when a map task writes its shuffle data
> locally and when that data is pushed to the external shuffle service:
> # A ShuffleMapTask completes writing shuffle data to local disk
> # Before the driver can commit/kill this attempt, another attempt for the
> same partition (e.g., from speculation, executor preemption recovery, or
> retry after failure) also completes and writes its data
> # *Both attempts push their data to the push-based shuffle merger* — the
> merger accepts data from both since it has no knowledge of which attempt the
> driver will ultimately accept
> # The driver commits one attempt's {{MapStatus}} and kills the other, but
> *the merged block on the server now contains a mixture of data from both
> attempts*
> # Downstream reduce tasks read the merged block and silently consume
> *stale/corrupted data*
> This is especially problematic for *indeterminate* stages, where any of the
> following holds:
> * The output depends on non-deterministic functions (e.g., {{rand()}},
> {{UUID()}}, {{monotonically_increasing_id()}})
> * Input row ordering differs between attempts (e.g., HashJoin vs
> SortMergeJoin upstream)
> * The partitioning logic uses such non-deterministic values as keys
> In these cases, two attempts for the same map partition may produce
> {*}different shuffle partition assignments{*}, meaning the same logical row
> gets hashed to different reduce partitions in each attempt. When both sets of
> data end up in the same merged block, reducers receive {*}duplicated or
> misrouted rows{*}, leading to incorrect query results. One visible symptom is
> shuffle metrics that report more bytes read than written.
> h3. Example Scenario
> Consider a join with {{rand()}} in the join condition:
> {code:sql}
> SELECT * FROM t1 JOIN t2 ON t1.key = t2.key AND rand() < 0.1
> {code}
> If this stage runs with:
> * {*}Speculation enabled{*}: Two attempts of the same map task run
> concurrently
> * {*}HashJoin as upstream{*}: Row ordering into {{rand()}} is
> non-deterministic
> * {*}Push-based shuffle enabled{*}: Both attempts push data to Celeborn/PSS
> Then:
> * Attempt 1 produces partition lengths: {{(27847, 24393, 26942, ...)}}
> * Attempt 2 produces *different* partition lengths: {{(28453, 27119, 30778,
> ...)}} because {{rand()}} received rows in a different order
> * Both push to the merger; the merged block contains interleaved data from
> both
> * The reducer reads *more data than was written by the committed attempt
> alone*
> * Query results contain *duplicate rows* or *data corruption*
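
The scenario above can be reproduced in miniature. The following is a toy model only, assuming a merger that blindly appends whatever it receives per reduce partition; the class and method names (DuplicatePushDemo, runAttempt, merge) are hypothetical and are not Spark or Celeborn APIs. Two attempts route the same rows using different random sequences, standing in for rand() seeing rows in a different order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Toy model only: none of these names are Spark or Celeborn APIs.
final class DuplicatePushDemo {
    static final int NUM_ROWS = 1000;
    static final int NUM_REDUCERS = 4;

    // One map attempt: route each row id to a reduce partition using a
    // random value, standing in for a partition key derived from rand().
    static List<List<Integer>> runAttempt(long seed) {
        Random rng = new Random(seed);
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < NUM_REDUCERS; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int row = 0; row < NUM_ROWS; row++) {
            partitions.get(rng.nextInt(NUM_REDUCERS)).add(row);
        }
        return partitions;
    }

    // A merger that appends everything it receives for each reduce partition,
    // with no knowledge of which attempt the driver will commit.
    static List<List<Integer>> merge(List<List<Integer>> a, List<List<Integer>> b) {
        List<List<Integer>> merged = new ArrayList<>();
        for (int p = 0; p < NUM_REDUCERS; p++) {
            List<Integer> block = new ArrayList<>(a.get(p));
            block.addAll(b.get(p));
            merged.add(block);
        }
        return merged;
    }

    static int totalRows(List<List<Integer>> partitions) {
        int total = 0;
        for (List<Integer> p : partitions) {
            total += p.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<Integer>> attempt1 = runAttempt(1L); // committed by the driver
        List<List<Integer>> attempt2 = runAttempt(2L); // duplicate attempt
        List<List<Integer>> merged = merge(attempt1, attempt2);
        System.out.println("rows written by committed attempt: " + totalRows(attempt1));
        System.out.println("rows visible to reducers:          " + totalRows(merged));
    }
}
```

In this model the committed attempt writes 1000 rows while the reducers see 2000, which mirrors the "more read than written" symptom described in the issue. The real merger works on byte blocks rather than row lists, so this illustrates only the accounting mismatch, not the on-disk format.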
> h3. Solution
> This fix introduces a three-layer defense mechanism:
> * Layer 1 -- Deferred push via TaskCompletionListener in
> ShuffleWriteProcessor: Shuffle block push is deferred until after the task
> succeeds (registered via TaskCompletionListener). If the task is killed or
> fails before completion, the push is skipped entirely. This prevents the
> majority of redundant pushes at the source, because only attempts that
> complete successfully ever push data.
> * Layer 2 -- Stale mapId tracking on the driver side in TaskSetManager and
> MapOutputTracker: For cases where deferred push alone is insufficient (e.g.,
> two attempts both succeed within a race window), the driver tracks which
> mapIds represent stale or duplicate pushes. In
> TaskSetManager.handleSuccessfulTask, when a late-arriving or killed attempt's
> result arrives for an already-completed partition, its mapId is registered as
> stale via shuffleStatus.markDuplicatePushedMap(). The stale mapId set is
> serialized alongside MapStatus/MergeStatus in MapOutputTracker and propagated
> to all executors. New helper methods serializeStaleMapIds() and
> deserializeStaleMapIds() handle efficient binary serialization of this set.
> * Layer 3 -- Chunk-level stale detection on the reducer side in
> PushBasedFetchHelper: Before consuming any push-merged block, each reducer
> checks whether the server-side chunk bitmap contains any tracked stale mapId.
> The check merges all chunk bitmaps into a single RoaringBitmap and tests
> intersection with the stale mapId set using exists + contains. If any stale
> mapId is found, the reducer logs a warning and falls back to fetching
> individual unmerged blocks from the original map outputs -- the same fallback
> path used for merge failures or corrupted blocks. This check is applied to
> both remote merged blocks (in MergedBlocksMetaListener.onSuccess) and local
> merged blocks (in fetchPushMergedLocalBlock).
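
The three layers can be sketched together as follows. This is a simplified illustration under stated assumptions, not the code from the pull request: java.util.BitSet stands in for RoaringBitmap, mapIds are assumed to be small non-negative ints, and every type here (ThreeLayerSketch, AttemptContext, StaleMapIds, mustFallBack, deferPush) is hypothetical. Only the method name markDuplicatePushedMap is borrowed from the description above, and its real signature may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins; none of these are Spark classes.
final class ThreeLayerSketch {

    // Layer 1: the push runs only from a completion callback, and only on success.
    static final class AttemptContext {
        private final List<Consumer<Boolean>> listeners = new ArrayList<>();

        void addCompletionListener(Consumer<Boolean> listener) {
            listeners.add(listener);
        }

        void complete(boolean succeeded) {
            for (Consumer<Boolean> listener : listeners) {
                listener.accept(succeeded);
            }
        }
    }

    // Registers a deferred "push" that records the mapId in pushedMapIds.
    static void deferPush(AttemptContext ctx, int mapId, List<Integer> pushedMapIds) {
        ctx.addCompletionListener(succeeded -> {
            if (succeeded) {
                pushedMapIds.add(mapId); // killed or failed attempts never push
            }
        });
    }

    // Layer 2: driver-side record of mapIds whose pushed data must not be trusted.
    static final class StaleMapIds {
        private final BitSet stale = new BitSet();

        void markDuplicatePushedMap(int mapId) {
            stale.set(mapId);
        }

        byte[] serialize() {
            return stale.toByteArray();
        }

        static BitSet deserialize(byte[] bytes) {
            return BitSet.valueOf(bytes);
        }
    }

    // Layer 3: union all chunk bitmaps, then test for overlap with the stale set.
    static boolean mustFallBack(List<BitSet> chunkBitmaps, BitSet staleMapIds) {
        BitSet merged = new BitSet();
        for (BitSet chunk : chunkBitmaps) {
            merged.or(chunk);
        }
        return merged.intersects(staleMapIds);
    }

    public static void main(String[] args) {
        List<Integer> pushed = new ArrayList<>();
        AttemptContext winner = new AttemptContext();
        AttemptContext killed = new AttemptContext();
        deferPush(winner, 7, pushed);
        deferPush(killed, 8, pushed);
        winner.complete(true);
        killed.complete(false);
        System.out.println("pushed mapIds: " + pushed);

        // An attempt with mapId 9 succeeded inside the race window and pushed anyway.
        StaleMapIds driver = new StaleMapIds();
        driver.markDuplicatePushedMap(9);
        BitSet onExecutor = StaleMapIds.deserialize(driver.serialize());

        BitSet chunk0 = new BitSet();
        chunk0.set(7);
        BitSet chunk1 = new BitSet();
        chunk1.set(9);
        System.out.println("fall back: " + mustFallBack(Arrays.asList(chunk0, chunk1), onExecutor));
    }
}
```

A merged block whose chunks include mapId 9 triggers the fallback, while a block built only from mapId 7 would be consumed as usual. Checking the union of the chunk bitmaps once per block, rather than per chunk, matches the description of Layer 3, at the cost of discarding a whole merged block when only one chunk is affected.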