This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new bb5a4d21 [CELEBORN-63] Add CONGESTION related status codes (#1028)
bb5a4d21 is described below

commit bb5a4d2180774b58b853bcb83ba316223b514237
Author: RexAn <[email protected]>
AuthorDate: Thu Dec 1 10:55:37 2022 +0800

    [CELEBORN-63] Add CONGESTION related status codes (#1028)
    
    * Increase push data return reason types such as CONGESTION ect
---
 .../src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 3 +++
 .../org/apache/celeborn/common/protocol/message/StatusCode.java     | 6 +++++-
 common/src/main/scala/org/apache/celeborn/common/util/Utils.scala   | 4 ++++
 3 files changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 332e8f80..949ade78 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -591,6 +591,8 @@ public class ShuffleClientImpl extends ShuffleClient {
             @Override
             public void onSuccess(ByteBuffer response) {
               pushState.inFlightBatches.remove(nextBatchId);
+              // TODO Need to adjust maxReqsInFlight if server response is 
congested, see
+              // CELEBORN-62
               if (response.remaining() > 0 && response.get() == 
StatusCode.STAGE_ENDED.getValue()) {
                 mapperEndMap
                     .computeIfAbsent(shuffleId, (id) -> 
ConcurrentHashMap.newKeySet())
@@ -895,6 +897,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                 attemptId,
                 groupedBatchId);
             pushState.inFlightBatches.remove(groupedBatchId);
+            // TODO Need to adjust maxReqsInFlight if server response is 
congested, see CELEBORN-62
             if (response.remaining() > 0 && response.get() == 
StatusCode.STAGE_ENDED.getValue()) {
               mapperEndMap
                   .computeIfAbsent(shuffleId, (id) -> 
ConcurrentHashMap.newKeySet())
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
 
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 8a507e43..4e6e8d8a 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -55,7 +55,11 @@ public enum StatusCode {
   WORKER_IN_BLACKLIST(27),
   UNKNOWN_WORKER(28),
 
-  COMMIT_FILE_EXCEPTION(29);
+  COMMIT_FILE_EXCEPTION(29),
+
+  // Rate limit statuses
+  PUSH_DATA_SUCCESS_MASTER_CONGESTED(30),
+  PUSH_DATA_SUCCESS_SLAVE_CONGESTED(31);
 
   private final byte value;
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 3e970cf0..592351c9 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -890,6 +890,10 @@ object Utils extends Logging {
         StatusCode.WORKER_IN_BLACKLIST
       case 28 =>
         StatusCode.UNKNOWN_WORKER
+      case 30 =>
+        StatusCode.PUSH_DATA_SUCCESS_MASTER_CONGESTED
+      case 31 =>
+        StatusCode.PUSH_DATA_SUCCESS_SLAVE_CONGESTED
       case _ =>
         null
     }

Reply via email to