AngersZhuuuu commented on code in PR #3531:
URL: https://github.com/apache/celeborn/pull/3531#discussion_r2533588813


##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -460,14 +462,8 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                   taskId,
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
+              failedTaskAttempts += 1;

Review Comment:
   This should directly return `true` here.
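   
   Roughly what I mean, as a hypothetical, simplified sketch (not the real `SparkUtils` code; the exact condition guarding this branch is not visible in this hunk): report the fetch failure right away instead of incrementing `failedTaskAttempts` and deciding at the end of the loop.
   
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   // Hypothetical sketch only: the real code works on Spark TaskInfo objects,
   // reduced here to plain attempt numbers for illustration.
   final class EarlyReturnSketch {
     private static final Logger LOG = LoggerFactory.getLogger(EarlyReturnSketch.class);
   
     static boolean onOtherAttemptFailed(
         int stageId, int index, long taskId, int currentAttempt, int failedAttempt) {
       LOG.info(
           "StageId={} index={} taskId={} attempt={} another attempt {} is failed.",
           stageId, index, taskId, currentAttempt, failedAttempt);
       // Suggested change: return true immediately instead of `failedTaskAttempts += 1;`
       // followed by a separate check after the loop.
       return true;
     }
   }
   ```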



##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -477,22 +473,36 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                   taskId,
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
-              return false;
-            }
-          } else {
-            if (ti.attemptNumber() >= maxTaskFails - 1) {
-              LOG.warn(
-                  "StageId={} index={} taskId={} attemptNumber {} reach maxTaskFails {}.",
+              hasRunningAttempt = true;
+            } else if (ti.status() == "FAILED") {
+              LOG.info(
+                  "StageId={} index={} taskId={} attempt={} another attempt {} is failed.",
                   stageId,
                   taskInfo.index(),
                   taskId,
-                  ti.attemptNumber(),
-                  maxTaskFails);
-              return true;
+                  taskInfo.attemptNumber(),
+                  ti.attemptNumber());
+              failedTaskAttempts += 1;
             }
           }
         }
-        return true;
+        // 1. If no other taskAttempts are running, a FetchFailed exception should be thrown.
+        // 2. If other taskAttempts are running, but failedTaskAttempts >= maxTaskFails,
+        // a FetchFailed exception should be thrown.
+        if (!hasRunningAttempt) {
+          return true;
+        } else if (failedTaskAttempts >= maxTaskFails) {

Review Comment:
   Should we check `failedTaskAttempts >= maxTaskFails` first and then check `hasRunningAttempt`, to keep the same behavior as Spark?
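   
   Roughly the ordering I have in mind, as a minimal sketch (a hypothetical helper that assumes the counts are already collected; whether this really matches Spark's accounting is the open question):
   
   ```java
   // Hypothetical sketch: check the failed-attempt count against maxTaskFails first,
   // and only fall back to the running-attempt check afterwards.
   final class FetchFailureDecisionSketch {
     static boolean shouldReport(int failedTaskAttempts, int maxTaskFails, boolean hasRunningAttempt) {
       if (failedTaskAttempts >= maxTaskFails) {
         return true;
       }
       return !hasRunningAttempt;
     }
   }
   ```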



##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -477,22 +473,36 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                   taskId,
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
-              return false;
-            }
-          } else {
-            if (ti.attemptNumber() >= maxTaskFails - 1) {
-              LOG.warn(
-                  "StageId={} index={} taskId={} attemptNumber {} reach maxTaskFails {}.",
+              hasRunningAttempt = true;
+            } else if (ti.status() == "FAILED") {

Review Comment:
   It should cover the same set of task statuses as Spark, such as KILLED or UNKNOWN.
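   
   A minimal sketch of what I mean, assuming `TaskInfo.status()` follows Spark's status strings (e.g. RUNNING, SUCCESS, FAILED, KILLED, UNKNOWN); it also compares with `equals()` instead of `==`, since `status()` returns a Java `String`:
   
   ```java
   // Hypothetical helper: treat every terminal, non-successful status as a failed
   // attempt, so the status scope matches what Spark itself distinguishes.
   final class TaskStatusScopeSketch {
     static boolean countsAsFailedAttempt(String status) {
       return "FAILED".equals(status) || "KILLED".equals(status) || "UNKNOWN".equals(status);
     }
   }
   ```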


