jiang13021 commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1677274696
##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
}
}
+ public static int getMapAttemptNumber(TaskContext context) {
+ assert (context.stageAttemptNumber() < (1 << 15) &&
+     context.attemptNumber() < (1 << 16));
+ return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+ }
+
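For reference, here is a standalone sketch of the bit packing in `getMapAttemptNumber`, with the inverse decoding added. `MapAttemptNumberSketch`, `pack`, `stageAttempt` and `taskAttempt` are hypothetical names, and the sketch takes plain ints instead of Spark's `TaskContext`.

```java
// Hypothetical standalone sketch; it does not use Spark's TaskContext.
public class MapAttemptNumberSketch {

  // Packs the stage attempt number into the high bits and the task attempt
  // number into the low 16 bits, like the expression in getMapAttemptNumber.
  static int pack(int stageAttemptNumber, int taskAttemptNumber) {
    if (stageAttemptNumber < 0 || stageAttemptNumber >= (1 << 15)
        || taskAttemptNumber < 0 || taskAttemptNumber >= (1 << 16)) {
      throw new IllegalArgumentException("attempt number out of range");
    }
    return (stageAttemptNumber << 16) | taskAttemptNumber;
  }

  // Recovers the stage attempt number from the high bits.
  static int stageAttempt(int packed) {
    return packed >>> 16;
  }

  // Recovers the task attempt number from the low 16 bits.
  static int taskAttempt(int packed) {
    return packed & 0xFFFF;
  }

  public static void main(String[] args) {
    int packed = pack(2, 3);
    System.out.println(packed);               // 131075
    System.out.println(stageAttempt(packed)); // 2
    System.out.println(taskAttempt(packed));  // 3
  }
}
```

One design note: Java `assert` statements only run when the JVM is started with `-ea`, so the sketch uses an explicit range check instead. Keeping the stage attempt below `1 << 15` leaves the sign bit clear, so the packed value is never negative.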
Review Comment:
> IMO we can revisit change to attemptId after we support barrier stage
> correctly - and evaluate if it is still required. WDYT?

Agreed. As I said in
https://issues.apache.org/jira/projects/CELEBORN/issues/CELEBORN-1498?filter=allopenissues,
maybe we can just modify the conditions for reusing the shuffle id and let the
DAGScheduler decide whether to reuse it.
If the DAGScheduler clears appShuffle's numAvailableOutputs, it wants
Celeborn to ignore the previous results, so we generate a new shuffle id.
Otherwise, it wants to reuse the previous shuffle id and the output of the
previous tasks.
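A minimal sketch of the proposed reuse rule, assuming the decision is made once per map stage attempt and that `numAvailableOutputs == 0` on a retry means the DAGScheduler cleared the outputs. `ShuffleIdReuseSketch` and `resolve` are hypothetical names; this is not Celeborn's actual shuffle id bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed rule; names are illustrative, not Celeborn APIs.
public class ShuffleIdReuseSketch {
  // appShuffleId -> {stageAttemptNumber, celebornShuffleId} of the latest decision
  private final Map<Integer, int[]> latest = new HashMap<>();
  private int nextShuffleId = 0;

  // Assumption: numAvailableOutputs is the number of map outputs Spark still
  // tracks for this shuffle when the stage attempt starts.
  public synchronized int resolve(int appShuffleId, int stageAttemptNumber,
                                  int numAvailableOutputs) {
    int[] prev = latest.get(appShuffleId);
    if (prev != null && prev[0] == stageAttemptNumber) {
      return prev[1]; // same stage attempt: keep the id already chosen for it
    }
    int id;
    if (prev != null && numAvailableOutputs > 0) {
      id = prev[1]; // DAGScheduler kept outputs: reuse the id and earlier output
    } else {
      id = nextShuffleId++; // first attempt, or outputs cleared: fresh id
    }
    latest.put(appShuffleId, new int[] {stageAttemptNumber, id});
    return id;
  }

  public static void main(String[] args) {
    ShuffleIdReuseSketch s = new ShuffleIdReuseSketch();
    System.out.println(s.resolve(7, 0, 0)); // 0: first attempt
    System.out.println(s.resolve(7, 1, 5)); // 0: outputs kept, reuse
    System.out.println(s.resolve(7, 2, 0)); // 1: outputs cleared, new id
  }
}
```

The point of keying on numAvailableOutputs is that Celeborn no longer needs its own view of output determinism; whatever the DAGScheduler decided to keep or drop is reflected in that count.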
There is one more reason for doing this:
RDD#getOutputDeterministicLevel is a protected method of RDD, so subclasses can
override it, which introduces the risk of its value changing dynamically during
execution (although I've never seen it change). WDYT @mridulm @waitinfuture?
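To illustrate the concern with a protected method, here is a hypothetical Java analog (these classes are not Spark's `RDD`): because subclasses supply the implementation, nothing prevents an override from answering differently on different calls, so a reuse decision keyed on it could flip between the first attempt and a retry. The enum values mirror Spark's `DeterministicLevel`.

```java
// Hypothetical Java analog of the concern; these classes are not Spark's RDD.
public class OverridableLevelSketch {
  enum DeterministicLevel { DETERMINATE, UNORDERED, INDETERMINATE }

  static class BaseDataset {
    // Subclasses may override this, just as RDD subclasses can override
    // getOutputDeterministicLevel.
    protected DeterministicLevel getOutputDeterministicLevel() {
      return DeterministicLevel.DETERMINATE;
    }
  }

  // A subclass whose answer depends on mutable state, so two calls can disagree.
  static class FlakyDataset extends BaseDataset {
    private int calls = 0;

    @Override
    protected DeterministicLevel getOutputDeterministicLevel() {
      calls++;
      return calls == 1 ? DeterministicLevel.DETERMINATE : DeterministicLevel.INDETERMINATE;
    }
  }

  public static void main(String[] args) {
    BaseDataset ds = new FlakyDataset();
    DeterministicLevel atFirstAttempt = ds.getOutputDeterministicLevel();
    DeterministicLevel atRetry = ds.getOutputDeterministicLevel();
    // A shuffle id reuse rule keyed on this value would decide differently at retry time.
    System.out.println(atFirstAttempt + " -> " + atRetry); // DETERMINATE -> INDETERMINATE
  }
}
```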
--
This is an automated message from the Apache Git Service.