This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 9462e479c [CELEBORN-640][WORKER] DataPushQueue should not keep waiting 
take tasks
9462e479c is described below

commit 9462e479c21a3e013257d08c2ffc14d68a26e07c
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jun 9 14:06:47 2023 +0800

    [CELEBORN-640][WORKER] DataPushQueue should not keep waiting take tasks
    
    ### What changes were proposed in this pull request?
    In our production environment we have repeatedly seen the push queue get 
stuck because a PushState's status was not removed,
    which caused DataPushQueue to keep waiting to take tasks.
    
    Although some of the underlying bugs have been resolved, we should still 
add a max wait time for taking tasks, since we already have the 
`PUSH_DATA_TIMEOUT` check method. If the target worker is really stuck, the 
task can eventually be retried.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1552 from AngersZhuuuu/CELEBORN-640.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Angerszhuuuu <[email protected]>
    (cherry picked from commit 6b725202a2526b70e7eddbc9f9c0b15ee6ea6489)
    Signed-off-by: Angerszhuuuu <[email protected]>
---
 .../apache/celeborn/client/write/DataPushQueue.java    | 18 +++++++++++++++---
 .../org/apache/celeborn/common/CelebornConf.scala      | 17 +++++++++++++----
 docs/configuration/client.md                           |  3 ++-
 3 files changed, 30 insertions(+), 8 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java 
b/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
index 244746dac..d6070d161 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPushQueue.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +53,8 @@ public class DataPushQueue {
   private final int numMappers;
   private final int numPartitions;
   private final ShuffleClient client;
-  private final long takeTaskWaitTimeMs;
+  private final long takeTaskWaitIntervalMs;
+  private final int takeTaskMaxWaitAttempts;
 
   public DataPushQueue(
       CelebornConf conf,
@@ -73,7 +75,8 @@ public class DataPushQueue {
     final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
     this.pushState = client.getPushState(mapKey);
     this.maxInFlight = conf.clientPushMaxReqsInFlight();
-    this.takeTaskWaitTimeMs = conf.clientPushTakeTaskWaitTimeMs();
+    this.takeTaskWaitIntervalMs = conf.clientPushTakeTaskWaitIntervalMs();
+    this.takeTaskMaxWaitAttempts = conf.clientPushTakeTaskMaxWaitAttempts();
     final int capacity = conf.clientPushQueueCapacity();
     workingQueue = new LinkedBlockingQueue<>(capacity);
   }
@@ -85,6 +88,7 @@ public class DataPushQueue {
   public ArrayList<PushTask> takePushTasks() throws IOException, 
InterruptedException {
     ArrayList<PushTask> tasks = new ArrayList<>();
     HashMap<String, Integer> workerCapacity = new HashMap<>();
+    HashMap<String, AtomicInteger> workerWaitAttempts = new HashMap<>();
     while (dataPusher.stillRunning()) {
       // clear() here is necessary since inflight pushes might change after 
sleeping
       // takeTaskWaitTimeMs
@@ -106,10 +110,17 @@ public class DataPushQueue {
               oldCapacity = maxInFlight - 
pushState.inflightPushes(loc.hostAndPushPort());
               workerCapacity.put(loc.hostAndPushPort(), oldCapacity);
             }
+            workerWaitAttempts.putIfAbsent(loc.hostAndPushPort(), new 
AtomicInteger(0));
             if (oldCapacity > 0) {
               iterator.remove();
               tasks.add(task);
               workerCapacity.put(loc.hostAndPushPort(), oldCapacity - 1);
+            } else if (workerWaitAttempts.get(loc.hostAndPushPort()).get()
+                >= takeTaskMaxWaitAttempts) {
+              iterator.remove();
+              tasks.add(task);
+              // For such worker under high pressure, we only take one task 
each turn.
+              workerWaitAttempts.get(loc.hostAndPushPort()).set(0);
             }
           } else {
             iterator.remove();
@@ -125,7 +136,8 @@ public class DataPushQueue {
       }
       try {
         // Reaching here means no available tasks can be pushed to any worker, 
wait for a while
-        Thread.sleep(takeTaskWaitTimeMs);
+        Thread.sleep(takeTaskWaitIntervalMs);
+        workerWaitAttempts.values().forEach(AtomicInteger::incrementAndGet);
       } catch (InterruptedException ie) {
         logger.info("Thread interrupted while waiting push task.");
         throw ie;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 2128a2218..a10bf4a3b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -737,7 +737,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     }
   def clientPushLimitInFlightSleepDeltaMs: Long = 
get(CLIENT_PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL)
   def clientPushSplitPartitionThreads: Int = 
get(CLIENT_PUSH_SPLIT_PARTITION_THREADS)
-  def clientPushTakeTaskWaitTimeMs: Long = get(CLIENT_PUSH_TAKE_TASK_WAIT_TIME)
+  def clientPushTakeTaskWaitIntervalMs: Long = 
get(CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL)
+  def clientPushTakeTaskMaxWaitAttempts: Int = 
get(CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS)
 
   // //////////////////////////////////////////////////////
   //                   Client Shuffle                    //
@@ -2676,14 +2677,22 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(8)
 
-  val CLIENT_PUSH_TAKE_TASK_WAIT_TIME: ConfigEntry[Long] =
-    buildConf("celeborn.client.push.takeTaskWaitTime")
+  val CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL: ConfigEntry[Long] =
+    buildConf("celeborn.client.push.takeTaskWaitInterval")
       .categories("client")
-      .doc("Wait time if no task available to push to worker.")
+      .doc("Wait interval if no task available to push to worker.")
       .version("0.3.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("50ms")
 
+  val CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS: ConfigEntry[Int] =
+    buildConf("celeborn.client.push.takeTaskMaxWaitAttempts")
+      .categories("client")
+      .doc("Max wait times if no task available to push to worker.")
+      .version("0.3.0")
+      .intConf
+      .createWithDefault(1)
+
   val TEST_CLIENT_RETRY_REVIVE: ConfigEntry[Boolean] =
     buildConf("celeborn.test.client.retryRevive")
       .withAlternative("celeborn.test.retryRevive")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index b60fe2243..13f6b633a 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -51,7 +51,8 @@ license: |
 | celeborn.client.push.sort.randomizePartitionId.enabled | false | Whether to 
randomize partitionId in push sorter. If true, partitionId will be randomized 
when sort data to avoid skew when push to worker | 0.3.0 | 
 | celeborn.client.push.splitPartition.threads | 8 | Thread number to process 
shuffle split request in shuffle client. | 0.3.0 | 
 | celeborn.client.push.stageEnd.timeout | &lt;value of 
celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for waiting 
StageEnd. During this process, there are 
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities 
for committing filesand 1 times for releasing slots request. User can customize 
this value according to your setting. By default, the value is the max timeout 
value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 | 
-| celeborn.client.push.takeTaskWaitTime | 50ms | Wait time if no task 
available to push to worker. | 0.3.0 | 
+| celeborn.client.push.takeTaskMaxWaitAttempts | 1 | Max wait times if no task 
available to push to worker. | 0.3.0 | 
+| celeborn.client.push.takeTaskWaitInterval | 50ms | Wait interval if no task 
available to push to worker. | 0.3.0 | 
 | celeborn.client.registerShuffle.maxRetries | 3 | Max retry times for client 
to register shuffle. | 0.3.0 | 
 | celeborn.client.registerShuffle.retryWait | 3s | Wait time before next retry 
if register shuffle failed. | 0.3.0 | 
 | celeborn.client.requestCommitFiles.maxRetries | 2 | Max retry times for 
requestCommitFiles RPC. | 0.3.0 | 

Reply via email to