SteNicholas opened a new pull request, #2134:
URL: https://github.com/apache/incubator-celeborn/pull/2134
### What changes were proposed in this pull request?
`InFlightRequestTracker#removeBatch` now decrements `totalInflightReqs` only when `batchIdSet` actually contains the `batchId`. As a result, duplicate calls to `removeBatch` for the same batch no longer decrement the counter more than once.
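The guarded decrement can be sketched as follows. This is a minimal, hypothetical Java sketch, not Celeborn's actual `InFlightRequestTracker`. The class name `InFlightTrackerSketch`, the `inflightBatchesPerAddress` map, the `inflight()` accessor, the `host:port` key format and the use of `LongAdder` for the counter are all assumptions made for illustration. The sketch relies on `Set#remove` returning `true` only when the element was present, so only the first `removeBatch` for a given `batchId` decrements the counter.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified tracker; names echo the PR text, but this is NOT Celeborn's code.
public class InFlightTrackerSketch {
    // In-flight batch ids keyed by an assumed "host:pushPort" string.
    private final ConcurrentHashMap<String, Set<Integer>> inflightBatchesPerAddress =
        new ConcurrentHashMap<>();
    private final LongAdder totalInflightReqs = new LongAdder();

    public void addBatch(int batchId, String hostAndPushPort) {
        Set<Integer> batchIdSet = inflightBatchesPerAddress.computeIfAbsent(
            hostAndPushPort, k -> ConcurrentHashMap.newKeySet());
        // Assumption of this sketch: count a batch only the first time it is registered.
        if (batchIdSet.add(batchId)) {
            totalInflightReqs.increment();
        }
    }

    public void removeBatch(int batchId, String hostAndPushPort) {
        Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
        // Decrement only if this call actually removed the batch, so a duplicate
        // removeBatch for the same batchId leaves the counter unchanged.
        if (batchIdSet != null && batchIdSet.remove(batchId)) {
            totalInflightReqs.decrement();
        }
    }

    public long inflight() {
        return totalInflightReqs.sum();
    }

    public static void main(String[] args) {
        InFlightTrackerSketch tracker = new InFlightTrackerSketch();
        tracker.addBatch(1, "worker-1:9092");
        tracker.removeBatch(1, "worker-1:9092");
        tracker.removeBatch(1, "worker-1:9092"); // duplicate call is ignored
        System.out.println(tracker.inflight()); // prints 0
    }
}
```

Using the return value of `Set#remove` on a concurrent set makes the membership check and the removal one atomic step, so two threads racing to remove the same batch cannot both decrement the counter.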
### Why are the changes needed?
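`InFlightRequestTracker#removeBatch` may be called more than once for the same batch, and each call decrements `totalInflightReqs`, so the counter can become negative. The counter should stay consistent with `batchIdSet`: `totalInflightReqs` should be decremented only when `batchIdSet` contains the `batchId`. If `batchIdSet` does not contain the `batchId`, the batch has already been removed and `totalInflightReqs` must not be decremented again. With a negative counter, `ShuffleClientImpl#limitZeroInFlight` waits until it times out and the task fails, as in the following log: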
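To illustrate the failure mode, the hypothetical demo below registers one batch and then removes it twice, once with an unconditional decrement and once with the guarded decrement. The nested `Tracker` class, its `guarded` flag and `runScenario` are invented for this sketch. The unguarded variant only models the behavior described above, not Celeborn's exact pre-fix code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo contrasting an unguarded and a guarded removeBatch.
public class DuplicateRemoveDemo {
    static final class Tracker {
        private final Set<Integer> batchIdSet = ConcurrentHashMap.newKeySet();
        private final LongAdder totalInflightReqs = new LongAdder();
        private final boolean guarded;

        Tracker(boolean guarded) {
            this.guarded = guarded;
        }

        void addBatch(int batchId) {
            if (batchIdSet.add(batchId)) {
                totalInflightReqs.increment();
            }
        }

        void removeBatch(int batchId) {
            boolean removed = batchIdSet.remove(batchId);
            if (guarded) {
                // Decrement only when the batch was actually still in flight.
                if (removed) {
                    totalInflightReqs.decrement();
                }
            } else {
                // Unconditional decrement: a duplicate call drives the counter below zero.
                totalInflightReqs.decrement();
            }
        }

        long inflight() {
            return totalInflightReqs.sum();
        }
    }

    // Registers one batch, then removes it twice via a hypothetical duplicate caller.
    static long runScenario(boolean guarded) {
        Tracker t = new Tracker(guarded);
        t.addBatch(42);
        t.removeBatch(42);
        t.removeBatch(42); // duplicate caller
        return t.inflight();
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + runScenario(false)); // prints "unguarded: -1"
        System.out.println("guarded:   " + runScenario(true));  // prints "guarded:   0"
    }
}
```

The unguarded result of `-1` matches the "-1 batches in flight" value reported in the log.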
```
23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there are still -1 batches in flight for hostAndPushPort [], which exceeds the current limit 0.
23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 (TID 206)] ERROR Executor: Exception in task 17.0 in stage 10.0 (TID 206)
org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout for task 4-17-0 while limiting zero in-flight requests
    at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:598)
    at org.apache.celeborn.client.ShuffleClientImpl.prepareForMergeData(ShuffleClientImpl.java:1175)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:455)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:210)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:589)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:594)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
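The log is consistent with a zero-limit wait that only finishes once the in-flight counter reaches exactly zero: a counter stuck at -1 never drains, so the wait runs until its 1200000 ms timeout. The hypothetical polling loop below sketches that behavior under this assumption. `waitForZero`, its parameters and the `!= 0` exit condition are invented for illustration and are not Celeborn's `limitZeroInFlight`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a zero-limit wait; NOT Celeborn's limitZeroInFlight.
public class ZeroInFlightWaitSketch {
    // Polls the in-flight count until it is exactly zero (assumed exit condition)
    // or until timeoutMs elapses. Returns true if the count reached zero in time.
    static boolean waitForZero(LongSupplier inflight, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (inflight.getAsLong() != 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out: a negative count never satisfies "== 0"
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(waitForZero(() -> 0L, 50, 5));  // prints true
        System.out.println(waitForZero(() -> -1L, 50, 5)); // prints false after roughly 50 ms
    }
}
```

Under this assumption, keeping `totalInflightReqs` from ever dropping below zero is what lets such a wait terminate normally.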
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
--
This is an automated message from the Apache Git Service.