This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8875f20e7 [CELEBORN-1361] MaxInFlightPerWorker should use the value 
provided by PushStrategy
8875f20e7 is described below

commit 8875f20e727d107a6c65556935029ac0283236be
Author: mcdull-zhang <[email protected]>
AuthorDate: Thu May 16 19:47:04 2024 +0800

    [CELEBORN-1361] MaxInFlightPerWorker should use the value provided by 
PushStrategy
    
    ### What changes were proposed in this pull request?
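    The data push thread should first send requests to workers that are not
    under pressure. To decide how many more requests a worker can accept, use
    PushStrategy's `currentMaxReqsInFlight` for that worker (minus its current
    in-flight batches) instead of the static `clientPushMaxReqsInFlightPerWorker`
    setting, so that requests are filtered more accurately.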
    
    ### Why are the changes needed?
    Prevents requests destined for workers under pressure from blocking requests
    to other workers.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2432 from mcdull-zhang/CELEBORN-1361.
    
    Authored-by: mcdull-zhang <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../main/java/org/apache/celeborn/client/write/DataPushQueue.java    | 4 +---
 .../org/apache/celeborn/common/write/InFlightRequestTracker.java     | 5 +++++
 common/src/main/java/org/apache/celeborn/common/write/PushState.java | 4 ++--
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java 
b/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
index 84fb1ab81..aec6a69b5 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
@@ -46,7 +46,6 @@ public class DataPushQueue {
   private final LinkedBlockingQueue<PushTask> workingQueue;
   private final PushState pushState;
   private final DataPusher dataPusher;
-  private final int maxInFlightPerWorker;
   private final int shuffleId;
   private final int numMappers;
   private final int numPartitions;
@@ -70,7 +69,6 @@ public class DataPushQueue {
     this.dataPusher = dataPusher;
     final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
     this.pushState = client.getPushState(mapKey);
-    this.maxInFlightPerWorker = conf.clientPushMaxReqsInFlightPerWorker();
     this.takeTaskWaitIntervalMs = conf.clientPushTakeTaskWaitIntervalMs();
     this.takeTaskMaxWaitAttempts = conf.clientPushTakeTaskMaxWaitAttempts();
     final int capacity = conf.clientPushQueueCapacity();
@@ -106,7 +104,7 @@ public class DataPushQueue {
           if (loc != null) {
             Integer oldCapacity = workerCapacity.get(loc.hostAndPushPort());
             if (oldCapacity == null) {
-              oldCapacity = maxInFlightPerWorker - 
pushState.inflightPushes(loc.hostAndPushPort());
+              oldCapacity = 
pushState.remainingAllowPushes(loc.hostAndPushPort());
               workerCapacity.put(loc.hostAndPushPort(), oldCapacity);
             }
             workerWaitAttempts.putIfAbsent(loc.hostAndPushPort(), new 
AtomicInteger(0));
diff --git 
a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
 
b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
index c85cfbc8d..6e11aa501 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
@@ -180,6 +180,11 @@ public class InFlightRequestTracker {
     return times <= 0;
   }
 
+  public int remainingAllowPushes(String hostAndPushPort) {
+    return pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort)
+        - getBatchIdSetByAddressPair(hostAndPushPort).size();
+  }
+
   protected int nextBatchId() {
     return batchId.incrementAndGet();
   }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/write/PushState.java 
b/common/src/main/java/org/apache/celeborn/common/write/PushState.java
index 699e4656c..3979cafd6 100644
--- a/common/src/main/java/org/apache/celeborn/common/write/PushState.java
+++ b/common/src/main/java/org/apache/celeborn/common/write/PushState.java
@@ -85,7 +85,7 @@ public class PushState {
     return inFlightRequestTracker.limitZeroInFlight();
   }
 
-  public int inflightPushes(String hostAndPushPort) {
-    return 
inFlightRequestTracker.getBatchIdSetByAddressPair(hostAndPushPort).size();
+  public int remainingAllowPushes(String hostAndPushPort) {
+    return inFlightRequestTracker.remainingAllowPushes(hostAndPushPort);
   }
 }

Reply via email to