nateab opened a new pull request, #27599:
URL: https://github.com/apache/flink/pull/27599

   ## Summary
   
   - Fix a race condition in which `FsStateChangelogWriter.persist()` captures handle references in a checkpoint snapshot but does not pin them in `TaskChangelogRegistry`. Concurrent materialization/truncation can then discard files that in-flight checkpoints still reference, which surfaces as a `FileNotFoundException` during restore.
   - Add a `retain()` method to `TaskChangelogRegistry` that pins handles at snapshot-build time. The pin is dropped at `confirm()` (the JobManager takes ownership), `reset()` (the checkpoint is aborted), or `close()` (the writer shuts down).
   - Revert the band-aid increase of the materialization interval in `ChangelogRecoveryITCase` (200ms back to 50ms)
   
   ## Details
   
   The root cause is that `buildSnapshotResult()` captures `StreamStateHandle` references into the checkpoint snapshot without pinning them in the registry. When materialization triggers `truncate()` → `release()`, a handle's refCount can drop to 0 and its file is deleted, even though an in-flight checkpoint's snapshot still references it.
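   To make the interleaving concrete, here is a minimal, self-contained model of the race. It is not Flink code: `RefCountedRegistry`, the string handle IDs, and the `discarded` set are hypothetical stand-ins for `TaskChangelogRegistry`, `PhysicalStateHandleID`, and file deletion. It assumes, as described above, that `release()` discards a handle as soon as its refCount reaches 0.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ChangelogRaceModel {

    /** Hypothetical stand-in for TaskChangelogRegistry: handle ID -> refCount. */
    static final class RefCountedRegistry {
        final Map<String, Long> refCounts = new HashMap<>();
        final Set<String> discarded = new HashSet<>(); // models deleted files

        synchronized void startTracking(String handleId, long refCount) {
            refCounts.put(handleId, refCount);
        }

        /** Decrements the refCount and "deletes the file" once it reaches 0. */
        synchronized void release(String handleId) {
            Long remaining = refCounts.computeIfPresent(handleId, (id, count) -> count - 1);
            if (remaining != null && remaining <= 0) {
                refCounts.remove(handleId);
                discarded.add(handleId);
            }
        }
    }

    public static void main(String[] args) {
        RefCountedRegistry registry = new RefCountedRegistry();

        // The writer uploads a changelog file; only the writer holds a reference.
        registry.startTracking("changelog-1", 1);

        // buildSnapshotResult(): the in-flight checkpoint captures the handle
        // but does NOT pin it in the registry.
        List<String> inFlightSnapshot = List.of("changelog-1");

        // Concurrent materialization: truncate() -> release() drops the refCount to 0.
        registry.release("changelog-1");

        // The snapshot now points at a file that has already been deleted,
        // which is what later surfaces as FileNotFoundException on restore.
        boolean dangling = registry.discarded.containsAll(inFlightSnapshot);
        System.out.println("snapshot references a discarded file: " + dangling);
    }
}
```

   Nothing in this model ties the snapshot's lifetime to the registry's refCount, so any `release()` that runs between snapshot build and checkpoint completion can delete a file the checkpoint needs.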
   
   **Fix approach:** Pin handles at `buildSnapshotResult()` time via a new `retain()` method on `TaskChangelogRegistry`, which atomically increments the refCount. Handles are deduplicated per checkpoint using a `Set<PhysicalStateHandleID>`, so each handle is pinned at most once per checkpoint. Handles are unpinned at:
   - `confirm()`: the JM takes ownership, so `stopTracking` removes the handle from the registry
   - `reset()`: the checkpoint was aborted, so `release` decrements the refCount (the handle is discarded if no other references remain)
   - `close()`: on writer shutdown, any handles that are still pinned are released
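   The pin/unpin lifecycle above can be sketched as follows. This is a hedged illustration, not the patch itself: apart from `retain`, `release`, `stopTracking`, `buildSnapshotResult`, `confirm`, `reset`, and `close`, every name is hypothetical; handle IDs are plain strings standing in for `PhysicalStateHandleID`; and the per-checkpoint `Map<Long, Set<String>>` is just one plausible way to implement the deduplication described above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PinnedChangelogSketch {

    /** Hypothetical registry: handle ID -> refCount; a handle is discarded at 0. */
    static final class Registry {
        final Map<String, Long> refCounts = new HashMap<>();
        final Set<String> discarded = new HashSet<>(); // models deleted files

        synchronized void startTracking(String id, long refCount) { refCounts.put(id, refCount); }

        /** Pins a tracked handle by atomically incrementing its refCount. */
        synchronized void retain(String id) {
            refCounts.computeIfPresent(id, (k, count) -> count + 1);
        }

        /** Drops one reference and discards the handle once none remain. */
        synchronized void release(String id) {
            Long remaining = refCounts.computeIfPresent(id, (k, count) -> count - 1);
            if (remaining != null && remaining <= 0) {
                refCounts.remove(id);
                discarded.add(id);
            }
        }

        /** Ownership moved to the JM: forget the handle without discarding it. */
        synchronized void stopTracking(String id) { refCounts.remove(id); }
    }

    /** Hypothetical writer-side bookkeeping; assumed to be driven by a single thread. */
    static final class Writer {
        final Registry registry;
        final Map<Long, Set<String>> pinned = new HashMap<>(); // checkpointId -> pinned handles

        Writer(Registry registry) { this.registry = registry; }

        /** Snapshot-build time: pin each handle at most once per checkpoint. */
        void buildSnapshotResult(long checkpointId, List<String> handles) {
            Set<String> pins = pinned.computeIfAbsent(checkpointId, id -> new HashSet<>());
            for (String handle : handles) {
                if (pins.add(handle)) { // dedup: retain only on first occurrence
                    registry.retain(handle);
                }
            }
        }

        /** Checkpoint confirmed: the JM owns the handles, so stop tracking them. */
        void confirm(long checkpointId) {
            Set<String> pins = pinned.remove(checkpointId);
            if (pins != null) pins.forEach(registry::stopTracking);
        }

        /** Checkpoint aborted: drop the pin; discards the handle if it was the last ref. */
        void reset(long checkpointId) {
            Set<String> pins = pinned.remove(checkpointId);
            if (pins != null) pins.forEach(registry::release);
        }

        /** Writer shutdown: release every handle that is still pinned. */
        void close() {
            pinned.values().forEach(pins -> pins.forEach(registry::release));
            pinned.clear();
        }
    }

    public static void main(String[] args) {
        // persist -> truncate -> confirm: the pin keeps the handle alive.
        Registry r1 = new Registry();
        Writer w1 = new Writer(r1);
        r1.startTracking("changelog-1", 1);
        w1.buildSnapshotResult(7L, List.of("changelog-1", "changelog-1"));
        r1.release("changelog-1"); // concurrent materialization truncates
        w1.confirm(7L);
        System.out.println("confirm path discarded: " + r1.discarded.contains("changelog-1"));

        // persist -> truncate -> reset: the aborted checkpoint held the last reference.
        Registry r2 = new Registry();
        Writer w2 = new Writer(r2);
        r2.startTracking("changelog-2", 1);
        w2.buildSnapshotResult(8L, List.of("changelog-2"));
        r2.release("changelog-2");
        w2.reset(8L);
        System.out.println("reset path discarded: " + r2.discarded.contains("changelog-2"));
    }
}
```

   Running `main` prints `false` for the confirm path and `true` for the reset path, mirroring the two `FsStateChangelogWriterTest` scenarios listed in the test plan. Deduplicating per checkpoint matters because a snapshot may list the same handle more than once; without the `Set`, each duplicate would add a pin that is only dropped once.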
   
   ## Test plan
   
   - [x] `TaskChangelogRegistryImplTest`: 3 new tests for `retain()` covering refCount increment, premature-discard prevention, and `stopTracking` interaction (5/5 pass)
   - [x] `FsStateChangelogWriterTest`: 2 new integration tests, persist→truncate→confirm (handle survives) and persist→truncate→reset (handle discarded) (18/18 pass)
   - [x] `ChangelogStateDiscardTest`: existing discard tests still pass (6/6 pass)
   - [x] `ChangelogRecoveryITCase#testMaterialization`: passes with the materialization interval reverted to 50ms (3/3 pass)


-- 
This is an automated message from the Apache Git Service.