This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5dc79a635 [CELEBORN-1983] Fix fetch failure not being thrown when Spark maxTaskFailures is reached
5dc79a635 is described below
commit 5dc79a6353f9586c664a6c527db66e965d01e51c
Author: lijianfu03 <[email protected]>
AuthorDate: Sat Apr 26 18:37:40 2025 -0700
[CELEBORN-1983] Fix fetch failure not being thrown when Spark maxTaskFailures is reached
### What changes were proposed in this pull request?
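Change `SparkUtils#taskAnotherAttemptRunningOrSuccessful` in both the Spark 2 and Spark 3 clients. When the task attempt number has reached `maxTaskFailures - 1` (the last attempt Spark allows), the method now logs a warning and returns `false`
(https://issues.apache.org/jira/browse/CELEBORN-1983).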
### Why are the changes needed?
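Without this change, a fetch failure on a task's last allowed attempt may not be thrown as a fetch failure. The task simply fails, and once Spark's maxTaskFailures is reached the whole job fails.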
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested on online (production) ETL jobs.
Closes #3230 from buska88/CELEBORN-1983.
Authored-by: lijianfu03 <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 12 ++++++++++++
.../java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 12 ++++++++++++
2 files changed, 24 insertions(+)
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index c708c1e84..44dd1ea28 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -302,6 +302,7 @@ public class SparkUtils {
if (taskSetManager != null) {
int stageId = taskSetManager.stageId();
int stageAttemptId = taskSetManager.taskSet().stageAttemptId();
+ int maxTaskFails = taskSetManager.maxTaskFailures();
String stageUniqId = stageId + "-" + stageAttemptId;
Set<Long> reportedStageTaskIds =
reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
@@ -342,6 +343,17 @@ public class SparkUtils {
ti.attemptNumber());
return true;
}
+ } else {
+ if (ti.attemptNumber() >= maxTaskFails - 1) {
+ logger.warn(
+ "StageId={} index={} taskId={} attemptNumber {} reach
maxTaskFails {}.",
+ stageId,
+ taskInfo.index(),
+ taskId,
+ ti.attemptNumber(),
+ maxTaskFails);
+ return false;
+ }
}
}
return false;
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 4efc29a90..edaeb28bd 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -419,6 +419,7 @@ public class SparkUtils {
if (taskSetManager != null) {
int stageId = taskSetManager.stageId();
int stageAttemptId = taskSetManager.taskSet().stageAttemptId();
+ int maxTaskFails = taskSetManager.maxTaskFailures();
String stageUniqId = stageId + "-" + stageAttemptId;
Set<Long> reportedStageTaskIds =
reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(
@@ -459,6 +460,17 @@ public class SparkUtils {
ti.attemptNumber());
return true;
}
+ } else {
+ if (ti.attemptNumber() >= maxTaskFails - 1) {
+ LOG.warn(
+ "StageId={} index={} taskId={} attemptNumber {} reach
maxTaskFails {}.",
+ stageId,
+ taskInfo.index(),
+ taskId,
+ ti.attemptNumber(),
+ maxTaskFails);
+ return false;
+ }
}
}
return false;