1996fanrui opened a new pull request, #28605: URL: https://github.com/apache/flink/pull/28605

## What is the purpose of the change

`LocalInputChannel.getBufferAndAvailability()` called `channelStatePersister.maybePersist(buffer)` unconditionally for every buffer. This included recovered buffers that `checkpointStarted()` had already spilled to channel state, so those recovered buffers were persisted twice during a checkpoint.

## Brief change log

- Add a `fromUpstream` parameter to `getBufferAndAvailability()`. It distinguishes buffers pulled from the upstream `ResultSubpartitionView` from recovered buffers taken from `toBeConsumedBuffers`.
- Call `maybePersist()` only when `fromUpstream == true`, since `checkpointStarted()` already spills recovered buffers in full.
- Add the unit test `testRecoveredBuffersNotPersistedAgainWhenConsumedDuringCheckpoint`, which verifies that recovered buffers are not re-added to the channel state writer when they are consumed during a checkpoint.

## Verifying this change

- Added `testRecoveredBuffersNotPersistedAgainWhenConsumedDuringCheckpoint` in `LocalInputChannelTest`. It starts a checkpoint, verifies that `checkpointStarted` persists the 2 recovered buffers, consumes those buffers, and then asserts that the total persisted count is still 2 (not 4).

## Does this pull request potentially affect one of the following areas

- **Network stack / checkpointing**: yes, the `LocalInputChannel` channel-state persistence path.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
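The idea behind the change can be illustrated with a minimal, self-contained Java sketch. This is not Flink's code: `Buffer`, `Persister`, `Channel`, `startCheckpoint()` and `getNextBuffer()` are hypothetical simplifications, and a single `checkpointInProgress` flag stands in for Flink's real checkpoint and barrier tracking. Only `toBeConsumedBuffers`, `fromUpstream`, `getBufferAndAvailability()` and `maybePersist()` mirror names from the description above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RecoveredBufferPersistSketch {

    /** Hypothetical stand-in for a network buffer; it only carries an id. */
    record Buffer(int id) {}

    /** Hypothetical, simplified persister: records buffers while a checkpoint is in progress. */
    static final class Persister {
        final List<Buffer> persisted = new ArrayList<>();
        boolean checkpointInProgress;

        /** Plays the role of checkpointStarted(): spills all not-yet-consumed recovered buffers. */
        void startCheckpoint(Deque<Buffer> recovered) {
            checkpointInProgress = true;
            persisted.addAll(recovered);
        }

        /** Persists the buffer only while the (simplified) checkpoint is in progress. */
        void maybePersist(Buffer buffer) {
            if (checkpointInProgress) {
                persisted.add(buffer);
            }
        }
    }

    /** Simplified channel modeling the fromUpstream guard described in the PR. */
    static final class Channel {
        final Deque<Buffer> toBeConsumedBuffers = new ArrayDeque<>(); // recovered buffers
        final Deque<Buffer> upstream = new ArrayDeque<>();            // stands in for the subpartition view
        final Persister persister = new Persister();

        /** Recovered buffers are drained first, then buffers from upstream. */
        Buffer getNextBuffer() {
            Buffer recovered = toBeConsumedBuffers.poll();
            if (recovered != null) {
                return getBufferAndAvailability(recovered, false);
            }
            Buffer fromView = upstream.poll();
            return fromView == null ? null : getBufferAndAvailability(fromView, true);
        }

        private Buffer getBufferAndAvailability(Buffer buffer, boolean fromUpstream) {
            // Recovered buffers were already spilled by startCheckpoint();
            // persisting them again here would count them twice.
            if (fromUpstream) {
                persister.maybePersist(buffer);
            }
            return buffer;
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.toBeConsumedBuffers.add(new Buffer(1));
        channel.toBeConsumedBuffers.add(new Buffer(2));

        channel.persister.startCheckpoint(channel.toBeConsumedBuffers);
        System.out.println("after checkpoint start: " + channel.persister.persisted.size());

        // Consume both recovered buffers while the checkpoint is still in progress.
        channel.getNextBuffer();
        channel.getNextBuffer();
        System.out.println("after consuming recovered: " + channel.persister.persisted.size());
    }
}
```

In this sketch both printed counts are 2. Removing the `if (fromUpstream)` guard makes the second count 4, which is the double persistence that the new test checks for. Buffers that arrive from upstream during the checkpoint are still passed to `maybePersist()`.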
