sl3635 opened a new pull request, #3616:
URL: https://github.com/apache/celeborn/pull/3616
### What changes were proposed in this pull request?

Fix a bug in `MemoryManager.switchServingState()` where replicate channels permanently lose `autoRead=true` after a memory pressure event.

When the serving state transitions from `PUSH_AND_REPLICATE_PAUSED` to `PUSH_PAUSED`, `resumeReplicate()` was only called inside the `!tryResumeByPinnedMemory()` guard. If `tryResumeByPinnedMemory()` returned `true`, the entire block was skipped and replicate channels were never resumed.

The fix moves `resumeReplicate()` outside the `tryResumeByPinnedMemory()` guard, so it is always called when stepping down from `PUSH_AND_REPLICATE_PAUSED` to `PUSH_PAUSED`. This enforces a state machine invariant: `PUSH_PAUSED` means only push is paused, so replicate must always be resumed.

### Why are the changes needed?

Once replicate channels are stuck with `autoRead=false`, Netty I/O threads stop reading from all replicate connections. The affected worker's TCP receive buffers fill up and it advertises a zero window, so the send buffers on the remote workers replicating to it fill up as well, and pending writes accumulate in their `ChannelOutboundBuffer`. Each pending write holds a reference to a direct memory `ByteBuf`, so direct memory on the remote workers grows without bound.

The failure sequence:

1. Worker hits memory pressure → state = `PUSH_AND_REPLICATE_PAUSED` → all channels paused.
2. Pinned memory is low → `tryResumeByPinnedMemory()` returns `true` → `resumeByPinnedMemory(PUSH_PAUSED)` resumes push only; replicate is not resumed.
3. Memory drops to the push-only range → state = `PUSH_PAUSED`, but `resumeReplicate()` is never called.
4. Replicate channels are permanently stuck with `autoRead=false`, causing unbounded direct memory growth on remote workers.

### Does this PR resolve a correctness bug?

Yes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new unit test `Test MemoryManager resume replicate by pinned memory` in `MemoryManagerSuite` that reproduces the exact failure scenario:

1. Enter `PUSH_AND_REPLICATE_PAUSED` with low pinned memory (channels resumed by the pinned memory path).
2. Raise pinned memory so both push and replicate get paused.
3. Drop memory to the `PUSH_PAUSED` range with low pinned memory.
4. Assert that the replicate listener is resumed; this assertion fails without the fix.
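To make the guard ordering concrete, here is a minimal, hypothetical Java sketch of the `PUSH_AND_REPLICATE_PAUSED` → `PUSH_PAUSED` transition. It is not Celeborn's `MemoryManager`. `ServingStateSketch`, `applyFix`, `runFailureSequence`, the `NONE_PAUSED` state, and the boolean flags that stand in for Netty's per-channel `autoRead` are all illustrative assumptions. The sketch also assumes that, for a `PUSH_PAUSED` target, the pinned-memory path resumes push channels only, matching step 2 of the failure sequence.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the serving-state transition in this PR.
// Names mirror the PR text; this is NOT Celeborn's actual MemoryManager.
final class ServingStateSketch {
    enum ServingState { NONE_PAUSED, PUSH_PAUSED, PUSH_AND_REPLICATE_PAUSED }

    private final boolean applyFix;           // true = resumeReplicate() outside the guard
    private ServingState state = ServingState.NONE_PAUSED;
    private boolean pushAutoRead = true;      // stands in for autoRead on push channels
    private boolean replicateAutoRead = true; // stands in for autoRead on replicate channels
    private boolean pinnedMemoryLow = false;
    final List<String> actions = new ArrayList<>(); // records triggered actions in order

    ServingStateSketch(boolean applyFix) { this.applyFix = applyFix; }

    boolean pushAutoRead() { return pushAutoRead; }
    boolean replicateAutoRead() { return replicateAutoRead; }
    void setPinnedMemoryLow(boolean low) { pinnedMemoryLow = low; }

    private void pausePush() { pushAutoRead = false; actions.add("pausePush"); }
    private void resumePush() { pushAutoRead = true; actions.add("resumePush"); }
    private void pauseReplicate() { replicateAutoRead = false; actions.add("pauseReplicate"); }
    private void resumeReplicate() { replicateAutoRead = true; actions.add("resumeReplicate"); }

    // Assumption: with low pinned memory and a PUSH_PAUSED target,
    // only push channels are resumed, and the method reports success.
    private boolean tryResumeByPinnedMemory() {
        if (!pinnedMemoryLow) return false;
        resumePush();
        return true;
    }

    void switchServingState(ServingState next) {
        ServingState last = state;
        state = next;
        switch (next) {
            case PUSH_AND_REPLICATE_PAUSED:
                pausePush();
                pauseReplicate();
                break;
            case PUSH_PAUSED:
                if (applyFix) {
                    // Invariant: stepping down to PUSH_PAUSED always resumes replicate.
                    if (last == ServingState.PUSH_AND_REPLICATE_PAUSED) resumeReplicate();
                    if (!tryResumeByPinnedMemory()) pausePush();
                } else {
                    // Buggy shape: the resume is skipped whenever the pinned-memory path succeeds.
                    if (!tryResumeByPinnedMemory()) {
                        if (last == ServingState.PUSH_AND_REPLICATE_PAUSED) resumeReplicate();
                        else pausePush();
                    }
                }
                break;
            case NONE_PAUSED:
                resumePush();
                resumeReplicate();
                break;
        }
    }

    // Compressed version of the failure sequence: pressure, low pinned memory, step down.
    static ServingStateSketch runFailureSequence(boolean applyFix) {
        ServingStateSketch m = new ServingStateSketch(applyFix);
        m.switchServingState(ServingState.PUSH_AND_REPLICATE_PAUSED);
        m.setPinnedMemoryLow(true);
        m.switchServingState(ServingState.PUSH_PAUSED);
        return m;
    }

    public static void main(String[] args) {
        System.out.println("without fix, replicate autoRead = " + runFailureSequence(false).replicateAutoRead());
        System.out.println("with fix,    replicate autoRead = " + runFailureSequence(true).replicateAutoRead());
    }
}
```

Running `main` prints `false` for the unfixed ordering and `true` with the fix. In both cases push is resumed by the pinned-memory path. The only difference is whether `resumeReplicate()` sits inside or outside the `tryResumeByPinnedMemory()` guard.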
