SteNicholas opened a new pull request, #2725: URL: https://github.com/apache/celeborn/pull/2725
### What changes were proposed in this pull request?

`InFlightRequestTracker` should decrement `totalInflightReqs` only when `inflightBatchesPerAddress` isn't empty, so that `totalInflightReqs` cannot become negative for `limitZeroInFlight`.

Follow-up to #2621.

### Why are the changes needed?

After #2621, `Waiting timeout for task %s while limiting zero in-flight requests` exceptions occur in our production environment, but they cannot be reliably reproduced. The log shows that the exception occurs when `cleanup` is called first at mapper end and `removeBatch` is called afterwards, which is why `batchIdSet` is null. Therefore, `InFlightRequestTracker` should decrement `totalInflightReqs` only when `inflightBatchesPerAddress` isn't empty.

<img width="1439" alt="image" src="https://github.com/user-attachments/assets/07c14561-42c9-4e65-8536-ffcc79b2b697">

```
24/09/05 12:37:35 [Executor task launch worker for task 0.0 in stage 3.1 (TID 13935)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
24/09/05 12:37:35 [data-client-5-5] WARN InFlightRequestTracker: BatchIdSet of 172.27.32.15:9092 is null.
24/09/05 12:57:38 [Executor task launch worker for task 0.0 in stage 3.1 (TID 13935)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there are still [] in flight, which exceeds the current limit 0.
24/09/05 12:57:38 [Executor task launch worker for task 0.0 in stage 3.1 (TID 13935)] ERROR Executor: Exception in task 0.0 in stage 3.1 (TID 13935)
org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout for task 3-5290-0 while limiting zero in-flight requests
    at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:676)
    at org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1555)
    at org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1539)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:367)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:175)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:144)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.
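
The ordering described under "Why are the changes needed?" (`cleanup` first, a late `removeBatch` afterwards) can be illustrated with a minimal sketch. This is not the code in this patch: the class `InFlightTrackerSketch`, its method names, and the use of `LongAdder` are assumptions made for illustration, and only the field names `inflightBatchesPerAddress` and `totalInflightReqs` come from the description above. It shows how an unguarded decrement drives the counter below zero, and how checking that `inflightBatchesPerAddress` isn't empty prevents that.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified stand-in for InFlightRequestTracker (illustration only).
class InFlightTrackerSketch {
  private final ConcurrentHashMap<String, Set<Integer>> inflightBatchesPerAddress =
      new ConcurrentHashMap<>();
  private final LongAdder totalInflightReqs = new LongAdder();

  void addBatch(int batchId, String address) {
    inflightBatchesPerAddress
        .computeIfAbsent(address, k -> ConcurrentHashMap.newKeySet())
        .add(batchId);
    totalInflightReqs.increment();
  }

  // Always decrements, even if cleanup() has already reset the counter.
  void removeBatchUnguarded(int batchId, String address) {
    Set<Integer> batchIdSet = inflightBatchesPerAddress.get(address);
    if (batchIdSet != null) {
      batchIdSet.remove(batchId);
    }
    totalInflightReqs.decrement();
  }

  // Decrements only while the per-address map still holds entries.
  void removeBatchGuarded(int batchId, String address) {
    Set<Integer> batchIdSet = inflightBatchesPerAddress.get(address);
    if (batchIdSet != null) {
      batchIdSet.remove(batchId);
    }
    if (!inflightBatchesPerAddress.isEmpty()) {
      totalInflightReqs.decrement();
    }
  }

  // Called at mapper end: drops all tracked batches and resets the counter.
  void cleanup() {
    inflightBatchesPerAddress.clear();
    totalInflightReqs.reset();
  }

  long total() {
    return totalInflightReqs.sum();
  }

  public static void main(String[] args) {
    String address = "172.27.32.15:9092";

    InFlightTrackerSketch unguarded = new InFlightTrackerSketch();
    unguarded.addBatch(1, address);
    unguarded.cleanup();                        // mapper end clears the tracker first
    unguarded.removeBatchUnguarded(1, address); // late callback from the data client
    System.out.println("unguarded total = " + unguarded.total());

    InFlightTrackerSketch guarded = new InFlightTrackerSketch();
    guarded.addBatch(1, address);
    guarded.cleanup();
    guarded.removeBatchGuarded(1, address);
    System.out.println("guarded total = " + guarded.total());
  }
}
```

In this sketch the unguarded variant ends with a total of -1 while the guarded variant stays at 0. If the zero in-flight wait expects the counter to settle at exactly zero (an assumption about `limitZeroInFlight`, not confirmed here), a negative value would be consistent with the timeout shown in the log, where the reported in-flight set is already empty.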
