SteNicholas opened a new pull request, #2134:
URL: https://github.com/apache/incubator-celeborn/pull/2134

   ### What changes were proposed in this pull request?
   
   `totalInflightReqs` decrements when `batchIdSet` contains the `batchId` to 
avoid duplicate caller of `removeBatch` in `InFlightRequestTracker`.
   
   ### Why are the changes needed?
   
   Caller of `InFlightRequestTracker#removeBatch` may be duplicated, which 
cause that `totalInflightReqs` could be negative. The source of truth should be 
that `totalInflightReqs` should decrement when `batchIdSet` contains the 
`batchId`. If `batchIdSet` does not contain the `batchId`, it does not need to 
decrement `totalInflightReqs`.
   
   ```
   23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 
(TID 206)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there 
are still -1 batches in flight for hostAndPushPort [], which exceeds the 
current limit 0.
   23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 
(TID 206)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
   23/12/05 20:05:01 [Executor task launch worker for task 17.0 in stage 10.0 
(TID 206)] ERROR Executor: Exception in task 17.0 in stage 10.0 (TID 206)
   org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout 
for task 4-17-0 while limiting zero in-flight requests
        at 
org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:598)
        at 
org.apache.celeborn.client.ShuffleClientImpl.prepareForMergeData(ShuffleClientImpl.java:1175)
        at 
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:455)
        at 
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:210)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:589)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:594)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Internal tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to