mridulm commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1699264136
##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
}
}
+ public static int getMapAttemptNumber(TaskContext context) {
+ assert (context.stageAttemptNumber() < (1 << 15) &&
context.attemptNumber() < (1 << 16));
+ return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+ }
+
Review Comment:
I was looking at this more (and again, great test @RexXiong!), and
unfortunately it looks like we don't have a better way to fix this than by
changing what `attemptNumber` is.
Ideally, I would have preferred `taskId` here, but given that it would be a
breaking protocol change (and we would want to backport this to 0.5 imo), it
is more practical to use the computation that @jiang13021 has proposed here.
Given this, let me revert the changes in #2639 and keep them limited to
barrier stages; the current PR (#2609) can then fix uniquely identifying a
task output.
With this, essentially we have three cases:
* Indeterminate stages - which always throw away the parent's shuffle data
when a fetch failure is raised (and increment the shuffle id).
* Barrier stages - which always throw away shuffle data when the current
stage encounters any task failure (and increment the shuffle id).
* Determinate stages - which recompute only the lost shuffle partitions of
the parent stage.
In all cases, we ensure that `(shuffleId, mapId, attemptNumber)` is always
unique within a Spark application (irrespective of recomputations, etc).
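To make the uniqueness argument concrete, here is a minimal standalone sketch of the packing from the diff above. The class and helper names (`AttemptPacking`, `pack`, `stageAttemptOf`, `taskAttemptOf`) are hypothetical and not part of Celeborn or Spark, and it uses explicit range checks instead of `assert` so the bounds hold even when assertions are disabled. It only illustrates that the same task attempt number in two different stage attempts maps to two different packed values.

```java
public final class AttemptPacking {
    private AttemptPacking() {}

    // Hypothetical helper mirroring the computation in the diff:
    // upper bits hold the stage attempt number, lower 16 bits the task attempt number.
    public static int pack(int stageAttemptNumber, int taskAttemptNumber) {
        if (stageAttemptNumber < 0 || stageAttemptNumber >= (1 << 15)) {
            throw new IllegalArgumentException(
                "stageAttemptNumber out of range: " + stageAttemptNumber);
        }
        if (taskAttemptNumber < 0 || taskAttemptNumber >= (1 << 16)) {
            throw new IllegalArgumentException(
                "taskAttemptNumber out of range: " + taskAttemptNumber);
        }
        return (stageAttemptNumber << 16) | taskAttemptNumber;
    }

    // Recover the stage attempt number from a packed value.
    public static int stageAttemptOf(int packed) {
        return packed >>> 16;
    }

    // Recover the task attempt number from a packed value.
    public static int taskAttemptOf(int packed) {
        return packed & 0xFFFF;
    }

    public static void main(String[] args) {
        // Task attempt 1 in stage attempt 0 vs. task attempt 1 in a stage retry.
        int first = pack(0, 1);
        int rerun = pack(1, 1);
        System.out.println(first + " vs " + rerun); // prints "1 vs 65537"
    }
}
```

Since the stage attempt is limited to 15 bits, the packed value stays non-negative, and the two fields can be recovered independently if needed for debugging.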
Thoughts @waitinfuture, @RexXiong, @jiang13021