This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 47b37913c [CELEBORN-1763] Fix DataPusher be blocked for a long time
47b37913c is described below
commit 47b37913c31b9409f40168f3b6f70d15a9de182c
Author: zhangzhao.08 <[email protected]>
AuthorDate: Sun Dec 22 23:08:36 2024 -0800
[CELEBORN-1763] Fix DataPusher be blocked for a long time
### What changes were proposed in this pull request?
Fix DataPusher being blocked for a long time.
### Why are the changes needed?
When a worker has been at a performance bottleneck for a long time, the slow
start strategy reduces its maxInFlight to 1, which may cause RequestInFlight to
exceed maxInFlight. If the task’s main thread is blocked in the
waitIdleQueueFullWithLock call, it cannot detect a push failure, because the
failure is recorded as the exception in the push state and the
waitIdleQueueFullWithLock function does not check it.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
GA
Closes #2978 from zhaostu4/fix_pusher_block.
Authored-by: zhangzhao.08 <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 9e04ff4a9fe12473be10b043887206134fcc66fd)
Signed-off-by: Wang, Fei <[email protected]>
---
.../java/org/apache/celeborn/client/write/DataPusher.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
index e366ae7c2..bc02570da 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
@@ -35,6 +35,8 @@ import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.common.util.Utils;
+import org.apache.celeborn.common.write.PushState;
public class DataPusher {
private static final Logger logger =
LoggerFactory.getLogger(DataPusher.class);
@@ -43,6 +45,7 @@ public class DataPusher {
private LinkedBlockingQueue<PushTask> idleQueue;
// partition -> PushTask Queue
+ private final PushState pushState;
private final DataPushQueue dataPushQueue;
private final ReentrantLock idleLock = new ReentrantLock();
private final Condition idleFull = idleLock.newCondition();
@@ -98,6 +101,8 @@ public class DataPusher {
this.client = client;
this.afterPush = afterPush;
this.mapStatusLengths = mapStatusLengths;
+ final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+ this.pushState = client.getPushState(mapKey);
pushThread =
ThreadUtils.newDaemonThread(
@@ -194,6 +199,9 @@ public class DataPusher {
if (exceptionRef.get() != null) {
throw exceptionRef.get();
}
+ if (pushState.exception.get() != null) {
+ throw pushState.exception.get();
+ }
}
protected void pushData(PushTask task) throws IOException {
@@ -217,6 +225,7 @@ public class DataPusher {
while (idleQueue != null
&& idleQueue.remainingCapacity() > 0
&& exceptionRef.get() == null
+ && pushState.exception.get() == null
&& (pushThread != null && pushThread.isAlive())) {
idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
}
@@ -229,7 +238,9 @@ public class DataPusher {
}
protected boolean stillRunning() {
- return !terminated && !Objects.nonNull(exceptionRef.get());
+ return !terminated
+ && !Objects.nonNull(exceptionRef.get())
+ && !Objects.nonNull(pushState.exception.get());
}
public DataPushQueue getDataPushQueue() {