jiang13021 commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1671572422
##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
}
}
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15)
+        && context.attemptNumber() < (1 << 16));
+    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+  }
+
+
Review Comment:
IMO, enabling `throwsFetchFailure` doesn't solve this problem. Imagine a
barrier stage that reads data from an external datasource and does a shuffle
write. If one of its tasks (say "map 10") encounters an exception while
reading data, the whole barrier stage will be resubmitted. Celeborn will reuse
the shuffleId if the stage is not marked as INDETERMINATE. In that case we
get two successful MapperEnd events (say "stage 0 stageAttempt 0 map 10
taskAttempt 0" and "stage 0 stageAttempt 1 map 10 taskAttempt 0"), and we
cannot distinguish them today.
I will reach out to the Spark community to see whether we can enforce a max
attempt limit, for example by changing the data type to Short, although that
may be hard to land. For now, we can only check the max attempt limit when
initializing the SparkShuffleManager.
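
As a minimal sketch (not part of the PR), the packing scheme from the diff above can be exercised standalone; the helper names `packAttempt`, `unpackStageAttempt`, and `unpackTaskAttempt` are hypothetical and stand in for `TaskContext`-based code:

```java
public class AttemptIdSketch {
  // stageAttemptNumber goes in the high 16 bits (capped below 1 << 15 so the
  // packed int stays non-negative); attemptNumber fills the low 16 bits.
  static int packAttempt(int stageAttemptNumber, int attemptNumber) {
    assert stageAttemptNumber < (1 << 15) && attemptNumber < (1 << 16);
    return (stageAttemptNumber << 16) | attemptNumber;
  }

  // Recover the stage attempt from the high bits.
  static int unpackStageAttempt(int packed) {
    return packed >>> 16;
  }

  // Recover the task attempt from the low 16 bits.
  static int unpackTaskAttempt(int packed) {
    return packed & 0xFFFF;
  }

  public static void main(String[] args) {
    // The two MapperEnd events from the barrier-stage scenario share
    // taskAttempt 0, but differ once the stage attempt is encoded:
    int first = packAttempt(0, 0);  // stageAttempt 0, taskAttempt 0
    int second = packAttempt(1, 0); // stageAttempt 1, taskAttempt 0
    System.out.println(first + " " + second); // prints "0 65536"
  }
}
```

This makes the two previously indistinguishable attempts compare unequal while keeping a single `int` in the MapperEnd payload.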
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]