This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 60fa6d0ee [CELEBORN-1720][FOLLOWUP] Fix flakyTest - check if fetch
failure task another attempt is running or successful
60fa6d0ee is described below
commit 60fa6d0ee786248b9f69c5c937f727884a4a3768
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Jun 6 11:11:12 2025 -0700
[CELEBORN-1720][FOLLOWUP] Fix flakyTest - check if fetch failure task
another attempt is running or successful
### What changes were proposed in this pull request?
Record the last reported shuffle fetch failure task id.
### Why are the changes needed?
Because the reported shuffle fetch failure task id might be cleaned up quickly
after being recorded.
To prevent flaky tests, it is better to record the last reported task id for
testing.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA for 3 times.
Closes #3301 from turboFei/app_id_debug.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 4 ++++
.../src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 4 ++++
.../scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala | 6 +++---
3 files changed, 11 insertions(+), 3 deletions(-)
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index b52758581..cff05deb2 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -292,6 +292,9 @@ public class SparkUtils {
reportedStageShuffleFetchFailureTaskIds.remove(stageId + "-" +
stageAttemptId);
}
+ // For testing only.
+ protected static volatile Long lastReportedShuffleFetchFailureTaskId = null;
+
/**
* Only used to check for the shuffle fetch failure task whether another
attempt is running or
* successful. If another attempt(excluding the reported shuffle fetch
failure tasks in current
@@ -315,6 +318,7 @@ public class SparkUtils {
reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
stageUniqId, k -> new HashSet<>());
reportedStageTaskIds.add(taskId);
+ lastReportedShuffleFetchFailureTaskId = taskId;
Tuple2<TaskInfo, List<TaskInfo>> taskAttempts =
getTaskAttempts(taskSetManager, taskId);
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index d6f4ebb73..4a5bfdbb7 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -407,6 +407,9 @@ public class SparkUtils {
reportedStageShuffleFetchFailureTaskIds.remove(stageId + "-" +
stageAttemptId);
}
+ // For testing only.
+ protected static volatile Long lastReportedShuffleFetchFailureTaskId = null;
+
/**
* Only used to check for the shuffle fetch failure task whether another
attempt is running or
* successful. If another attempt(excluding the reported shuffle fetch
failure tasks in current
@@ -430,6 +433,7 @@ public class SparkUtils {
reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
stageUniqId, k -> new HashSet<>());
reportedStageTaskIds.add(taskId);
+ lastReportedShuffleFetchFailureTaskId = taskId;
Tuple2<TaskInfo, List<TaskInfo>> taskAttempts =
getTaskAttempts(taskSetManager, taskId);
diff --git
a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
index 293ef080e..d4e57a47c 100644
---
a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala
@@ -63,6 +63,7 @@ class SparkUtilsSuite extends AnyFunSuite
val celebornConf =
SparkUtils.fromSparkConf(sparkSession.sparkContext.getConf)
val hook = new ShuffleReaderGetHooks(celebornConf, workerDirs)
TestCelebornShuffleManager.registerReaderGetHook(hook)
+ SparkUtils.lastReportedShuffleFetchFailureTaskId = null
try {
val sc = sparkSession.sparkContext
@@ -87,9 +88,8 @@ class SparkUtilsSuite extends AnyFunSuite
val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
eventually(timeout(30.seconds), interval(0.milliseconds)) {
assert(hook.executed.get() == true)
- val reportedTaskId =
-
SparkUtils.reportedStageShuffleFetchFailureTaskIds.values().asScala.flatMap(
- _.asScala).head
+ val reportedTaskId = SparkUtils.lastReportedShuffleFetchFailureTaskId
+ assert(reportedTaskId != null)
val taskSetManager = SparkUtils.getTaskSetManager(taskScheduler,
reportedTaskId)
assert(taskSetManager != null)
assert(SparkUtils.getTaskAttempts(taskSetManager,
reportedTaskId)._2.size() == 1)