sl3635 opened a new pull request, #3616:
URL: https://github.com/apache/celeborn/pull/3616

   ### What changes were proposed in this pull request?
   
   Fix a bug in `MemoryManager.switchServingState()` that can leave replicate 
channels permanently stuck with `autoRead=false` after a memory pressure event.
   
   When the serving state transitions from `PUSH_AND_REPLICATE_PAUSED` to 
`PUSH_PAUSED`, `resumeReplicate()` was only called inside the 
`!tryResumeByPinnedMemory()` guard. If `tryResumeByPinnedMemory()` returned 
`true`, the entire block was skipped and replicate channels were never resumed.
   
   The fix moves `resumeReplicate()` outside the `tryResumeByPinnedMemory()` 
guard so it is always called when stepping down from 
`PUSH_AND_REPLICATE_PAUSED` to `PUSH_PAUSED`. This restores a state machine 
invariant: in `PUSH_PAUSED` only push is paused, so replicate channels must 
always be resumed on this transition.
   
   ### Why are the changes needed?
   
   Once replicate channels are stuck with `autoRead=false`, Netty I/O threads 
stop reading from all replicate connections. Remote workers writing to the 
affected worker see their TCP send buffers fill up (zero window), causing 
pending writes to accumulate in `ChannelOutboundBuffer`. Each pending write 
holds a reference to a direct memory `ByteBuf`, causing direct memory to grow 
indefinitely on the remote workers.
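
The backpressure chain described above can be illustrated with a toy model. This is only a sketch in plain Java, not Netty code: `BackpressureSketch`, `SOCKET_BUFFER_BYTES`, and the message sizes are hypothetical, and the single counter stands in for the TCP send buffer plus the peer's receive window. It shows that once the receiver stops reading, every further write stays queued on the sender.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BackpressureSketch {
    // Hypothetical capacity standing in for the TCP send buffer and receive window.
    static final int SOCKET_BUFFER_BYTES = 4;

    int inSocketBuffer = 0;                                  // bytes already handed to the "kernel"
    final Deque<byte[]> pendingWrites = new ArrayDeque<>();  // stand-in for ChannelOutboundBuffer

    // Queue a message, then flush as much as the socket buffer accepts.
    void write(byte[] msg) {
        pendingWrites.addLast(msg);
        flush();
    }

    void flush() {
        while (!pendingWrites.isEmpty()
                && inSocketBuffer + pendingWrites.peekFirst().length <= SOCKET_BUFFER_BYTES) {
            inSocketBuffer += pendingWrites.removeFirst().length;
        }
    }

    // The receiver drains the socket buffer only while its autoRead flag is true.
    void receiverReads(boolean autoRead) {
        if (autoRead) {
            inSocketBuffer = 0;
            flush();
        }
    }

    // Bytes still referenced by queued writes on the sender side.
    long pendingBytes() {
        long total = 0;
        for (byte[] b : pendingWrites) {
            total += b.length;
        }
        return total;
    }

    public static void main(String[] args) {
        BackpressureSketch sender = new BackpressureSketch();
        for (int i = 0; i < 10; i++) {
            sender.write(new byte[2]);
            sender.receiverReads(false); // receiver stuck with autoRead=false
        }
        System.out.println("pending bytes with autoRead=false: " + sender.pendingBytes());
    }
}
```

In the model, calling `receiverReads(true)` once drains the socket buffer and lets queued writes flush. That is the effect the missing `resumeReplicate()` call would have had on the remote workers.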
   
   The failure sequence:
   1. Worker hits memory pressure → state = `PUSH_AND_REPLICATE_PAUSED` → all 
channels paused
   2. Pinned memory is low → `tryResumeByPinnedMemory()` returns `true` → 
`resumeByPinnedMemory(PUSH_PAUSED)` resumes push only, replicate not resumed
   3. Memory drops to push-only range → state = `PUSH_PAUSED`, but 
`resumeReplicate()` is never called
   4. Replicate channels permanently stuck with `autoRead=false`, causing 
unbounded direct memory growth on remote workers
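
The step-down transition in this sequence can be replayed in a small model. The sketch below is a simplified, hypothetical stand-in, not the actual `MemoryManager` code: the class `ServingStateSketch`, the `withFix` flag, the `pinnedMemoryLow` parameter, and the two boolean flags that replace real channel listeners are all assumptions made for illustration. Only the shape of the guard around `tryResumeByPinnedMemory()` follows the description above.

```java
class ServingStateSketch {
    enum ServingState { NONE_PAUSED, PUSH_PAUSED, PUSH_AND_REPLICATE_PAUSED }

    ServingState state = ServingState.NONE_PAUSED;
    boolean pushAutoRead = true;       // stands in for autoRead on push channels
    boolean replicateAutoRead = true;  // stands in for autoRead on replicate channels
    final boolean withFix;             // hypothetical switch between pre-fix and post-fix logic

    ServingStateSketch(boolean withFix) {
        this.withFix = withFix;
    }

    // Hypothetical stand-in: with low pinned memory, resume push only and report success.
    boolean tryResumeByPinnedMemory(boolean pinnedMemoryLow) {
        if (pinnedMemoryLow) {
            pushAutoRead = true;
        }
        return pinnedMemoryLow;
    }

    void switchServingState(ServingState next, boolean pinnedMemoryLow) {
        ServingState last = state;
        state = next;
        switch (next) {
            case PUSH_AND_REPLICATE_PAUSED:
                pushAutoRead = false;
                replicateAutoRead = false;
                break;
            case PUSH_PAUSED:
                boolean steppingDown = last == ServingState.PUSH_AND_REPLICATE_PAUSED;
                if (withFix && steppingDown) {
                    // Post-fix: replicate is resumed regardless of the pinned-memory guard.
                    replicateAutoRead = true;
                }
                if (!tryResumeByPinnedMemory(pinnedMemoryLow)) {
                    if (steppingDown) {
                        // Pre-fix: the only place replicate was resumed (redundant with the fix).
                        replicateAutoRead = true;
                    } else {
                        pushAutoRead = false;
                    }
                }
                break;
            default: // NONE_PAUSED
                pushAutoRead = true;
                replicateAutoRead = true;
        }
    }

    public static void main(String[] args) {
        for (boolean withFix : new boolean[] {false, true}) {
            ServingStateSketch m = new ServingStateSketch(withFix);
            m.switchServingState(ServingState.PUSH_AND_REPLICATE_PAUSED, false);
            m.switchServingState(ServingState.PUSH_PAUSED, true); // pinned memory is low
            System.out.println("withFix=" + withFix + " replicateAutoRead=" + m.replicateAutoRead);
        }
    }
}
```

With `withFix=false` the model ends in `PUSH_PAUSED` with `replicateAutoRead` still `false`, which is the stuck state from step 4. With `withFix=true` the flag is restored on the same transition.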
   
   ### Does this PR resolve a correctness bug?
   
   Yes.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Added a new unit test `Test MemoryManager resume replicate by pinned memory` 
in `MemoryManagerSuite` that reproduces the exact failure scenario:
   1. Enter `PUSH_AND_REPLICATE_PAUSED` with low pinned memory (channels 
resumed by pinned memory path)
   2. Raise pinned memory so both push and replicate get paused
   3. Drop memory to `PUSH_PAUSED` range with low pinned memory
   4. Assert that the replicate listener is resumed. This assertion fails 
without the fix.

