This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new daed456ae [CELEBORN-1720][FOLLOWUP] Fix flakyTest - check if fetch failure task another attempt is running or successful
daed456ae is described below

commit daed456ae2c05c72ab14c2f9ea1286dd9a61869a
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Jun 6 11:11:12 2025 -0700

    [CELEBORN-1720][FOLLOWUP] Fix flakyTest - check if fetch failure task another attempt is running or successful
    
    ### What changes were proposed in this pull request?
    Record the last reported shuffle fetch failure task id.
    
    ### Why are the changes needed?
    Because the reported shuffle fetch failure task id might be cleaned up quickly after being recorded.
    
    To prevent flaky tests, it is better to record the last reported task id for testing.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    GA for 3 times.
    
    Closes #3301 from turboFei/app_id_debug.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 60fa6d0ee786248b9f69c5c937f727884a4a3768)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 4 ++++
 .../src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 4 ++++
 .../scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala   | 6 +++---
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index b52758581..cff05deb2 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -292,6 +292,9 @@ public class SparkUtils {
     reportedStageShuffleFetchFailureTaskIds.remove(stageId + "-" + stageAttemptId);
   }
 
+  // For testing only.
+  protected static volatile Long lastReportedShuffleFetchFailureTaskId = null;
+
   /**
    * Only used to check for the shuffle fetch failure task whether another attempt is running or
    * successful. If another attempt(excluding the reported shuffle fetch failure tasks in current
@@ -315,6 +318,7 @@ public class SparkUtils {
             reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
                 stageUniqId, k -> new HashSet<>());
         reportedStageTaskIds.add(taskId);
+        lastReportedShuffleFetchFailureTaskId = taskId;
 
         Tuple2<TaskInfo, List<TaskInfo>> taskAttempts = getTaskAttempts(taskSetManager, taskId);
 
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index d6f4ebb73..4a5bfdbb7 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -407,6 +407,9 @@ public class SparkUtils {
     reportedStageShuffleFetchFailureTaskIds.remove(stageId + "-" + stageAttemptId);
   }
 
+  // For testing only.
+  protected static volatile Long lastReportedShuffleFetchFailureTaskId = null;
+
   /**
    * Only used to check for the shuffle fetch failure task whether another attempt is running or
    * successful. If another attempt(excluding the reported shuffle fetch failure tasks in current
@@ -430,6 +433,7 @@ public class SparkUtils {
             reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
                 stageUniqId, k -> new HashSet<>());
         reportedStageTaskIds.add(taskId);
+        lastReportedShuffleFetchFailureTaskId = taskId;
 
         Tuple2<TaskInfo, List<TaskInfo>> taskAttempts = getTaskAttempts(taskSetManager, taskId);
 
diff --git a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
index 293ef080e..d4e57a47c 100644
--- a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
+++ b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
@@ -63,6 +63,7 @@ class SparkUtilsSuite extends AnyFunSuite
       val celebornConf = SparkUtils.fromSparkConf(sparkSession.sparkContext.getConf)
       val hook = new ShuffleReaderGetHooks(celebornConf, workerDirs)
       TestCelebornShuffleManager.registerReaderGetHook(hook)
+      SparkUtils.lastReportedShuffleFetchFailureTaskId = null
 
       try {
         val sc = sparkSession.sparkContext
@@ -87,9 +88,8 @@ class SparkUtilsSuite extends AnyFunSuite
         val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
         eventually(timeout(30.seconds), interval(0.milliseconds)) {
           assert(hook.executed.get() == true)
-          val reportedTaskId =
-            SparkUtils.reportedStageShuffleFetchFailureTaskIds.values().asScala.flatMap(
-              _.asScala).head
+          val reportedTaskId = SparkUtils.lastReportedShuffleFetchFailureTaskId
+          assert(reportedTaskId != null)
           val taskSetManager = SparkUtils.getTaskSetManager(taskScheduler, reportedTaskId)
           assert(taskSetManager != null)
           assert(SparkUtils.getTaskAttempts(taskSetManager, reportedTaskId)._2.size() == 1)

Reply via email to