This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new b1d3de565 [CELEBORN-1036][FOLLOWUP] totalInflightReqs should decrement
when batchIdSet contains the batchId to avoid duplicate caller of removeBatch
b1d3de565 is described below
commit b1d3de56517cca3591faacda06591461f5e73e2e
Author: SteNicholas <[email protected]>
AuthorDate: Wed Dec 20 18:11:07 2023 +0800
[CELEBORN-1036][FOLLOWUP] totalInflightReqs should decrement when
batchIdSet contains the batchId to avoid duplicate caller of removeBatch
### What changes were proposed in this pull request?
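`InFlightRequestTracker` now decrements `totalInflightReqs` only when
`batchIdSet` actually contains the `batchId`, so duplicate calls to
`removeBatch` no longer decrement the counter twice. Likewise, `addBatch`
increments the counter only when the `batchId` is newly added.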
### Why are the changes needed?
`InFlightRequestTracker#removeBatch` may be called more than once for the
same batch, which can drive `totalInflightReqs` negative. `totalInflightReqs`
should be decremented only when `batchIdSet` contains the `batchId`; if
`batchIdSet` does not contain the `batchId`, the counter must be left
unchanged. In the log below, the tracker reports -1 batches in flight and the
task fails with a timeout.
```
23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there are still -1 batches in flight for hostAndPushPort [], which exceeds the current limit 0.
23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] ERROR Executor: Exception in task 17.0 in stage 10.0 (TID 206)
org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout for task 4-17-0 while limiting zero in-flight requests
    at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:598)
    at org.apache.celeborn.client.ShuffleClientImpl.prepareForMergeData(ShuffleClientImpl.java:1175)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:455)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:210)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:589)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:594)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
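The guard described above can be sketched in isolation. The class below is a hypothetical, simplified stand-in (`InflightCounterSketch` is not a Celeborn class), and it assumes the counter is a `LongAdder`, which is only inferred from the `increment()`/`decrement()` calls in the diff. It relies on `Set.add` and `Set.remove` returning `true` only when the set actually changed.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified stand-in for InFlightRequestTracker (illustration only).
class InflightCounterSketch {
  private final Map<String, Set<Integer>> inflightBatchesPerAddress =
      new ConcurrentHashMap<>();
  // Assumption: a LongAdder-style counter, inferred from increment()/decrement().
  private final LongAdder totalInflightReqs = new LongAdder();

  void addBatch(int batchId, String hostAndPushPort) {
    Set<Integer> batchIdSet =
        inflightBatchesPerAddress.computeIfAbsent(
            hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
    // Set.add returns true only if the batchId was not already tracked.
    if (batchIdSet.add(batchId)) {
      totalInflightReqs.increment();
    }
  }

  void removeBatch(int batchId, String hostAndPushPort) {
    Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
    // Set.remove returns true only if the batchId was present,
    // so a repeated call for the same batch leaves the counter alone.
    if (batchIdSet != null && batchIdSet.remove(batchId)) {
      totalInflightReqs.decrement();
    }
  }

  long total() {
    return totalInflightReqs.sum();
  }

  public static void main(String[] args) {
    InflightCounterSketch tracker = new InflightCounterSketch();
    tracker.addBatch(1, "host-a:9097");
    tracker.removeBatch(1, "host-a:9097");
    tracker.removeBatch(1, "host-a:9097"); // duplicate call, ignored
    System.out.println(tracker.total());
  }
}
```

With the unconditional decrement that this patch removes, the second `removeBatch` call in `main` would leave the counter at -1, matching the value in the log above.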
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes #2134 from SteNicholas/CELEBORN-1036.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 089a0f868600f4ea280c63cc0960586bf9389b55)
Signed-off-by: mingji <[email protected]>
---
.../celeborn/common/write/InFlightRequestTracker.java | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
index 5b68f49ba..e5514922c 100644
--- a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
+++ b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
@@ -58,22 +58,28 @@ public class InFlightRequestTracker {
}
public void addBatch(int batchId, String hostAndPushPort) {
- Set<Integer> batchIdSetPerPair =
+ Set<Integer> batchIdSet =
inflightBatchesPerAddress.computeIfAbsent(
hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
- batchIdSetPerPair.add(batchId);
- totalInflightReqs.increment();
+ if (batchIdSet.add(batchId)) {
+ totalInflightReqs.increment();
+ } else {
+ logger.debug("{} has already been inflight.", batchId);
+ }
}
public void removeBatch(int batchId, String hostAndPushPort) {
Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
// TODO: Need to debug why batchIdSet will be null.
if (batchIdSet != null) {
- batchIdSet.remove(batchId);
+ if (batchIdSet.remove(batchId)) {
+ totalInflightReqs.decrement();
+ } else {
+ logger.debug("BatchIdSet has removed {}.", batchId);
+ }
} else {
logger.warn("BatchIdSet of {} is null.", hostAndPushPort);
}
- totalInflightReqs.decrement();
}
public void onSuccess(String hostAndPushPort) {
@@ -97,7 +103,7 @@ public class InFlightRequestTracker {
pushStrategy.limitPushSpeed(pushState, hostAndPushPort);
int currentMaxReqsInFlight =
pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort);
- Set batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
+ Set<Integer> batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
long times = waitInflightTimeoutMs / delta;
try {
while (times > 0) {