rkhachatryan commented on code in PR #19331:
URL: https://github.com/apache/flink/pull/19331#discussion_r844900982
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -174,6 +181,20 @@ public void registerAll(
}
}
+ @Override
+ public void registerAllAfterRestored(CompletedCheckpoint checkpoint,
RestoreMode mode) {
+ registerAll(checkpoint.getOperatorStates().values(),
checkpoint.getCheckpointID());
+ // In NO_CLAIM and LEGACY restore modes, shared state of the initial
checkpoints must be
+ // preserved. This is achieved by advancing highestRetainCheckpointID
here, and then
+ // checking entry.createdByCheckpointID against it on checkpoint
subsumption.
+ // In CLAIM restore mode, the shared state of the initial checkpoints
must be
+ // discarded as soon as it becomes unused - so
highestRetainCheckpointID is not updated.
+ if (mode != RestoreMode.CLAIM) {
Review Comment:
Although the initial checkpoint is fully copied in `NO_CLAIM` mode, only the
**next** checkpoint can be "claimed".
`SharedStateRegistry ` and `CompletedCheckpointStore` still refer to the
initial checkpoint until it is subsumed.
So we must prevent it's artifacts from being deleted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]