SteNicholas opened a new pull request, #2725:
URL: https://github.com/apache/celeborn/pull/2725

   ### What changes were proposed in this pull request?
   
   `InFlightRequestTracker` should decrement `totalInflightReqs` only when `inflightBatchesPerAddress` isn't empty (that is, when the `batchIdSet` of the address is not null), to avoid a negative `totalInflightReqs` in `limitZeroInFlight`.
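   A minimal sketch of the guarded decrement follows. It assumes a simplified tracker in which `inflightBatchesPerAddress` maps a `host:pushPort` string to a set of batch ids, `totalInflightReqs` is a `LongAdder`, and `cleanup()` clears the map and resets the counter. The class `SimplifiedInFlightTracker` and its method bodies are hypothetical; they only mirror the names used in this PR and are not Celeborn's actual `InFlightRequestTracker`.

   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.LongAdder;

   // Hypothetical, simplified model of InFlightRequestTracker (not Celeborn code).
   class SimplifiedInFlightTracker {
     // Batch ids currently in flight, keyed by "host:pushPort".
     private final ConcurrentHashMap<String, Set<Integer>> inflightBatchesPerAddress =
         new ConcurrentHashMap<>();
     // Total number of in-flight batches across all addresses.
     private final LongAdder totalInflightReqs = new LongAdder();

     public void addBatch(int batchId, String hostAndPushPort) {
       inflightBatchesPerAddress
           .computeIfAbsent(hostAndPushPort, k -> ConcurrentHashMap.newKeySet())
           .add(batchId);
       totalInflightReqs.increment();
     }

     public void removeBatch(int batchId, String hostAndPushPort) {
       Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
       if (batchIdSet != null) {
         batchIdSet.remove(batchId);
         // Decrement only while the address is still tracked.
         totalInflightReqs.decrement();
       } else {
         // Assumed: cleanup() already cleared the map and reset the counter,
         // so decrementing here would push the counter below zero.
         System.err.println("BatchIdSet of " + hostAndPushPort + " is null.");
       }
     }

     // Assumed behavior of cleanup at mapper end: forget everything.
     public void cleanup() {
       inflightBatchesPerAddress.clear();
       totalInflightReqs.reset();
     }

     public long totalInflight() {
       return totalInflightReqs.sum();
     }

     public static void main(String[] args) {
       SimplifiedInFlightTracker tracker = new SimplifiedInFlightTracker();
       tracker.addBatch(1, "172.27.32.15:9092");
       tracker.cleanup();                            // mapper end
       tracker.removeBatch(1, "172.27.32.15:9092");  // late push response
       System.out.println(tracker.totalInflight());  // prints 0
     }
   }
   ```

   Keeping the decrement inside the `batchIdSet != null` branch ties the counter to the same state that `cleanup()` resets, so a response that arrives after cleanup cannot drive the total below zero.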
   
   Follow up #2621.
   
   ### Why are the changes needed?
   
   After #2621, the exception `Waiting timeout for task %s while limiting zero in-flight requests` occasionally occurs in the production environment and cannot be reproduced reliably. The log shows that this exception occurs when `cleanup` is first called at mapper end and `removeBatch` is called afterwards, which is why `batchIdSet` is null. Because `removeBatch` still decrements `totalInflightReqs` in that case, the counter becomes negative and `limitZeroInFlight` times out even though no batch is in flight. Therefore, `InFlightRequestTracker` should decrement `totalInflightReqs` only when `inflightBatchesPerAddress` isn't empty.
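   To illustrate the ordering described above, here is a hypothetical reproduction that calls `cleanup()` before a late `removeBatch()` and then waits for zero in-flight requests. It assumes that `cleanup()` resets the counter and that the zero-limit wait polls until `totalInflightReqs` is exactly `0`; it uses a 50 ms timeout instead of the 1200000 ms seen in the log. `CleanupThenRemoveDemo`, `Tracker`, and the `guarded` flag are illustrative names, not Celeborn APIs.

   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.LongAdder;

   // Hypothetical reproduction of "cleanup, then removeBatch" (not Celeborn code).
   class CleanupThenRemoveDemo {
     static final class Tracker {
       final ConcurrentHashMap<String, Set<Integer>> inflightBatchesPerAddress =
           new ConcurrentHashMap<>();
       final LongAdder totalInflightReqs = new LongAdder();
       final boolean guarded; // true = decrement only when batchIdSet != null

       Tracker(boolean guarded) { this.guarded = guarded; }

       void addBatch(int batchId, String addr) {
         inflightBatchesPerAddress
             .computeIfAbsent(addr, k -> ConcurrentHashMap.newKeySet()).add(batchId);
         totalInflightReqs.increment();
       }

       void removeBatch(int batchId, String addr) {
         Set<Integer> batchIdSet = inflightBatchesPerAddress.get(addr);
         if (batchIdSet != null) {
           batchIdSet.remove(batchId);
           totalInflightReqs.decrement();
         } else if (!guarded) {
           totalInflightReqs.decrement(); // unconditional decrement: counter goes negative
         }
       }

       // Assumed cleanup at mapper end: clear the map and reset the counter.
       void cleanup() {
         inflightBatchesPerAddress.clear();
         totalInflightReqs.reset();
       }

       // Assumed zero-limit wait: poll until the counter is exactly 0 or time runs out.
       boolean limitZeroInFlight(long timeoutMs, long pollMs) {
         long deadline = System.currentTimeMillis() + timeoutMs;
         while (totalInflightReqs.sum() != 0) {
           if (System.currentTimeMillis() >= deadline) {
             return false; // would surface as "Waiting timeout ..."
           }
           try {
             Thread.sleep(pollMs);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return false;
           }
         }
         return true;
       }
     }

     static boolean run(boolean guarded) {
       Tracker t = new Tracker(guarded);
       t.addBatch(1, "172.27.32.15:9092");
       t.cleanup();                           // "Clear InFlightRequestTracker"
       t.removeBatch(1, "172.27.32.15:9092"); // "BatchIdSet of ... is null."
       return t.limitZeroInFlight(50, 10);
     }

     public static void main(String[] args) {
       System.out.println("unguarded: " + run(false)); // false: counter is -1
       System.out.println("guarded:   " + run(true));  // true: counter stays 0
     }
   }
   ```

   Under these assumptions, the unconditional decrement leaves the counter at -1, so the wait never observes zero and times out although no batch remains tracked, which is consistent with the empty `[]` reported in the log below.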
   
   <img width="1439" alt="image" src="https://github.com/user-attachments/assets/07c14561-42c9-4e65-8536-ffcc79b2b697">
   
   ```
   24/09/05 12:37:35 [Executor task launch worker for task 0.0 in stage 3.1 (TID 13935)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
   24/09/05 12:37:35 [data-client-5-5] WARN InFlightRequestTracker: BatchIdSet of 172.27.32.15:9092 is null.
   24/09/05 12:57:38 [Executor task launch worker for task 0.0 in stage 3.1 (TID 13935)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there are still [] in flight, which exceeds the current limit 0.
   24/09/05 12:57:38 [Executor task launch worker for task 0.0 in stage 3.1 (TID 13935)] ERROR Executor: Exception in task 0.0 in stage 3.1 (TID 13935)
   org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout for task 3-5290-0 while limiting zero in-flight requests
        at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:676)
        at org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1555)
        at org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1539)
        at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:367)
        at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:175)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:144)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.