Copilot commented on code in PR #3556:
URL: https://github.com/apache/celeborn/pull/3556#discussion_r2594619417
##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -481,36 +517,31 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                 taskInfo.attemptNumber(),
                 ti.attemptNumber());
             hasRunningAttempt = true;
-          } else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
-            // For KILLED state task, Spark does not count the number of failures
-            // For UNKNOWN state task, Spark does count the number of failures
-            // For FAILED state task, Spark decides whether to count the failure based on the
-            // different failure reasons. Since we cannot obtain the failure
-            // reason here, we will count all tasks in FAILED state.
-            LOG.info(
-                "StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
-                stageId,
-                taskInfo.index(),
-                taskId,
-                taskInfo.attemptNumber(),
-                ti.attemptNumber(),
-                ti.status());
-            failedTaskAttempts += 1;
          }
        }
      }
      // The following situations should trigger a FetchFailed exception:
-     // 1. If failedTaskAttempts >= maxTaskFails
-     // 2. If no other taskAttempts are running
-     if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+     // 1. If total failures (previous failures + current failure) >= maxTaskFails
+     // 2. If no other taskAttempts are running, trigger a FetchFailed exception
+     //    to keep the same behavior as Spark.
+     // Note: previousFailureCount does NOT include the current failure,
+     //   so we compare with (maxTaskFails - 1) which is equivalent to
+     //   (previousFailureCount + 1) >= maxTaskFails
+     int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+     if (previousFailureCount < 0) {
+       return true;
+     }
+     if (previousFailureCount + 1 >= maxTaskFails || !hasRunningAttempt) {
Review Comment:
[nitpick] The comment mentions comparing with `(maxTaskFails - 1)` for
equivalence, but the actual code uses `previousFailureCount + 1 >=
maxTaskFails`. While mathematically equivalent, the implementation differs from
the explanation. Consider either updating the comment to match the code or
adjusting the code to match the comment explanation for consistency.
##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -481,36 +517,31 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                 taskInfo.attemptNumber(),
                 ti.attemptNumber());
             hasRunningAttempt = true;
-          } else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
-            // For KILLED state task, Spark does not count the number of failures
-            // For UNKNOWN state task, Spark does count the number of failures
-            // For FAILED state task, Spark decides whether to count the failure based on the
-            // different failure reasons. Since we cannot obtain the failure
-            // reason here, we will count all tasks in FAILED state.
-            LOG.info(
-                "StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
-                stageId,
-                taskInfo.index(),
-                taskId,
-                taskInfo.attemptNumber(),
-                ti.attemptNumber(),
-                ti.status());
-            failedTaskAttempts += 1;
          }
        }
      }
      // The following situations should trigger a FetchFailed exception:
-     // 1. If failedTaskAttempts >= maxTaskFails
-     // 2. If no other taskAttempts are running
-     if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+     // 1. If total failures (previous failures + current failure) >= maxTaskFails
+     // 2. If no other taskAttempts are running, trigger a FetchFailed exception
+     //    to keep the same behavior as Spark.
+     // Note: previousFailureCount does NOT include the current failure,
+     //   so we compare with (maxTaskFails - 1) which is equivalent to
+     //   (previousFailureCount + 1) >= maxTaskFails
+     int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+     if (previousFailureCount < 0) {
Review Comment:
The error handling for `previousFailureCount < 0` returns `true` to trigger
FetchFailed, but lacks a comment explaining this fail-safe behavior. Consider
adding a brief comment explaining that returning `true` when failure count
cannot be determined is a conservative safety measure to trigger FetchFailed
and prevent silent failures.
```suggestion
if (previousFailureCount < 0) {
  // Fail-safe: If the previous failure count cannot be determined, conservatively trigger FetchFailed
// to prevent silent failures and ensure the error is handled.
```
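The fail-safe behavior the comment asks to document can be illustrated in isolation. This is a hypothetical standalone sketch (not Celeborn code; the class, the `Map`-backed lookup, and the method names are invented for illustration) of the pattern: an undeterminable failure count is treated conservatively as grounds to report FetchFailed rather than being silently ignored.

```java
import java.util.Map;

// Hypothetical sketch of the fail-safe pattern under review: a negative
// sentinel from the lookup means "unknown", and unknown triggers the
// FetchFailed path instead of assuming success.
public class FailSafeSketch {
    // Stand-in for getTaskFailureCount: returns -1 when the task index is unknown.
    static int getFailureCount(Map<Integer, Integer> counts, int taskIndex) {
        return counts.getOrDefault(taskIndex, -1);
    }

    static boolean shouldReportFetchFailure(
            Map<Integer, Integer> counts, int taskIndex, int maxTaskFails) {
        int previousFailureCount = getFailureCount(counts, taskIndex);
        if (previousFailureCount < 0) {
            // Fail-safe: unknown state is handled as a failure.
            return true;
        }
        // previousFailureCount excludes the current failure, hence the +1.
        return previousFailureCount + 1 >= maxTaskFails;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = Map.of(0, 0, 1, 3);
        System.out.println(shouldReportFetchFailure(counts, 0, 4)); // false: 0 + 1 < 4
        System.out.println(shouldReportFetchFailure(counts, 1, 4)); // true: 3 + 1 >= 4
        System.out.println(shouldReportFetchFailure(counts, 9, 4)); // true: count unknown
    }
}
```

Returning `true` on the unknown path errs toward recomputation: a spurious FetchFailed costs a stage retry, whereas swallowing a real failure could hide data loss.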
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]