mridulm commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1676970873


##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
     }
   }
 
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15) && context.attemptNumber() < (1 << 16));
+    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+  }
+

Review Comment:
   In the [test case](https://github.com/apache/celeborn/pull/2609/files#diff-e17f15fcca26ddfc412f0af159c784d72417b0f22598e1b1ebfcacd6d4c3ad35R301), the failure is not due to a fetch failure.
   
   The flow would be something like the following:
   
   Stage 0 completes and generates shuffle data.
   Stage 1 (a barrier stage) starts, and its tasks read the output of stage 0 - all tasks except partition 0 complete successfully.
   Stage 1, partition 0, attempt 0 throws an exception - which results in Spark re-running the entire stage 1, since it is a barrier stage.
   
   This means every other partition in stage 1, which had completed successfully earlier, will now be re-executed - and so will generate shuffle output for the exact same shuffle id + stage id. Even though the stage attempt id is different, given that this is a re-execution of the same stage and is deterministic, Celeborn will reuse the shuffle id even with `throwsFetchFailure` enabled - resulting in data duplication.
   
   [This is a quick prototype](https://github.com/apache/celeborn/compare/main...mridulm:celeborn:fix-barrier-stage-reexecution) to illustrate the fix - +CC @jiang13021



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
