Copilot commented on code in PR #3556:
URL: https://github.com/apache/celeborn/pull/3556#discussion_r2594619417


##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -481,36 +517,31 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
               hasRunningAttempt = true;
-            } else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
-              // For KILLED state task, Spark does not count the number of failures
-              // For UNKNOWN state task, Spark does count the number of failures
-              // For FAILED state task, Spark decides whether to count the failure based on the
-              // different failure reasons. Since we cannot obtain the failure
-              // reason here, we will count all tasks in FAILED state.
-              LOG.info(
-                  "StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
-                  stageId,
-                  taskInfo.index(),
-                  taskId,
-                  taskInfo.attemptNumber(),
-                  ti.attemptNumber(),
-                  ti.status());
-              failedTaskAttempts += 1;
             }
           }
         }
         // The following situations should trigger a FetchFailed exception:
-        //  1. If failedTaskAttempts >= maxTaskFails
-        //  2. If no other taskAttempts are running
-        if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+        //  1. If total failures (previous failures + current failure) >= maxTaskFails
+        //  2. If no other taskAttempts are running, trigger a FetchFailed exception
+        //  to keep the same behavior as Spark.
+        // Note: previousFailureCount does NOT include the current failure,
+        //       so we compare with (maxTaskFails - 1) which is equivalent to
+        //       (previousFailureCount + 1) >= maxTaskFails
+        int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+        if (previousFailureCount < 0) {
+          return true;
+        }
+        if (previousFailureCount + 1 >= maxTaskFails || !hasRunningAttempt) {

Review Comment:
   [nitpick] The comment mentions comparing with `(maxTaskFails - 1)` for equivalence, but the actual code uses `previousFailureCount + 1 >= maxTaskFails`. While the two forms are mathematically equivalent, the implementation differs from the explanation. Consider either updating the comment to match the code or adjusting the code to match the comment explanation, for consistency.
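
   To illustrate the equivalence the comment refers to, here is a minimal standalone sketch (not Celeborn code; the class and method names are hypothetical) comparing the two forms over a small range of inputs:

   ```java
   public class FailureThresholdCheck {
       // Form actually used in the diff: previousFailureCount + 1 >= maxTaskFails
       static boolean shouldFailAsWritten(int previousFailureCount, int maxTaskFails) {
           return previousFailureCount + 1 >= maxTaskFails;
       }

       // Form described in the code comment: previousFailureCount >= maxTaskFails - 1
       static boolean shouldFailAsCommented(int previousFailureCount, int maxTaskFails) {
           return previousFailureCount >= maxTaskFails - 1;
       }

       public static void main(String[] args) {
           // Exhaustively check a small, realistic range of values.
           for (int prev = 0; prev <= 5; prev++) {
               for (int max = 1; max <= 5; max++) {
                   if (shouldFailAsWritten(prev, max) != shouldFailAsCommented(prev, max)) {
                       throw new AssertionError("forms diverge at prev=" + prev + " max=" + max);
                   }
               }
           }
           System.out.println("equivalent");
       }
   }
   ```

   (The only case where the forms could differ is integer overflow at the extremes of `int`, which is not reachable with real failure counts; so the comment/code mismatch is purely a readability concern.)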



##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -481,36 +517,31 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
               hasRunningAttempt = true;
-            } else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
-              // For KILLED state task, Spark does not count the number of failures
-              // For UNKNOWN state task, Spark does count the number of failures
-              // For FAILED state task, Spark decides whether to count the failure based on the
-              // different failure reasons. Since we cannot obtain the failure
-              // reason here, we will count all tasks in FAILED state.
-              LOG.info(
-                  "StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
-                  stageId,
-                  taskInfo.index(),
-                  taskId,
-                  taskInfo.attemptNumber(),
-                  ti.attemptNumber(),
-                  ti.status());
-              failedTaskAttempts += 1;
             }
           }
         }
         // The following situations should trigger a FetchFailed exception:
-        //  1. If failedTaskAttempts >= maxTaskFails
-        //  2. If no other taskAttempts are running
-        if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+        //  1. If total failures (previous failures + current failure) >= maxTaskFails
+        //  2. If no other taskAttempts are running, trigger a FetchFailed exception
+        //  to keep the same behavior as Spark.
+        // Note: previousFailureCount does NOT include the current failure,
+        //       so we compare with (maxTaskFails - 1) which is equivalent to
+        //       (previousFailureCount + 1) >= maxTaskFails
+        int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+        if (previousFailureCount < 0) {

Review Comment:
   The error handling for `previousFailureCount < 0` returns `true` to trigger FetchFailed, but lacks a comment explaining this fail-safe behavior. Consider adding a brief comment explaining that returning `true` when the failure count cannot be determined is a conservative safety measure to trigger FetchFailed and prevent silent failures.
   ```suggestion
           if (previousFailureCount < 0) {
             // Fail-safe: If the previous failure count cannot be determined, conservatively
             // trigger FetchFailed to prevent silent failures and ensure the error is handled.
   ```
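
   For context, the fail-safe pattern being suggested can be sketched as follows (a minimal standalone illustration, not Celeborn code; `UNKNOWN` and the method name here are hypothetical): a negative sentinel from the lookup is treated as "over the limit" so the stage is retried rather than silently proceeding.

   ```java
   public class FailSafeSketch {
       // Hypothetical sentinel for "failure count could not be determined".
       static final int UNKNOWN = -1;

       static boolean shouldReportFetchFailure(int previousFailureCount, int maxTaskFails) {
           if (previousFailureCount < 0) {
               // Conservative default: an unknown state triggers FetchFailed so
               // Spark retries the stage instead of silently continuing.
               return true;
           }
           // previousFailureCount does not include the current failure, hence + 1.
           return previousFailureCount + 1 >= maxTaskFails;
       }

       public static void main(String[] args) {
           System.out.println(shouldReportFetchFailure(UNKNOWN, 4)); // true (fail-safe path)
           System.out.println(shouldReportFetchFailure(0, 4));       // false
           System.out.println(shouldReportFetchFailure(3, 4));       // true (limit reached)
       }
   }
   ```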



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
