jerqi commented on code in PR #1529:
URL: https://github.com/apache/incubator-uniffle/pull/1529#discussion_r1492449549
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -518,6 +523,33 @@ public <K, V> ShuffleWriter<K, V> getWriter(
shuffleHandleInfo);
}
+ /**
+ * Provides a task attempt id that is unique within a shuffle stage.
+ *
+ * <p>We do not use context.taskAttemptId() here because that is a monotonically increasing
+ * number unique across the entire Spark app, which can grow very large and practically reach
+ * Long.MAX_VALUE. Such a value would overflow the bits reserved for it in the block id.
+ *
+ * <p>Instead, we use the map index (the task's index within the stage), combined with the
+ * attempt number of the task. The map index is bounded by the number of partitions of the
+ * stage, and the attempt number per task is bounded / configured by spark.task.maxFailures
+ * (default: 4).
+ *
+ * @return a task attempt id unique within a shuffle stage
+ */
+ @VisibleForTesting
+ protected static long getTaskAttemptId(int mapIndex, int attemptNo, int maxFailures) {
+ int maxAttemptNo = maxFailures < 1 ? 0 : maxFailures - 1;
+ if (attemptNo > maxAttemptNo) {
Review Comment:
Maybe it's OK to have this check, but we should also consider the case described above.
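
For illustration, here is a minimal, hedged sketch of how a stage-unique task attempt id could be packed from the map index and the attempt number. This is an assumption for discussion, not the PR's actual implementation; in particular, the bit-width derivation via `Integer.numberOfLeadingZeros` is hypothetical.

```java
// Hedged sketch: one plausible way to pack (mapIndex, attemptNo) into a
// task attempt id that is unique within a shuffle stage. NOT the PR's
// actual code; the bit layout here is an illustrative assumption.
public class TaskAttemptIdSketch {

  static long getTaskAttemptId(int mapIndex, int attemptNo, int maxFailures) {
    // spark.task.maxFailures bounds the attempt number: a task is retried
    // at most maxFailures times, so attemptNo ranges over [0, maxFailures - 1].
    int maxAttemptNo = maxFailures < 1 ? 0 : maxFailures - 1;
    if (attemptNo > maxAttemptNo) {
      throw new IllegalArgumentException(
          "attemptNo " + attemptNo + " exceeds maxAttemptNo " + maxAttemptNo);
    }
    // Number of bits needed to represent maxAttemptNo
    // (0 when only attempt 0 is allowed).
    int attemptBits = 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
    // Shift the map index past the attempt bits and OR in the attempt number.
    return ((long) mapIndex << attemptBits) | attemptNo;
  }

  public static void main(String[] args) {
    // With the default spark.task.maxFailures = 4, maxAttemptNo = 3,
    // so 2 bits are reserved for the attempt number: 5 << 2 | 3 = 23.
    System.out.println(getTaskAttemptId(5, 3, 4));
  }
}
```

Because the map index is bounded by the stage's partition count and the attempt number by spark.task.maxFailures, such a packed id stays far below the block-id bit budget, unlike the app-wide context.taskAttemptId().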
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]