This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new a3fc15939 [CELEBORN-2009] Commit files request failure should exclude
worker in LifecycleManager
a3fc15939 is described below
commit a3fc159399c055555774f8b93ac9cedce1e699ba
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue May 27 20:16:51 2025 -0700
[CELEBORN-2009] Commit files request failure should exclude worker in
LifecycleManager
### What changes were proposed in this pull request?
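Exclude a worker in LifecycleManager when its CommitFiles request fails with
`COMMIT_FILE_EXCEPTION`, or when the request still fails after all retries are
exhausted. Workers in the latter case are recorded with the new
`WORKER_UNRESPONSIVE` status code.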
### Why are the changes needed?
If a worker is under high load (for example, high CPU usage) and cannot process
requests, it should be excluded so that it does not cause the next retry of the
shuffle stage to fail as well.
Internally, we have seen CommitFiles futures time out on heavily loaded workers,
after which the next retry of the stage picks the same workers again and fails.
Similarly, workers that hit repeated RpcTimeouts are still selected for the next
retry.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
NA
Closes #3276 from s0nskar/worker_exlude_on_commit_exception.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit aeac31f6f062db838566c9cbc8649b48f97db298)
Signed-off-by: Wang, Fei <[email protected]>
---
.../scala/org/apache/celeborn/client/WorkerStatusTracker.scala | 2 ++
.../scala/org/apache/celeborn/client/commit/CommitHandler.scala | 9 +++++++++
.../org/apache/celeborn/common/protocol/message/StatusCode.java | 3 ++-
3 files changed, 13 insertions(+), 1 deletion(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 82117a8bc..f065f2c3e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -158,6 +158,8 @@ class WorkerStatusTracker(
case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
statusCode match {
case StatusCode.WORKER_UNKNOWN |
+ StatusCode.WORKER_UNRESPONSIVE |
+ StatusCode.COMMIT_FILE_EXCEPTION |
StatusCode.NO_AVAILABLE_WORKING_DIR |
StatusCode.RESERVE_SLOTS_FAILED |
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY |
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 87fb3564c..4a8b542f8 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -357,6 +357,9 @@ abstract class CommitHandler(
logError(
s"Request commitFiles return
${StatusCode.COMMIT_FILES_MOCK_FAILURE} for " +
s"$shuffleKey for ${status.retriedTimes}/$maxRetries,
will not retry")
+ commitFilesFailedWorkers.put(
+ worker,
+ (StatusCode.COMMIT_FILES_MOCK_FAILURE,
System.currentTimeMillis()))
val res = createFailResponse(status)
processResponse(res, worker)
iter.remove()
@@ -377,6 +380,9 @@ abstract class CommitHandler(
s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleKey failed" +
s" (attempt ${status.retriedTimes}/$maxRetries), will not
retry.",
e)
+ commitFilesFailedWorkers.put(
+ worker,
+ (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
val res = createFailResponse(status)
processResponse(res, status.workerInfo)
iter.remove()
@@ -392,6 +398,9 @@ abstract class CommitHandler(
logError(
s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleKey failed because of Timeout" +
s" (attempt ${status.retriedTimes}/$maxRetries), will not
retry.")
+ commitFilesFailedWorkers.put(
+ worker,
+ (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
}
}
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 7c1df8d07..46c59ab52 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -91,7 +91,8 @@ public enum StatusCode {
OPEN_STREAM_FAILED(51),
SEGMENT_START_FAIL_REPLICA(52),
SEGMENT_START_FAIL_PRIMARY(53),
- NO_SPLIT(54);
+ NO_SPLIT(54),
+ WORKER_UNRESPONSIVE(55);
private final byte value;