nateab opened a new pull request, #27599: URL: https://github.com/apache/flink/pull/27599
## Summary

- Fix a race condition in which `FsStateChangelogWriter.persist()` captures handle references in a checkpoint snapshot but does not pin them in `TaskChangelogRegistry`. Concurrent materialization/truncation can then discard files that in-flight checkpoints still reference, which surfaces as a `FileNotFoundException` during restore.
- Add a `retain()` method to `TaskChangelogRegistry` that pins handles at snapshot-build time. The corresponding unpin happens at `confirm()` (the JobManager (JM) takes ownership) or at `reset()` (the checkpoint is aborted).
- Revert the band-aid increase of the materialization interval in `ChangelogRecoveryITCase` (from 200ms back to 50ms).

## Details

The root cause is that `buildSnapshotResult()` captures `StreamStateHandle` references into the checkpoint snapshot, but these handles are not "pinned" in the registry. When materialization triggers `truncate()` → `release()`, the refCount can reach 0 and the file is deleted, even though an in-flight checkpoint's snapshot still needs it.

**Fix approach:** Pin handles at `buildSnapshotResult()` time via a new `retain()` method on `TaskChangelogRegistry`, which atomically increments the refCount. Handles are deduplicated per checkpoint using a `Set<PhysicalStateHandleID>`.
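The refCount interaction behind the race can be illustrated with a toy, single-threaded model. `ToyChangelogRegistry` below is hypothetical: it only borrows the operation names used in this description (`startTracking`, `retain`, `release`, `stopTracking`) and is not Flink's `TaskChangelogRegistry` API. Handles are reduced to string IDs, "discarding" just records the ID, and treating `retain()` on an untracked handle as a no-op is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of a per-task changelog registry (NOT Flink's API).
 * Each tracked handle has a refCount; when it drops to zero the handle is "discarded".
 */
class ToyChangelogRegistry {
    private final Map<String, Long> refCounts = new HashMap<>();
    /** IDs of handles whose underlying files would have been deleted. */
    final Set<String> discarded = new HashSet<>();

    /** Start tracking a freshly uploaded handle with an initial refCount. */
    synchronized void startTracking(String handleId, long refCount) {
        refCounts.put(handleId, refCount);
    }

    /** Pin: +1 so that a concurrent release() cannot bring the count to zero. */
    synchronized void retain(String handleId) {
        // Assumption of this sketch: retaining an untracked handle does nothing.
        refCounts.computeIfPresent(handleId, (id, count) -> count + 1);
    }

    /** Unpin: -1, discarding the handle once nothing references it any more. */
    synchronized void release(String handleId) {
        Long count = refCounts.get(handleId);
        if (count == null) {
            return; // not tracked here (e.g. already owned by the JM)
        }
        if (count <= 1) {
            refCounts.remove(handleId);
            discarded.add(handleId); // stands in for deleting the file
        } else {
            refCounts.put(handleId, count - 1);
        }
    }

    /** Ownership moved elsewhere (e.g. to the JM): forget the handle without discarding it. */
    synchronized void stopTracking(String handleId) {
        refCounts.remove(handleId);
    }

    synchronized long refCount(String handleId) {
        return refCounts.getOrDefault(handleId, 0L);
    }
}
```

For example, after `startTracking("h", 1)`, a truncate-driven `release("h")` discards `h` immediately, even if a snapshot still refers to it. Calling `retain("h")` first leaves the refCount at 1 after that same release, so the file survives until the checkpoint is either confirmed or aborted.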
Unpin at:

- `confirm()`: the JM takes ownership, so `stopTracking` removes the handle from the registry.
- `reset()`: the checkpoint was aborted, so `release` decrements the refCount (the handle is discarded if no other references remain).
- `close()`: on writer shutdown, any handles that are still pinned are released.

## Test plan

- [x] `TaskChangelogRegistryImplTest`: 3 new tests for `retain()` covering refCount increment, prevention of premature discard, and interaction with `stopTracking` (5/5 pass)
- [x] `FsStateChangelogWriterTest`: 2 new integration tests, persist→truncate→confirm (handle survives) and persist→truncate→reset (handle discarded) (18/18 pass)
- [x] `ChangelogStateDiscardTest`: existing discard tests still pass (6/6 pass)
- [x] `ChangelogRecoveryITCase#testMaterialization`: passes with the reverted 50ms materialization interval (3/3 pass)
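The writer-side pin/unpin lifecycle listed under "Unpin at" can be sketched as follows. Everything here is hypothetical: `PinRegistry` and `ToyChangelogWriter` are simplified stand-ins rather than Flink classes, plain strings stand in for `PhysicalStateHandleID`, and the methods take only a checkpoint ID, which is an assumption and not the real `FsStateChangelogWriter` signatures. The sketch shows per-checkpoint deduplication with a `Set`, `stopTracking` on confirm, and `release` on reset and close.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical minimal registry: refCounts keyed by handle ID; records discarded IDs. */
class PinRegistry {
    final Map<String, Integer> refCounts = new HashMap<>();
    final List<String> discarded = new ArrayList<>();

    synchronized void startTracking(String id, int refCount) { refCounts.put(id, refCount); }

    /** Pin an already-tracked handle (no-op if untracked, an assumption of this sketch). */
    synchronized void retain(String id) { refCounts.computeIfPresent(id, (k, c) -> c + 1); }

    synchronized void release(String id) {
        Integer c = refCounts.get(id);
        if (c == null) return;                 // untracked: owned elsewhere, never discard
        if (c > 1) { refCounts.put(id, c - 1); return; }
        refCounts.remove(id);
        discarded.add(id);                     // stands in for deleting the file
    }

    synchronized void stopTracking(String id) { refCounts.remove(id); }
}

/** Hypothetical writer bookkeeping: which handles each in-flight checkpoint has pinned. */
class ToyChangelogWriter {
    private final PinRegistry registry;
    private final Map<Long, Set<String>> pinnedByCheckpoint = new HashMap<>();

    ToyChangelogWriter(PinRegistry registry) { this.registry = registry; }

    /** Snapshot build: pin each referenced handle at most once per checkpoint. */
    synchronized void buildSnapshot(long checkpointId, List<String> handleIds) {
        Set<String> pinned = pinnedByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>());
        for (String h : handleIds) {
            if (pinned.add(h)) {               // dedupe: a repeated handle is retained only once
                registry.retain(h);
            }
        }
    }

    /** Checkpoint confirmed: the JM owns the files now, so stop tracking instead of releasing. */
    synchronized void confirm(long checkpointId) {
        Set<String> pinned = pinnedByCheckpoint.remove(checkpointId);
        if (pinned != null) pinned.forEach(registry::stopTracking);
    }

    /** Checkpoint aborted: drop this checkpoint's pins; unreferenced handles get discarded. */
    synchronized void reset(long checkpointId) {
        Set<String> pinned = pinnedByCheckpoint.remove(checkpointId);
        if (pinned != null) pinned.forEach(registry::release);
    }

    /** Writer shutdown: release every pin still held by an unfinished checkpoint. */
    synchronized void close() {
        pinnedByCheckpoint.values().forEach(s -> s.forEach(registry::release));
        pinnedByCheckpoint.clear();
    }
}
```

Using `stopTracking` on confirm (rather than `release`) reflects the ownership hand-off: once the JM owns a file, the task side must neither count nor delete it, so a later `release` for the same ID is a harmless no-op in this model.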
