1996fanrui opened a new pull request, #28605:
URL: https://github.com/apache/flink/pull/28605

   ## What is the purpose of the change
   
   `LocalInputChannel.getBufferAndAvailability()` called 
`channelStatePersister.maybePersist(buffer)` unconditionally for every buffer, 
including recovered buffers that `checkpointStarted()` had already spilled to 
channel state. As a result, recovered buffers consumed while a checkpoint was 
in progress were persisted twice.
   
   ## Brief change log
   
   - Add a `fromUpstream` parameter to `getBufferAndAvailability()` to 
distinguish buffers pulled from the upstream `ResultSubpartitionView` from 
recovered buffers taken from `toBeConsumedBuffers`.
   - Call `maybePersist()` only when `fromUpstream` is true, since recovered 
buffers are already fully spilled by `checkpointStarted()`.
   - Add unit test 
`testRecoveredBuffersNotPersistedAgainWhenConsumedDuringCheckpoint` that 
verifies recovered buffers are not re-added to the channel state writer when 
consumed during a checkpoint.
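
The gating described above can be sketched roughly as follows. This is a simplified, self-contained model and not the actual Flink code: `LocalChannelSketch`, `getNextBuffer()`, the `String` buffers, the `persisted` list, and the `checkpointInProgress` flag are all hypothetical stand-ins, and `fromUpstream` is a local variable here rather than a method parameter. Only the names `toBeConsumedBuffers`, `checkpointStarted`, and `fromUpstream` are borrowed from this PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified stand-in for the channel; NOT Flink's LocalInputChannel.
class LocalChannelSketch {
    // Hypothetical stand-in for the channel state writer: records each persisted buffer.
    final List<String> persisted = new ArrayList<>();
    // Recovered buffers awaiting consumption (name borrowed from the PR).
    final Queue<String> toBeConsumedBuffers = new ArrayDeque<>();
    // Hypothetical stand-in for data served by the upstream view.
    final Queue<String> upstream = new ArrayDeque<>();
    // Assumption: a plain flag models "a checkpoint is being persisted".
    boolean checkpointInProgress;

    // Spills every recovered buffer that is still queued when the checkpoint starts.
    void checkpointStarted() {
        checkpointInProgress = true;
        persisted.addAll(toBeConsumedBuffers);
    }

    // Returns the next buffer, or null when both queues are empty.
    String getNextBuffer() {
        // Recovered buffers are drained first; anything after that comes from upstream.
        boolean fromUpstream = toBeConsumedBuffers.isEmpty();
        String buffer = fromUpstream ? upstream.poll() : toBeConsumedBuffers.poll();
        // Persist only upstream buffers; recovered ones were spilled in checkpointStarted().
        if (buffer != null && fromUpstream && checkpointInProgress) {
            persisted.add(buffer);
        }
        return buffer;
    }
}
```

In this model, queuing two recovered buffers, calling `checkpointStarted()`, and then consuming both leaves `persisted.size()` at 2. Without the `fromUpstream` check it would grow to 4, which is the double persistence this PR removes.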
   
   ## Verifying this change
   
   - Added `testRecoveredBuffersNotPersistedAgainWhenConsumedDuringCheckpoint` 
in `LocalInputChannelTest`: starts a checkpoint, verifies 2 recovered buffers 
are persisted by `checkpointStarted`, consumes the recovered buffers, then 
asserts the total persisted count is still 2 (not 4).
   
   ## Does this pull request potentially affect one of the following areas
   
   - **Network stack / checkpointing**: yes — `LocalInputChannel` channel-state 
persistence path.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
