This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 5dc79a635 [CELEBORN-1983] Fix fetch fail not throw due to reach spark 
maxTaskFailures
5dc79a635 is described below

commit 5dc79a6353f9586c664a6c527db66e965d01e51c
Author: lijianfu03 <[email protected]>
AuthorDate: Sat Apr 26 18:37:40 2025 -0700

    [CELEBORN-1983] Fix fetch fail not throw due to reach spark maxTaskFailures
    
    ### What changes were proposed in this pull request?
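    Change the `SparkUtils#taskAnotherAttemptRunningOrSuccessful` method (in both the
    spark-2 and spark-3 clients) to log a warning and return false once a task attempt's
    attemptNumber reaches `maxTaskFailures - 1`.
    (https://issues.apache.org/jira/browse/CELEBORN-1983)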
    
    ### Why are the changes needed?
    
    Without this change, a fetch failure may not be thrown when a task reaches Spark's
    maxTaskFailures, which will lead to job failure.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    Tested on online ETL jobs.
    
    Closes #3230 from buska88/CELEBORN-1983.
    
    Authored-by: lijianfu03 <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../java/org/apache/spark/shuffle/celeborn/SparkUtils.java   | 12 ++++++++++++
 .../java/org/apache/spark/shuffle/celeborn/SparkUtils.java   | 12 ++++++++++++
 2 files changed, 24 insertions(+)

diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index c708c1e84..44dd1ea28 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -302,6 +302,7 @@ public class SparkUtils {
       if (taskSetManager != null) {
         int stageId = taskSetManager.stageId();
         int stageAttemptId = taskSetManager.taskSet().stageAttemptId();
+        int maxTaskFails = taskSetManager.maxTaskFailures();
         String stageUniqId = stageId + "-" + stageAttemptId;
         Set<Long> reportedStageTaskIds =
             reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
@@ -342,6 +343,17 @@ public class SparkUtils {
                   ti.attemptNumber());
               return true;
             }
+          } else {
+            if (ti.attemptNumber() >= maxTaskFails - 1) {
+              logger.warn(
+                  "StageId={} index={} taskId={} attemptNumber {} reach 
maxTaskFails {}.",
+                  stageId,
+                  taskInfo.index(),
+                  taskId,
+                  ti.attemptNumber(),
+                  maxTaskFails);
+              return false;
+            }
           }
         }
         return false;
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 4efc29a90..edaeb28bd 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -419,6 +419,7 @@ public class SparkUtils {
       if (taskSetManager != null) {
         int stageId = taskSetManager.stageId();
         int stageAttemptId = taskSetManager.taskSet().stageAttemptId();
+        int maxTaskFails = taskSetManager.maxTaskFailures();
         String stageUniqId = stageId + "-" + stageAttemptId;
         Set<Long> reportedStageTaskIds =
             reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
@@ -459,6 +460,17 @@ public class SparkUtils {
                   ti.attemptNumber());
               return true;
             }
+          } else {
+            if (ti.attemptNumber() >= maxTaskFails - 1) {
+              LOG.warn(
+                  "StageId={} index={} taskId={} attemptNumber {} reach 
maxTaskFails {}.",
+                  stageId,
+                  taskInfo.index(),
+                  taskId,
+                  ti.attemptNumber(),
+                  maxTaskFails);
+              return false;
+            }
           }
         }
         return false;

Reply via email to