curcur commented on code in PR #19331:
URL: https://github.com/apache/flink/pull/19331#discussion_r844713653
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -207,11 +208,13 @@ public long getStateSize() {
* checkpoint is added into the store.
*
* @param sharedStateRegistry The registry where shared states are registered
+ * @param restoreMode the mode in which this checkpoint was restored from
*/
- public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) {
+ public void registerSharedStatesAfterRestored(
+ SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
// in claim mode we should not register any shared handles
if (!props.isUnclaimed()) {
Review Comment:
Does it register (or expect to register) any SHARED state belonging to **previous jobs**
in NO_CLAIM mode?
This may be related to my question in
`SharedStateRegistry#registerAllAfterRestored`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -174,6 +181,20 @@ public void registerAll(
}
}
+ @Override
+ public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) {
+ registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
+ // In NO_CLAIM and LEGACY restore modes, shared state of the initial checkpoints must be
+ // preserved. This is achieved by advancing highestRetainCheckpointID here, and then
+ // checking entry.createdByCheckpointID against it on checkpoint subsumption.
+ // In CLAIM restore mode, the shared state of the initial checkpoints must be
+ // discarded as soon as it becomes unused - so highestRetainCheckpointID is not updated.
+ if (mode != RestoreMode.CLAIM) {
Review Comment:
I think only LEGACY mode needs to keep artifacts from the previous checkpoint
here?
1. In CLAIM mode, as explained above, the initial checkpoint already belongs to
the current job, so it has to be included in the job's checkpoint lifecycle as well.
2. In NO_CLAIM mode, either the previous checkpoint is copied, or the first
checkpoint of the new job is taken as a full checkpoint.
This is the whole point of introducing the CLAIM and NO_CLAIM modes: they draw
a clear boundary of checkpoint ownership between different jobs.
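To make the proposal concrete, here is a minimal, self-contained Java sketch of the retention rule the diff comment describes (advance a "highest retained checkpoint ID" watermark on restore, then compare each entry's creating checkpoint against it on subsumption), with the condition narrowed to LEGACY only, as suggested above. `SharedStateRetentionSketch`, `retainsInitialSharedState`, and `unregisterUnusedState` are hypothetical names, string keys stand in for real state handles, and the local `RestoreMode` enum only reuses the three constant names; this is not Flink's `SharedStateRegistryImpl`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical local stand-in: only the constant names come from the PR.
enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

// Hypothetical, heavily simplified registry model; NOT Flink's SharedStateRegistryImpl.
class SharedStateRetentionSketch {
    // state key -> ID of the checkpoint that first registered it
    private final Map<String, Long> createdBy = new HashMap<>();
    // state key -> ID of the most recent checkpoint that still references it
    private final Map<String, Long> lastUsedBy = new HashMap<>();
    // shared state created at or below this ID is never released
    private long highestRetainCheckpointId = Long.MIN_VALUE;

    // The reviewer's proposal: only LEGACY keeps the initial checkpoint's artifacts alive.
    static boolean retainsInitialSharedState(RestoreMode mode) {
        return mode == RestoreMode.LEGACY;
    }

    void register(String key, long checkpointId) {
        createdBy.putIfAbsent(key, checkpointId);
        lastUsedBy.put(key, checkpointId);
    }

    void registerAllAfterRestored(List<String> keys, long checkpointId, RestoreMode mode) {
        for (String key : keys) {
            register(key, checkpointId);
        }
        if (retainsInitialSharedState(mode)) {
            // Advance the watermark so the restored checkpoint's shared state survives subsumption.
            highestRetainCheckpointId = Math.max(highestRetainCheckpointId, checkpointId);
        }
    }

    // Called once every checkpoint below lowestRetainedId has been subsumed.
    // Returns keys that are no longer referenced and not protected, i.e. discard candidates.
    List<String> unregisterUnusedState(long lowestRetainedId) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String key = entry.getKey();
            boolean unused = entry.getValue() < lowestRetainedId;
            boolean protectedByRestore = createdBy.get(key) <= highestRetainCheckpointId;
            if (unused && !protectedByRestore) {
                it.remove();
                createdBy.remove(key);
                released.add(key);
            }
        }
        return released;
    }
}
```

In a usage run, restoring checkpoint 5 in LEGACY mode and later subsuming everything below checkpoint 7 releases only state created after the restore, while in CLAIM mode the restored state is released as soon as nothing references it. NO_CLAIM is deliberately not exercised, since the question above is whether it registers previous jobs' shared state on this path at all.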
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java:
##########
@@ -53,4 +53,6 @@
public InlineElement getDescription() {
return text(description);
}
+
+ public static final RestoreMode DEFAULT = NO_CLAIM;
Review Comment:
Is there a place where we can take the default directly from the config option
instead of hard-coding it here?
Otherwise this is easy to miss if we later want to change the default.
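One way to address this, sketched under assumptions: write the default down exactly once, on the config option, and have the enum derive it. `TypedOption`, `RestoreOptions`, `Mode`, and the key string below are hypothetical stand-ins (not Flink's `ConfigOption` or its savepoint options class), chosen so the snippet compiles on its own.

```java
// Hypothetical stand-in for a typed configuration option that carries its own default.
final class TypedOption<T> {
    private final String key;
    private final T defaultValue;

    TypedOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() {
        return key;
    }

    T defaultValue() {
        return defaultValue;
    }
}

// Hypothetical options holder: the only place where the default mode is written down.
final class RestoreOptions {
    static final TypedOption<Mode> RESTORE_MODE =
            new TypedOption<>("example.restore-mode", Mode.NO_CLAIM); // illustrative key

    private RestoreOptions() {}
}

enum Mode {
    CLAIM,
    NO_CLAIM,
    LEGACY;

    // Derived from the option rather than hard-coded, so changing the option's default changes this too.
    static Mode defaultMode() {
        return RestoreOptions.RESTORE_MODE.defaultValue();
    }
}
```

A static method is used instead of a `static final` field on purpose: a field initialized from `RestoreOptions.RESTORE_MODE` would see `null` and throw a `NullPointerException` if `RestoreOptions` happened to be initialized before the enum, because each class would then be read while the other is still mid-initialization.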
--
This is an automated message from the Apache Git Service.