This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8875f20e7 [CELEBORN-1361] MaxInFlightPerWorker should use the value
provided by PushStrategy
8875f20e7 is described below
commit 8875f20e727d107a6c65556935029ac0283236be
Author: mcdull-zhang <[email protected]>
AuthorDate: Thu May 16 19:47:04 2024 +0800
[CELEBORN-1361] MaxInFlightPerWorker should use the value provided by
PushStrategy
### What changes were proposed in this pull request?
The data push thread should first send requests to workers that are not
under pressure.
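Compute each worker's remaining push capacity from PushStrategy's
`currentMaxReqsInFlight` instead of the static
`clientPushMaxReqsInFlightPerWorker` setting, so that requests are
filtered more accurately.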
### Why are the changes needed?
To prevent requests destined for workers under pressure from blocking
requests to other workers.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2432 from mcdull-zhang/CELEBORN-1361.
Authored-by: mcdull-zhang <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../main/java/org/apache/celeborn/client/write/DataPushQueue.java | 4 +---
.../org/apache/celeborn/common/write/InFlightRequestTracker.java | 5 +++++
common/src/main/java/org/apache/celeborn/common/write/PushState.java | 4 ++--
3 files changed, 8 insertions(+), 5 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
b/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
index 84fb1ab81..aec6a69b5 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
@@ -46,7 +46,6 @@ public class DataPushQueue {
private final LinkedBlockingQueue<PushTask> workingQueue;
private final PushState pushState;
private final DataPusher dataPusher;
- private final int maxInFlightPerWorker;
private final int shuffleId;
private final int numMappers;
private final int numPartitions;
@@ -70,7 +69,6 @@ public class DataPushQueue {
this.dataPusher = dataPusher;
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
this.pushState = client.getPushState(mapKey);
- this.maxInFlightPerWorker = conf.clientPushMaxReqsInFlightPerWorker();
this.takeTaskWaitIntervalMs = conf.clientPushTakeTaskWaitIntervalMs();
this.takeTaskMaxWaitAttempts = conf.clientPushTakeTaskMaxWaitAttempts();
final int capacity = conf.clientPushQueueCapacity();
@@ -106,7 +104,7 @@ public class DataPushQueue {
if (loc != null) {
Integer oldCapacity = workerCapacity.get(loc.hostAndPushPort());
if (oldCapacity == null) {
- oldCapacity = maxInFlightPerWorker -
pushState.inflightPushes(loc.hostAndPushPort());
+ oldCapacity =
pushState.remainingAllowPushes(loc.hostAndPushPort());
workerCapacity.put(loc.hostAndPushPort(), oldCapacity);
}
workerWaitAttempts.putIfAbsent(loc.hostAndPushPort(), new
AtomicInteger(0));
diff --git
a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
index c85cfbc8d..6e11aa501 100644
---
a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
+++
b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
@@ -180,6 +180,11 @@ public class InFlightRequestTracker {
return times <= 0;
}
+ public int remainingAllowPushes(String hostAndPushPort) {
+ return pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort)
+ - getBatchIdSetByAddressPair(hostAndPushPort).size();
+ }
+
protected int nextBatchId() {
return batchId.incrementAndGet();
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/write/PushState.java
b/common/src/main/java/org/apache/celeborn/common/write/PushState.java
index 699e4656c..3979cafd6 100644
--- a/common/src/main/java/org/apache/celeborn/common/write/PushState.java
+++ b/common/src/main/java/org/apache/celeborn/common/write/PushState.java
@@ -85,7 +85,7 @@ public class PushState {
return inFlightRequestTracker.limitZeroInFlight();
}
- public int inflightPushes(String hostAndPushPort) {
- return
inFlightRequestTracker.getBatchIdSetByAddressPair(hostAndPushPort).size();
+ public int remainingAllowPushes(String hostAndPushPort) {
+ return inFlightRequestTracker.remainingAllowPushes(hostAndPushPort);
}
}