This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new b1d3de565 [CELEBORN-1036][FOLLOWUP] totalInflightReqs should decrement when batchIdSet contains the batchId to avoid duplicate caller of removeBatch
b1d3de565 is described below

commit b1d3de56517cca3591faacda06591461f5e73e2e
Author: SteNicholas <[email protected]>
AuthorDate: Wed Dec 20 18:11:07 2023 +0800

    [CELEBORN-1036][FOLLOWUP] totalInflightReqs should decrement when batchIdSet contains the batchId to avoid duplicate caller of removeBatch
    
    ### What changes were proposed in this pull request?
    
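    `totalInflightReqs` is now decremented only when `batchIdSet` contains the `batchId`, so duplicate calls to `removeBatch` in `InFlightRequestTracker` no longer push the counter below the real number of in-flight batches. Symmetrically, `addBatch` increments `totalInflightReqs` only when the `batchId` was not already in `batchIdSet`.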
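    
    A minimal sketch of the guarded counting pattern, assuming a simplified tracker: the class `GuardedInflightCounter` is hypothetical and is not Celeborn's `InFlightRequestTracker`, the counter is assumed to be a `LongAdder`, and logging is omitted. `Set#add` and `Set#remove` report whether membership actually changed, so the counter moves only when it does and a duplicate `removeBatch` becomes a no-op.
    
    ```java
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;
    
    // Hypothetical, simplified stand-in for InFlightRequestTracker: it only shows
    // how the result of Set#add / Set#remove gates the counter update.
    final class GuardedInflightCounter {
      private final ConcurrentHashMap<String, Set<Integer>> inflightBatchesPerAddress =
          new ConcurrentHashMap<>();
      private final LongAdder totalInflightReqs = new LongAdder();
    
      public void addBatch(int batchId, String hostAndPushPort) {
        Set<Integer> batchIdSet =
            inflightBatchesPerAddress.computeIfAbsent(
                hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
        // Set#add returns false when batchId is already present,
        // so a duplicate add does not inflate the counter.
        if (batchIdSet.add(batchId)) {
          totalInflightReqs.increment();
        }
      }
    
      public void removeBatch(int batchId, String hostAndPushPort) {
        Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
        // Set#remove returns false when batchId was already removed, so a
        // duplicate call leaves the counter unchanged instead of going negative.
        // An unknown address (null set) is silently ignored in this sketch.
        if (batchIdSet != null && batchIdSet.remove(batchId)) {
          totalInflightReqs.decrement();
        }
      }
    
      public long inflight() {
        return totalInflightReqs.sum();
      }
    
      public static void main(String[] args) {
        GuardedInflightCounter tracker = new GuardedInflightCounter();
        tracker.addBatch(1, "host:9092");
        tracker.removeBatch(1, "host:9092");
        tracker.removeBatch(1, "host:9092"); // duplicate caller
        System.out.println(tracker.inflight()); // prints 0
      }
    }
    ```
    
    Because `ConcurrentHashMap.newKeySet()` returns a thread-safe set whose `remove` succeeds for at most one caller, two racing `removeBatch` calls for the same batch decrement the counter only once.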
    
    ### Why are the changes needed?
    
    `InFlightRequestTracker#removeBatch` may be called more than once for the same batch, and each call used to decrement `totalInflightReqs` unconditionally, so the counter could become negative. `batchIdSet` should be the source of truth: `totalInflightReqs` should be decremented only when `batchIdSet` contains the `batchId`, and left unchanged otherwise. With a negative counter, the wait for zero in-flight requests never succeeds and the task fails with a timeout, as in the following log:
    
    ```
    23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there are still -1 batches in flight for hostAndPushPort [], which exceeds the current limit 0.
    23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
    23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] ERROR Executor: Exception in task 17.0 in stage 10.0 (TID 206)
    org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout for task 4-17-0 while limiting zero in-flight requests
            at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:598)
            at org.apache.celeborn.client.ShuffleClientImpl.prepareForMergeData(ShuffleClientImpl.java:1175)
            at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:455)
            at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:210)
            at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:141)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:589)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:594)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
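    
    To illustrate why the log above reports `-1` batches, here is a hypothetical single-threaded replay of the pre-patch `removeBatch` logic, which decremented unconditionally. The class `UnguardedInflightReplay` and its `waitForZero` helper are illustrative assumptions, not Celeborn code; the helper assumes the zero-in-flight wait exits only when the counter is exactly zero, which is consistent with the log but is not taken from the source.
    
    ```java
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;
    
    // Hypothetical replay of the pre-patch behavior: removeBatch decrements the
    // counter even when the batch has already been removed from the set.
    final class UnguardedInflightReplay {
      private final Set<Integer> batchIdSet = ConcurrentHashMap.newKeySet();
      private final LongAdder totalInflightReqs = new LongAdder();
    
      void addBatch(int batchId) {
        batchIdSet.add(batchId);
        totalInflightReqs.increment();
      }
    
      void removeBatch(int batchId) {
        batchIdSet.remove(batchId);
        totalInflightReqs.decrement(); // unconditional, as before the patch
      }
    
      long inflight() {
        return totalInflightReqs.sum();
      }
    
      // Illustrative "wait until zero in flight" loop (an assumption): polls every
      // deltaMs and gives up after roughly timeoutMs, returning false on timeout.
      boolean waitForZero(long timeoutMs, long deltaMs) {
        long times = timeoutMs / deltaMs;
        try {
          while (times > 0) {
            if (inflight() == 0) {
              return true;
            }
            Thread.sleep(deltaMs);
            times--;
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
        return inflight() == 0;
      }
    
      public static void main(String[] args) {
        UnguardedInflightReplay replay = new UnguardedInflightReplay();
        replay.addBatch(1);
        replay.removeBatch(1);
        replay.removeBatch(1); // duplicate caller
        System.out.println(replay.inflight()); // prints -1
        System.out.println(replay.waitForZero(50, 10)); // prints false
      }
    }
    ```
    
    With a guarded `removeBatch`, the second call would be a no-op, the counter would stay at 0, and the wait would return immediately.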
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests.
    
    Closes #2134 from SteNicholas/CELEBORN-1036.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 089a0f868600f4ea280c63cc0960586bf9389b55)
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/common/write/InFlightRequestTracker.java  | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
index 5b68f49ba..e5514922c 100644
--- a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
+++ b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
@@ -58,22 +58,28 @@ public class InFlightRequestTracker {
   }
 
   public void addBatch(int batchId, String hostAndPushPort) {
-    Set<Integer> batchIdSetPerPair =
+    Set<Integer> batchIdSet =
         inflightBatchesPerAddress.computeIfAbsent(
             hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
-    batchIdSetPerPair.add(batchId);
-    totalInflightReqs.increment();
+    if (batchIdSet.add(batchId)) {
+      totalInflightReqs.increment();
+    } else {
+      logger.debug("{} has already been inflight.", batchId);
+    }
   }
 
   public void removeBatch(int batchId, String hostAndPushPort) {
     Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
     // TODO: Need to debug why batchIdSet will be null.
     if (batchIdSet != null) {
-      batchIdSet.remove(batchId);
+      if (batchIdSet.remove(batchId)) {
+        totalInflightReqs.decrement();
+      } else {
+        logger.debug("BatchIdSet has removed {}.", batchId);
+      }
     } else {
       logger.warn("BatchIdSet of {} is null.", hostAndPushPort);
     }
-    totalInflightReqs.decrement();
   }
 
   public void onSuccess(String hostAndPushPort) {
@@ -97,7 +103,7 @@ public class InFlightRequestTracker {
     pushStrategy.limitPushSpeed(pushState, hostAndPushPort);
     int currentMaxReqsInFlight = pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort);
 
-    Set batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
+    Set<Integer> batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
     long times = waitInflightTimeoutMs / delta;
     try {
       while (times > 0) {