This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new bb5a4d21 [CELEBORN-63] Add CONGESTION related status codes (#1028)
bb5a4d21 is described below
commit bb5a4d2180774b58b853bcb83ba316223b514237
Author: RexAn <[email protected]>
AuthorDate: Thu Dec 1 10:55:37 2022 +0800
[CELEBORN-63] Add CONGESTION related status codes (#1028)
* Add more push-data response status types, such as CONGESTION, etc.
---
.../src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 3 +++
.../org/apache/celeborn/common/protocol/message/StatusCode.java | 6 +++++-
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala | 4 ++++
3 files changed, 12 insertions(+), 1 deletion(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 332e8f80..949ade78 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -591,6 +591,8 @@ public class ShuffleClientImpl extends ShuffleClient {
@Override
public void onSuccess(ByteBuffer response) {
pushState.inFlightBatches.remove(nextBatchId);
+ // TODO Need to adjust maxReqsInFlight if server response is
congested, see
+ // CELEBORN-62
if (response.remaining() > 0 && response.get() ==
StatusCode.STAGE_ENDED.getValue()) {
mapperEndMap
.computeIfAbsent(shuffleId, (id) ->
ConcurrentHashMap.newKeySet())
@@ -895,6 +897,7 @@ public class ShuffleClientImpl extends ShuffleClient {
attemptId,
groupedBatchId);
pushState.inFlightBatches.remove(groupedBatchId);
+ // TODO Need to adjust maxReqsInFlight if server response is
congested, see CELEBORN-62
if (response.remaining() > 0 && response.get() ==
StatusCode.STAGE_ENDED.getValue()) {
mapperEndMap
.computeIfAbsent(shuffleId, (id) ->
ConcurrentHashMap.newKeySet())
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 8a507e43..4e6e8d8a 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -55,7 +55,11 @@ public enum StatusCode {
WORKER_IN_BLACKLIST(27),
UNKNOWN_WORKER(28),
- COMMIT_FILE_EXCEPTION(29);
+ COMMIT_FILE_EXCEPTION(29),
+
+ // Rate limit statuses
+ PUSH_DATA_SUCCESS_MASTER_CONGESTED(30),
+ PUSH_DATA_SUCCESS_SLAVE_CONGESTED(31);
private final byte value;
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 3e970cf0..592351c9 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -890,6 +890,10 @@ object Utils extends Logging {
StatusCode.WORKER_IN_BLACKLIST
case 28 =>
StatusCode.UNKNOWN_WORKER
+ case 30 =>
+ StatusCode.PUSH_DATA_SUCCESS_MASTER_CONGESTED
+ case 31 =>
+ StatusCode.PUSH_DATA_SUCCESS_SLAVE_CONGESTED
case _ =>
null
}