jiang13021 commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1671572422


##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
     }
   }
 
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15) && context.attemptNumber() < (1 << 16));
+    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+  }
+

Review Comment:
   IMO, enabling `throwsFetchFailure` doesn't solve this problem. Imagine a barrier stage that reads data from an external data source and does a shuffle write. If one of its tasks (say it is "map 10") encounters an exception while reading data, the whole barrier stage will be resubmitted, and Celeborn will reuse the shuffleId because the stage is not marked as INDETERMINATE. In that case we will get two successful MapperEnd messages (say "stage 0 stageAttempt 0 map 10 taskAttempt 0" and "stage 0 stageAttempt 1 map 10 taskAttempt 0"), and we currently cannot distinguish them.
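   The collision above is why the diff packs the stage attempt into the high bits. A minimal, self-contained sketch (not the PR's code; the class and helper names here are illustrative) of the packing and how it separates the two MapperEnd cases:

```java
// Illustrative sketch of the bit packing in getMapAttemptNumber:
// high 16 bits hold the stage attempt, low 16 bits the task attempt.
public class MapAttemptPacking {

  // stageAttempt must be < 2^15 so the packed value stays a non-negative int;
  // taskAttempt must fit in 16 bits.
  static int pack(int stageAttempt, int taskAttempt) {
    assert stageAttempt < (1 << 15) && taskAttempt < (1 << 16);
    return (stageAttempt << 16) | taskAttempt;
  }

  static int stageAttempt(int packed) {
    return packed >>> 16;
  }

  static int taskAttempt(int packed) {
    return packed & 0xFFFF;
  }

  public static void main(String[] args) {
    // The two cases from the comment: same map 10, same taskAttempt 0,
    // but different stage attempts, so the packed values differ.
    int first = pack(0, 0);  // stageAttempt 0, taskAttempt 0
    int second = pack(1, 0); // stageAttempt 1, taskAttempt 0
    System.out.println(first + " vs " + second); // 0 vs 65536
  }
}
```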
   
   I will try to reach out to the Spark community to see whether we can enforce the max attempt limit, for example by changing the data type to Short, although that may be hard to land. For now, we can only validate the max attempt limit when initializing the SparkShuffleManager.
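   A hedged sketch of what such an initialization-time check might look like (the config names `spark.stage.maxConsecutiveAttempts` and `spark.task.maxFailures` are real Spark settings, but this validation itself is hypothetical, not code from the PR):

```java
// Hypothetical fail-fast validation for SparkShuffleManager initialization:
// reject configured attempt limits that cannot fit the 15/16-bit encoding.
public class AttemptLimitCheck {

  static void checkAttemptLimits(int maxStageAttempts, int maxTaskAttempts) {
    if (maxStageAttempts >= (1 << 15)) {
      throw new IllegalArgumentException(
          "spark.stage.maxConsecutiveAttempts must be below 2^15 for the attempt encoding");
    }
    if (maxTaskAttempts >= (1 << 16)) {
      throw new IllegalArgumentException(
          "spark.task.maxFailures must be below 2^16 for the attempt encoding");
    }
  }

  public static void main(String[] args) {
    checkAttemptLimits(4, 4); // typical small limits pass
    try {
      checkAttemptLimits(1 << 15, 4);
      System.out.println("accepted");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```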



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
