jubins opened a new pull request, #28472: URL: https://github.com/apache/flink/pull/28472
## What is the purpose of the change

Fixes [FLINK-39949](https://issues.apache.org/jira/browse/FLINK-39949). The ForSt state backend's `getForStRestoreOperation()` method throws `UnsupportedOperationException("Not support restoring yet for ForStStateBackend")` when restoring from a canonical savepoint with `PriorityQueueStateType.ForStDB` (native timers). As a result, jobs that use the ForSt backend with native priority queues cannot be restored from full/canonical savepoints.

The `getForStRestoreOperation()` method in `ForStKeyedStateBackendBuilder` handles three restore paths: incremental handles, full handles with heap timers, and full handles with native ForSt timers. The third branch was left as an unimplemented stub. This PR implements it by introducing `ForStFullRestoreOperation`, which uses the existing `FullSnapshotRestoreOperation` + `ForStDBWriteBatchWrapper` pattern already established by `ForStHeapTimersFullRestoreOperation` and mirrors the equivalent `RocksDBFullRestoreOperation` in the RocksDB state backend.

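For reviewers, the three-way branch described above can be sketched roughly as follows. This is a simplified, self-contained illustration and not the actual builder code: `HandleKind`, `TimerType`, `RestorePath`, and `choose` are hypothetical names introduced only for this sketch, and the real method inspects Flink state handles rather than enums. The order of the checks is also an assumption.

```java
public class RestorePathSketch {

    /** Hypothetical stand-in for the kind of state handle being restored. */
    enum HandleKind { INCREMENTAL, FULL }

    /** Hypothetical stand-in for PriorityQueueStateType (heap vs. native ForSt timers). */
    enum TimerType { HEAP, FORSTDB }

    /** The three restore paths named in the PR description. */
    enum RestorePath {
        INCREMENTAL,
        FULL_WITH_HEAP_TIMERS,   // handled by ForStHeapTimersFullRestoreOperation
        FULL_WITH_NATIVE_TIMERS  // previously a stub; handled by ForStFullRestoreOperation
    }

    /** Picks a restore path; assumes incremental handles are checked first. */
    static RestorePath choose(HandleKind handle, TimerType timers) {
        if (handle == HandleKind.INCREMENTAL) {
            return RestorePath.INCREMENTAL;
        }
        if (timers == TimerType.HEAP) {
            return RestorePath.FULL_WITH_HEAP_TIMERS;
        }
        // Fallback branch: full handle + native timers. Before this PR,
        // this is where the UnsupportedOperationException was thrown.
        return RestorePath.FULL_WITH_NATIVE_TIMERS;
    }

    public static void main(String[] args) {
        // A canonical savepoint restored with native timers takes the fallback branch.
        System.out.println(choose(HandleKind.FULL, TimerType.FORSTDB));
    }
}
```

In this sketch the fallback branch is the one this PR fills in; the other two branches are unchanged.
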
## Brief change log

- Added `ForStFullRestoreOperation`, a new restore operation class that reads key-group data from a full/canonical savepoint and replays it into ForSt column families via `ForStDBWriteBatchWrapper`
- Wired `ForStFullRestoreOperation` into `ForStKeyedStateBackendBuilder.getForStRestoreOperation()` as the fallback branch (full handle + `PriorityQueueStateType.ForStDB`), replacing the previous `UnsupportedOperationException`
- Added unit tests in `ForStFullRestoreOperationTest` covering successful restore of state values from a full snapshot and correct handling of multiple key groups

## Verifying this change

This change is covered by new unit tests in `ForStFullRestoreOperationTest`:

- `testRestoreValueStateFromFullSnapshot()`: verifies that state written to a ForSt backend and snapshotted as a canonical savepoint (`KeyGroupsStateHandle`) can be fully restored into a new backend instance with correct values
- `testRestoreAcrossMultipleKeyGroups()`: validates that key-group data spanning multiple groups is correctly distributed and readable after restore

The implementation follows the same pattern established by `ForStHeapTimersFullRestoreOperation` (for heap timers) and `RocksDBFullRestoreOperation` (the RocksDB equivalent), ensuring consistency across the codebase.

## Does this pull request potentially affect one of the following parts

- **Dependencies** (does it add or upgrade a dependency): no
- **The public API**, i.e., is any changed class annotated with `@Public(Evolving)`: no
- **The serializers**: no
- **The runtime per-record code paths** (performance sensitive): no
- **Anything that affects deployment or recovery** (JobManager, Checkpointing, Kubernetes/Yarn, ZooKeeper): yes, this fixes restore from canonical savepoints for the ForSt state backend
- **The S3 file system connector**: no

## Documentation

- Does this pull request introduce a new feature? no, it implements previously unimplemented functionality
- If yes, how is the feature documented? not applicable

## Was generative AI tooling used to co-author this PR?

- [x] Yes: Claude Code was used as a pair-programming assistant. All code was written, understood, and verified by the author.

Generated-by: Claude Opus 4.8

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
