curcur commented on code in PR #19331:
URL: https://github.com/apache/flink/pull/19331#discussion_r844713653


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -207,11 +208,13 @@ public long getStateSize() {
      * checkpoint is added into the store.
      *
      * @param sharedStateRegistry The registry where shared states are registered
+     * @param restoreMode the mode in which this checkpoint was restored from
      */
-    public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) {
+    public void registerSharedStatesAfterRestored(
+            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {

Review Comment:
   In NO_CLAIM mode, does this register (or is it expected to register) any SHARED
   state that belongs to **previous jobs**?
   
   This may be related to my question on
   `SharedStateRegistry#registerAllAfterRestored`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -174,6 +181,20 @@ public void registerAll(
         }
     }
 
+    @Override
+    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) {
+        registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
+        // In NO_CLAIM and LEGACY restore modes, shared state of the initial checkpoints must be
+        // preserved. This is achieved by advancing highestRetainCheckpointID here, and then
+        // checking entry.createdByCheckpointID against it on checkpoint subsumption.
+        // In CLAIM restore mode, the shared state of the initial checkpoints must be
+        // discarded as soon as it becomes unused - so highestRetainCheckpointID is not updated.
+        if (mode != RestoreMode.CLAIM) {

Review Comment:
   I think only LEGACY mode needs to keep artifacts from the previous checkpoint
   here?
   
   1. In CLAIM mode, as explained above, the initial checkpoint already belongs to
   the current job, so it has to be included in the current job's lifecycle as well.
   2. In NO_CLAIM mode, the previous checkpoint is copied, i.e. the initial
   checkpoint is taken as a full copy of it.
   
   This is the whole point of introducing the CLAIM and NO_CLAIM modes: they draw a
   clear boundary of checkpoint ownership between different jobs.
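   
   To make the discussion concrete, here is a toy model of the retention rule as it is written in the diff comment above. It is only a sketch: `ToySharedStateRegistry`, its `Mode` enum, and the string keys are hypothetical stand-ins, not Flink's `SharedStateRegistryImpl`, and "unused" is assumed to mean "last used by a checkpoint older than the lowest retained one".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of the retention rule from the diff; NOT Flink's real registry. */
final class ToySharedStateRegistry {
    enum Mode { CLAIM, NO_CLAIM, LEGACY }

    // key -> ID of the checkpoint that first registered the entry
    private final Map<String, Long> createdBy = new HashMap<>();
    // key -> ID of the latest checkpoint that used the entry
    private final Map<String, Long> lastUsedBy = new HashMap<>();
    // entries created at or below this ID are never discarded by this job
    private long highestRetainCheckpointId = -1L;

    void register(String key, long checkpointId) {
        createdBy.putIfAbsent(key, checkpointId);
        lastUsedBy.merge(key, checkpointId, Long::max);
    }

    void registerAllAfterRestored(Iterable<String> keys, long checkpointId, Mode mode) {
        for (String key : keys) {
            register(key, checkpointId);
        }
        // Rule as written in the PR: protect initial state unless the job claimed it.
        // Under the suggestion in this comment the condition would be mode == Mode.LEGACY.
        if (mode != Mode.CLAIM) {
            highestRetainCheckpointId = Math.max(highestRetainCheckpointId, checkpointId);
        }
    }

    /** Returns the keys discarded when checkpoints below the given ID are subsumed. */
    List<String> unregisterUnusedState(long lowestRetainedCheckpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            boolean unused = entry.getValue() < lowestRetainedCheckpointId;
            boolean isProtected = createdBy.get(entry.getKey()) <= highestRetainCheckpointId;
            if (unused && !isProtected) {
                discarded.add(entry.getKey());
                createdBy.remove(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }
}
```

   In this model, a registry restored in CLAIM mode discards the initial entry once it is unused, while one restored in NO_CLAIM or LEGACY mode keeps it.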



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java:
##########
@@ -53,4 +53,6 @@
     public InlineElement getDescription() {
         return text(description);
     }
+
+    public static final RestoreMode DEFAULT = NO_CLAIM;

Review Comment:
   Is there anywhere we can read the default from the config directly instead of
   hard-coding it here?
   
   A hard-coded value is easy to miss if we later want to change the default.
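   
   A minimal sketch of what I have in mind, assuming the default lives in exactly one place (the option definition) and everything else reads it from there. The `Option` class, the `restore-mode` key, and `RestoreModeDefaults` are hypothetical names used for illustration only, not existing Flink classes or keys.

```java
import java.util.Map;

/** Hypothetical sketch: the option definition is the single source of the default. */
final class RestoreModeDefaults {
    enum Mode { CLAIM, NO_CLAIM, LEGACY }

    /** Minimal stand-in for a typed config option that carries its own default. */
    static final class Option<T> {
        private final String key;
        private final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        String key() {
            return key;
        }

        T defaultValue() {
            return defaultValue;
        }
    }

    // The only place where the default is spelled out (hypothetical key).
    static final Option<Mode> RESTORE_MODE = new Option<>("restore-mode", Mode.NO_CLAIM);

    /** Falls back to the option's own default when the key is not configured. */
    static Mode resolve(Map<String, String> conf) {
        String raw = conf.get(RESTORE_MODE.key());
        return raw == null ? RESTORE_MODE.defaultValue() : Mode.valueOf(raw);
    }
}
```

   With this shape, changing the default means touching only the option definition; callers that need it use `RESTORE_MODE.defaultValue()` rather than a second constant.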


