This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new a3fc15939 [CELEBORN-2009] Commit files request failure should exclude 
worker in LifecycleManager
a3fc15939 is described below

commit a3fc159399c055555774f8b93ac9cedce1e699ba
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue May 27 20:16:51 2025 -0700

    [CELEBORN-2009] Commit files request failure should exclude worker in 
LifecycleManager
    
    ### What changes were proposed in this pull request?
    
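    Exclude a worker in LifecycleManager if the CommitFiles request sent to it
    fails with `COMMIT_FILE_EXCEPTION`, or if it still fails after the maximum
    number of retries (such failures are recorded with the new
    `WORKER_UNRESPONSIVE` status code).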
    
    ### Why are the changes needed?
    
    If a worker is under high load and cannot process requests because of high
    CPU usage, we should exclude it so that it does not affect the next retry of
    the shuffle stage.
    
    Internally, we are seeing commit-file futures on workers under high load
    time out, and the next retry of the stage picks the same servers again and
    fails. Similarly, we are seeing continuous RpcTimeouts for some workers, but
    those workers are selected again for the next retry.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    NA
    
    Closes #3276 from s0nskar/worker_exlude_on_commit_exception.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit aeac31f6f062db838566c9cbc8649b48f97db298)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../scala/org/apache/celeborn/client/WorkerStatusTracker.scala   | 2 ++
 .../scala/org/apache/celeborn/client/commit/CommitHandler.scala  | 9 +++++++++
 .../org/apache/celeborn/common/protocol/message/StatusCode.java  | 3 ++-
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala 
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 82117a8bc..f065f2c3e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -158,6 +158,8 @@ class WorkerStatusTracker(
         case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
           statusCode match {
             case StatusCode.WORKER_UNKNOWN |
+                StatusCode.WORKER_UNRESPONSIVE |
+                StatusCode.COMMIT_FILE_EXCEPTION |
                 StatusCode.NO_AVAILABLE_WORKING_DIR |
                 StatusCode.RESERVE_SLOTS_FAILED |
                 StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY |
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 87fb3564c..4a8b542f8 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -357,6 +357,9 @@ abstract class CommitHandler(
                     logError(
                       s"Request commitFiles return 
${StatusCode.COMMIT_FILES_MOCK_FAILURE} for " +
                         s"$shuffleKey for ${status.retriedTimes}/$maxRetries, 
will not retry")
+                    commitFilesFailedWorkers.put(
+                      worker,
+                      (StatusCode.COMMIT_FILES_MOCK_FAILURE, 
System.currentTimeMillis()))
                     val res = createFailResponse(status)
                     processResponse(res, worker)
                     iter.remove()
@@ -377,6 +380,9 @@ abstract class CommitHandler(
                   s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleKey failed" +
                     s" (attempt ${status.retriedTimes}/$maxRetries), will not 
retry.",
                   e)
+                commitFilesFailedWorkers.put(
+                  worker,
+                  (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
                 val res = createFailResponse(status)
                 processResponse(res, status.workerInfo)
                 iter.remove()
@@ -392,6 +398,9 @@ abstract class CommitHandler(
             logError(
               s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleKey failed because of Timeout" +
                 s" (attempt ${status.retriedTimes}/$maxRetries), will not 
retry.")
+            commitFilesFailedWorkers.put(
+              worker,
+              (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
           }
         }
       }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
 
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 7c1df8d07..46c59ab52 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -91,7 +91,8 @@ public enum StatusCode {
   OPEN_STREAM_FAILED(51),
   SEGMENT_START_FAIL_REPLICA(52),
   SEGMENT_START_FAIL_PRIMARY(53),
-  NO_SPLIT(54);
+  NO_SPLIT(54),
+  WORKER_UNRESPONSIVE(55);
 
   private final byte value;
 

Reply via email to