Myasuka commented on code in PR #19331:
URL: https://github.com/apache/flink/pull/19331#discussion_r844022390


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -66,10 +67,25 @@ StreamStateHandle registerReference(
     /**
      * Register given shared states in the registry.
      *
+     * <p>NOTE: For state from checkpoints from other jobs or runs (i.e. after 
recovery), please use
+     * {@link #registerAllAfterRestored(CompletedCheckpoint, RestoreMode)}
+     *
      * @param stateHandles The shared states to register.
      * @param checkpointID which uses the states.
      */
     void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, 
long checkpointID);
 
+    /**
+     * Set the lowest checkpoint ID below which no state is discarded, 
inclusive.
+     *
+     * <p>After recovery from an incremental checkpoint, its state should NOT 
be discarded, even if
+     * {@link #unregisterUnusedState(long) not used} anymore (unless 
recovering in {@link
+     * RestoreMode#CLAIM CLAIM} mode).
+     *
+     * <p>This should hold for both cases: when recovering from that initial 
checkpoint; and from
+     * any subsequent checkpoint derived from it.
+     */
+    void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode 
mode);

Review Comment:
   Since `CheckpointCoordinator.restoreSavepoint` would use 
checkpoint/savepoint via `SavepointRestoreSettings#getRestorePath`, and it will 
not change even JM crashed, and I think we might use this information to detect 
whether need to update the `highestNotClaimedCheckpointID`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to