This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 9462e479c [CELEBORN-640][WORKER] DataPushQueue should not keep waiting
take tasks
9462e479c is described below
commit 9462e479c21a3e013257d08c2ffc14d68a26e07c
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jun 9 14:06:47 2023 +0800
[CELEBORN-640][WORKER] DataPushQueue should not keep waiting take tasks
### What changes were proposed in this pull request?
In production we have repeatedly seen the push queue get stuck because a
PushState's in-flight push status was never removed,
which caused DataPushQueue to keep waiting indefinitely to take tasks.
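Although some of the underlying bugs have been fixed, it is better to bound the wait
when taking tasks. After `celeborn.client.push.takeTaskMaxWaitAttempts` waits of
`celeborn.client.push.takeTaskWaitInterval`, a task is taken even if its target worker
has no free in-flight capacity. This is safe because we already have the
`PUSH_DATA_TIMEOUT` check: if the target worker is really stuck, the task will
eventually be retried.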
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1552 from AngersZhuuuu/CELEBORN-640.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
(cherry picked from commit 6b725202a2526b70e7eddbc9f9c0b15ee6ea6489)
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../apache/celeborn/client/write/DataPushQueue.java | 18 +++++++++++++++---
.../org/apache/celeborn/common/CelebornConf.scala | 17 +++++++++++++----
docs/configuration/client.md | 3 ++-
3 files changed, 30 insertions(+), 8 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
b/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
index 244746dac..d6070d161 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +53,8 @@ public class DataPushQueue {
private final int numMappers;
private final int numPartitions;
private final ShuffleClient client;
- private final long takeTaskWaitTimeMs;
+ private final long takeTaskWaitIntervalMs;
+ private final int takeTaskMaxWaitAttempts;
public DataPushQueue(
CelebornConf conf,
@@ -73,7 +75,8 @@ public class DataPushQueue {
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
this.pushState = client.getPushState(mapKey);
this.maxInFlight = conf.clientPushMaxReqsInFlight();
- this.takeTaskWaitTimeMs = conf.clientPushTakeTaskWaitTimeMs();
+ this.takeTaskWaitIntervalMs = conf.clientPushTakeTaskWaitIntervalMs();
+ this.takeTaskMaxWaitAttempts = conf.clientPushTakeTaskMaxWaitAttempts();
final int capacity = conf.clientPushQueueCapacity();
workingQueue = new LinkedBlockingQueue<>(capacity);
}
@@ -85,6 +88,7 @@ public class DataPushQueue {
public ArrayList<PushTask> takePushTasks() throws IOException,
InterruptedException {
ArrayList<PushTask> tasks = new ArrayList<>();
HashMap<String, Integer> workerCapacity = new HashMap<>();
+ HashMap<String, AtomicInteger> workerWaitAttempts = new HashMap<>();
while (dataPusher.stillRunning()) {
// clear() here is necessary since inflight pushes might change after
sleeping
// takeTaskWaitTimeMs
@@ -106,10 +110,17 @@ public class DataPushQueue {
oldCapacity = maxInFlight -
pushState.inflightPushes(loc.hostAndPushPort());
workerCapacity.put(loc.hostAndPushPort(), oldCapacity);
}
+ workerWaitAttempts.putIfAbsent(loc.hostAndPushPort(), new
AtomicInteger(0));
if (oldCapacity > 0) {
iterator.remove();
tasks.add(task);
workerCapacity.put(loc.hostAndPushPort(), oldCapacity - 1);
+ } else if (workerWaitAttempts.get(loc.hostAndPushPort()).get()
+ >= takeTaskMaxWaitAttempts) {
+ iterator.remove();
+ tasks.add(task);
+ // For such worker under high pressure, we only take one task
each turn.
+ workerWaitAttempts.get(loc.hostAndPushPort()).set(0);
}
} else {
iterator.remove();
@@ -125,7 +136,8 @@ public class DataPushQueue {
}
try {
// Reaching here means no available tasks can be pushed to any worker,
wait for a while
- Thread.sleep(takeTaskWaitTimeMs);
+ Thread.sleep(takeTaskWaitIntervalMs);
+ workerWaitAttempts.values().forEach(AtomicInteger::incrementAndGet);
} catch (InterruptedException ie) {
logger.info("Thread interrupted while waiting push task.");
throw ie;
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 2128a2218..a10bf4a3b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -737,7 +737,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
}
def clientPushLimitInFlightSleepDeltaMs: Long =
get(CLIENT_PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL)
def clientPushSplitPartitionThreads: Int =
get(CLIENT_PUSH_SPLIT_PARTITION_THREADS)
- def clientPushTakeTaskWaitTimeMs: Long = get(CLIENT_PUSH_TAKE_TASK_WAIT_TIME)
+ def clientPushTakeTaskWaitIntervalMs: Long =
get(CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL)
+ def clientPushTakeTaskMaxWaitAttempts: Int =
get(CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS)
// //////////////////////////////////////////////////////
// Client Shuffle //
@@ -2676,14 +2677,22 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(8)
- val CLIENT_PUSH_TAKE_TASK_WAIT_TIME: ConfigEntry[Long] =
- buildConf("celeborn.client.push.takeTaskWaitTime")
+ val CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL: ConfigEntry[Long] =
+ buildConf("celeborn.client.push.takeTaskWaitInterval")
.categories("client")
- .doc("Wait time if no task available to push to worker.")
+ .doc("Wait interval if no task available to push to worker.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("50ms")
+ val CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS: ConfigEntry[Int] =
+ buildConf("celeborn.client.push.takeTaskMaxWaitAttempts")
+ .categories("client")
+ .doc("Max wait times if no task available to push to worker.")
+ .version("0.3.0")
+ .intConf
+ .createWithDefault(1)
+
val TEST_CLIENT_RETRY_REVIVE: ConfigEntry[Boolean] =
buildConf("celeborn.test.client.retryRevive")
.withAlternative("celeborn.test.retryRevive")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index b60fe2243..13f6b633a 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -51,7 +51,8 @@ license: |
| celeborn.client.push.sort.randomizePartitionId.enabled | false | Whether to
randomize partitionId in push sorter. If true, partitionId will be randomized
when sort data to avoid skew when push to worker | 0.3.0 |
| celeborn.client.push.splitPartition.threads | 8 | Thread number to process
shuffle split request in shuffle client. | 0.3.0 |
| celeborn.client.push.stageEnd.timeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for waiting
StageEnd. During this process, there are
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities
for committing filesand 1 times for releasing slots request. User can customize
this value according to your setting. By default, the value is the max timeout
value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 |
-| celeborn.client.push.takeTaskWaitTime | 50ms | Wait time if no task
available to push to worker. | 0.3.0 |
+| celeborn.client.push.takeTaskMaxWaitAttempts | 1 | Max wait times if no task
available to push to worker. | 0.3.0 |
+| celeborn.client.push.takeTaskWaitInterval | 50ms | Wait interval if no task
available to push to worker. | 0.3.0 |
| celeborn.client.registerShuffle.maxRetries | 3 | Max retry times for client
to register shuffle. | 0.3.0 |
| celeborn.client.registerShuffle.retryWait | 3s | Wait time before next retry
if register shuffle failed. | 0.3.0 |
| celeborn.client.requestCommitFiles.maxRetries | 2 | Max retry times for
requestCommitFiles RPC. | 0.3.0 |