This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new f8ad37655 [CELEBORN-2042] Fix FetchFailure handling when 
TaskSetManager is not found
f8ad37655 is described below

commit f8ad37655a9177b88405bf6fd5a2aefe9bc69bd9
Author: gaoyajun02 <[email protected]>
AuthorDate: Wed Jun 18 10:22:10 2025 -0700

    [CELEBORN-2042] Fix FetchFailure handling when TaskSetManager is not found
    
    ### What changes were proposed in this pull request?
    Fixes the FetchFailure handling logic in the shouldReportShuffleFetchFailure 
method to properly handle cases where the TaskSetManager cannot be found for a 
given task ID.
    
    ### Why are the changes needed?
    The current implementation incorrectly reports FetchFailure when the 
TaskSetManager is not found, which leads to false-positive failures in normal 
fault-tolerance scenarios. This happens because:
    1. Executor Lost scenarios: When executors are lost due to resource 
preemption or failures, the associated TaskSetManager gets cleaned up, making 
it unavailable for lookup
    2. Stage cancellation: Cancelled or completed stages may have their 
TaskSetManager removed
    
    These are all normal scenarios in Spark's fault-tolerance mechanism and 
should not be treated as shuffle failures. The current behavior can cause 
unnecessary job failures and confusion when debugging actual shuffle issues.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT, Long-running Production Validation
    
    Closes #3339 from gaoyajun02/CELEBORN-2042.
    
    Authored-by: gaoyajun02 <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 6a097944cf9b617636b402888e3f8de7eb39cd78)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../shuffle/celeborn/SparkShuffleManager.java      |  2 +-
 .../apache/spark/shuffle/celeborn/SparkUtils.java  | 35 +++++++++++++++-------
 .../shuffle/celeborn/SparkShuffleManager.java      |  2 +-
 .../apache/spark/shuffle/celeborn/SparkUtils.java  | 35 +++++++++++++++-------
 .../spark/shuffle/celeborn/SparkUtilsSuite.scala   |  4 +--
 5 files changed, 52 insertions(+), 26 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 563f00b91..02d48a4fb 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -104,7 +104,7 @@ public class SparkShuffleManager implements ShuffleManager {
                 (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
 
             lifecycleManager.registerReportTaskShuffleFetchFailurePreCheck(
-                taskId -> 
!SparkUtils.taskAnotherAttemptRunningOrSuccessful(taskId));
+                taskId -> SparkUtils.shouldReportShuffleFetchFailure(taskId));
             SparkUtils.addSparkListener(new 
ShuffleFetchFailureReportTaskCleanListener());
 
             lifecycleManager.registerShuffleTrackerCallback(
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 9629fabfa..a3a1add71 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -296,15 +296,25 @@ public class SparkUtils {
   protected static volatile Long lastReportedShuffleFetchFailureTaskId = null;
 
   /**
-   * Only used to check for the shuffle fetch failure task whether another 
attempt is running or
-   * successful. If another attempt(excluding the reported shuffle fetch 
failure tasks in current
-   * stage) is running or successful, return true. Otherwise, return false.
+   * Determines whether a shuffle fetch failure should be reported for the 
given task.
+   *
+   * <p>Returns false (should NOT report) in the following scenarios: - Other 
successful attempts
+   * for the same task - Other running attempts and the current failure count 
hasn't reached the
+   * maximum retry limit - Other attempts already reported shuffle fetch 
failures for the same task
+   * - TaskSetManager cannot be found (task completed/cleaned up, executor 
marked as failed, or
+   * stage cancelled/completed)
+   *
+   * <p>Returns true (should report) in all other cases, typically when: - No 
other attempts exist
+   * or all other attempts have failed - Current failure count has reached the 
maximum retry limit
+   *
+   * @param taskId the task ID to check
+   * @return true if the shuffle fetch failure should be reported, false 
otherwise
    */
-  public static boolean taskAnotherAttemptRunningOrSuccessful(long taskId) {
+  public static boolean shouldReportShuffleFetchFailure(long taskId) {
     SparkContext sparkContext = 
SparkContext$.MODULE$.getActive().getOrElse(null);
     if (sparkContext == null) {
       logger.error("Can not get active SparkContext.");
-      return false;
+      return true;
     }
     TaskSchedulerImpl taskScheduler = (TaskSchedulerImpl) 
sparkContext.taskScheduler();
     synchronized (taskScheduler) {
@@ -322,7 +332,7 @@ public class SparkUtils {
 
         Tuple2<TaskInfo, List<TaskInfo>> taskAttempts = 
getTaskAttempts(taskSetManager, taskId);
 
-        if (taskAttempts == null) return false;
+        if (taskAttempts == null) return true;
 
         TaskInfo taskInfo = taskAttempts._1();
         for (TaskInfo ti : taskAttempts._2()) {
@@ -343,7 +353,7 @@ public class SparkUtils {
                   taskId,
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
-              return true;
+              return false;
             } else if (ti.running()) {
               logger.info(
                   "StageId={} index={} taskId={} attempt={} another attempt {} 
is running.",
@@ -352,7 +362,7 @@ public class SparkUtils {
                   taskId,
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
-              return true;
+              return false;
             }
           } else {
             if (ti.attemptNumber() >= maxTaskFails - 1) {
@@ -363,13 +373,16 @@ public class SparkUtils {
                   taskId,
                   ti.attemptNumber(),
                   maxTaskFails);
-              return false;
+              return true;
             }
           }
         }
-        return false;
+        return true;
       } else {
-        logger.error("Can not get TaskSetManager for taskId: {}", taskId);
+        logger.error(
+            "Can not get TaskSetManager for taskId: {}, ignore it. (This 
typically occurs when: "
+                + " task completed/cleaned up, executor marked as failed, or 
stage cancelled/completed)",
+            taskId);
         return false;
       }
     }
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index b6555aa61..8a4318faf 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -148,7 +148,7 @@ public class SparkShuffleManager implements ShuffleManager {
                 (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
 
             lifecycleManager.registerReportTaskShuffleFetchFailurePreCheck(
-                taskId -> 
!SparkUtils.taskAnotherAttemptRunningOrSuccessful(taskId));
+                taskId -> SparkUtils.shouldReportShuffleFetchFailure(taskId));
             SparkUtils.addSparkListener(new 
ShuffleFetchFailureReportTaskCleanListener());
 
             lifecycleManager.registerShuffleTrackerCallback(
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index c5dfb81b1..82b3cf405 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -411,15 +411,25 @@ public class SparkUtils {
   protected static volatile Long lastReportedShuffleFetchFailureTaskId = null;
 
   /**
-   * Only used to check for the shuffle fetch failure task whether another 
attempt is running or
-   * successful. If another attempt(excluding the reported shuffle fetch 
failure tasks in current
-   * stage) is running or successful, return true. Otherwise, return false.
+   * Determines whether a shuffle fetch failure should be reported for the 
given task.
+   *
+   * <p>Returns false (should NOT report) in the following scenarios: - Other 
successful attempts
+   * for the same task - Other running attempts and the current failure count 
hasn't reached the
+   * maximum retry limit - Other attempts already reported shuffle fetch 
failures for the same task
+   * - TaskSetManager cannot be found (task completed/cleaned up, executor 
marked as failed, or
+   * stage cancelled/completed)
+   *
+   * <p>Returns true (should report) in all other cases, typically when: - No 
other attempts exist
+   * or all other attempts have failed - Current failure count has reached the 
maximum retry limit
+   *
+   * @param taskId the task ID to check
+   * @return true if the shuffle fetch failure should be reported, false 
otherwise
    */
-  public static boolean taskAnotherAttemptRunningOrSuccessful(long taskId) {
+  public static boolean shouldReportShuffleFetchFailure(long taskId) {
     SparkContext sparkContext = 
SparkContext$.MODULE$.getActive().getOrElse(null);
     if (sparkContext == null) {
       LOG.error("Can not get active SparkContext.");
-      return false;
+      return true;
     }
     TaskSchedulerImpl taskScheduler = (TaskSchedulerImpl) 
sparkContext.taskScheduler();
     synchronized (taskScheduler) {
@@ -437,7 +447,7 @@ public class SparkUtils {
 
         Tuple2<TaskInfo, List<TaskInfo>> taskAttempts = 
getTaskAttempts(taskSetManager, taskId);
 
-        if (taskAttempts == null) return false;
+        if (taskAttempts == null) return true;
 
         TaskInfo taskInfo = taskAttempts._1();
         for (TaskInfo ti : taskAttempts._2()) {
@@ -458,7 +468,7 @@ public class SparkUtils {
                   taskId,
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
-              return true;
+              return false;
             } else if (ti.running()) {
               LOG.info(
                   "StageId={} index={} taskId={} attempt={} another attempt {} 
is running.",
@@ -467,7 +477,7 @@ public class SparkUtils {
                   taskId,
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
-              return true;
+              return false;
             }
           } else {
             if (ti.attemptNumber() >= maxTaskFails - 1) {
@@ -478,13 +488,16 @@ public class SparkUtils {
                   taskId,
                   ti.attemptNumber(),
                   maxTaskFails);
-              return false;
+              return true;
             }
           }
         }
-        return false;
+        return true;
       } else {
-        LOG.error("Can not get TaskSetManager for taskId: {}", taskId);
+        LOG.error(
+            "Can not get TaskSetManager for taskId: {}, ignore it. (This 
typically occurs when: "
+                + " task completed/cleaned up, executor marked as failed, or 
stage cancelled/completed)",
+            taskId);
         return false;
       }
     }
diff --git 
a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
index 58ca1b011..359b73c52 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
@@ -93,7 +93,7 @@ class SparkUtilsSuite extends AnyFunSuite
           val taskSetManager = SparkUtils.getTaskSetManager(taskScheduler, 
reportedTaskId)
           assert(taskSetManager != null)
           assert(SparkUtils.getTaskAttempts(taskSetManager, 
reportedTaskId)._2.size() == 1)
-          
assert(!SparkUtils.taskAnotherAttemptRunningOrSuccessful(reportedTaskId))
+          assert(SparkUtils.shouldReportShuffleFetchFailure(reportedTaskId))
         }
 
         sparkSession.sparkContext.cancelAllJobs()
@@ -145,7 +145,7 @@ class SparkUtilsSuite extends AnyFunSuite
         val taskSetManager = SparkUtils.getTaskSetManager(taskScheduler, 
taskId)
         assert(taskSetManager != null)
         assert(SparkUtils.getTaskAttempts(taskSetManager, taskId)._2.size() == 
1)
-        assert(!SparkUtils.taskAnotherAttemptRunningOrSuccessful(taskId))
+        assert(SparkUtils.shouldReportShuffleFetchFailure(taskId))
         assert(SparkUtils.reportedStageShuffleFetchFailureTaskIds.size() == 1)
       }
 

Reply via email to