rkhachatryan commented on code in PR #19331:
URL: https://github.com/apache/flink/pull/19331#discussion_r843690254
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -66,10 +67,25 @@ StreamStateHandle registerReference(
/**
* Register given shared states in the registry.
*
+ * <p>NOTE: For state from checkpoints from other jobs or runs (i.e. after recovery), please use
+ * {@link #registerAllAfterRestored(CompletedCheckpoint, RestoreMode)}
+ *
* @param stateHandles The shared states to register.
* @param checkpointID which uses the states.
*/
void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, long checkpointID);
+ /**
+ * Set the lowest checkpoint ID below which no state is discarded, inclusive.
+ *
+ * <p>After recovery from an incremental checkpoint, its state should NOT be discarded, even if
+ * {@link #unregisterUnusedState(long) not used} anymore (unless recovering in {@link
+ * RestoreMode#CLAIM CLAIM} mode).
+ *
+ * <p>This should hold for both cases: when recovering from that initial checkpoint; and from
+ * any subsequent checkpoint derived from it.
+ */
+ void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode);
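A minimal sketch of the contract this Javadoc describes, assuming a toy in-memory registry: shared state is keyed by a string (think SST file name), each entry remembers which checkpoint created it and which last used it, and `registerAllAfterRestored` raises a "not claimed" threshold unless the mode is `CLAIM`. `ToySharedStateRegistry`, `ToyRestoreMode` and the threshold field are hypothetical illustrations, not Flink's `SharedStateRegistryImpl`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of the registry contract; NOT Flink's SharedStateRegistryImpl. */
class ToySharedStateRegistry {

    /** Reduced stand-in for RestoreMode with only the two modes discussed here. */
    enum ToyRestoreMode { CLAIM, NO_CLAIM }

    /** Bookkeeping for one shared state handle (e.g. an SST file), keyed by a string. */
    private static final class StateEntry {
        final long createdByCheckpointId;
        long lastUsedCheckpointId;

        StateEntry(long checkpointId) {
            this.createdByCheckpointId = checkpointId;
            this.lastUsedCheckpointId = checkpointId;
        }
    }

    private final Map<String, StateEntry> entries = new HashMap<>();

    /** State created by checkpoints up to this ID (inclusive) is never deleted; -1 means none. */
    private long highestNotClaimedCheckpointId = -1L;

    /** Registers the keys referenced by a checkpoint taken by this job. */
    void registerAll(Iterable<String> keys, long checkpointId) {
        for (String key : keys) {
            StateEntry e = entries.computeIfAbsent(key, k -> new StateEntry(checkpointId));
            e.lastUsedCheckpointId = Math.max(e.lastUsedCheckpointId, checkpointId);
        }
    }

    /** Registers a restored checkpoint; unless CLAIM, its state must never be deleted by us. */
    void registerAllAfterRestored(Iterable<String> keys, long checkpointId, ToyRestoreMode mode) {
        registerAll(keys, checkpointId);
        if (mode != ToyRestoreMode.CLAIM) {
            highestNotClaimedCheckpointId = Math.max(highestNotClaimedCheckpointId, checkpointId);
        }
    }

    /**
     * Forgets entries not used by any checkpoint >= lowestCheckpointId and returns the keys
     * whose files would actually be deleted (i.e. state this job owns).
     */
    List<String> unregisterUnusedState(long lowestCheckpointId) {
        List<String> toDelete = new ArrayList<>();
        Iterator<Map.Entry<String, StateEntry>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, StateEntry> me = it.next();
            StateEntry e = me.getValue();
            if (e.lastUsedCheckpointId < lowestCheckpointId) {
                // Not-claimed (restored) state is only dropped from the registry, never deleted.
                if (e.createdByCheckpointId > highestNotClaimedCheckpointId) {
                    toDelete.add(me.getKey());
                }
                it.remove();
            }
        }
        return toDelete;
    }
}
```

For example, after restoring checkpoint 42 in `NO_CLAIM` mode with keys `sst-1` and `sst-2` and completing checkpoint 43 that reuses `sst-2`, `unregisterUnusedState(43)` drops `sst-1` from the registry without returning it for deletion; with `CLAIM` the same call returns `sst-1`. Keys first created by checkpoint 43 remain deletable later, so only the inherited state is protected.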
Review Comment:
I don't think it's possible.
If the above call is removed (from `SharedStateRegistryFactory`) and only the call from `CheckpointCoordinator` is left, then a JM failover **before** any new checkpoint is taken will discard the old shared state that was restored in `NO_CLAIM` mode.
Let's consider what happens step by step:
1. "Manual" restore from an (externalized) checkpoint in `NO_CLAIM` mode
2. No checkpoints are present in ZK, so `DefaultExecutionGraphFactory.createAndRestoreExecutionGraph` calls `CheckpointCoordinator.restoreSavepoint`
3. `CheckpointCoordinator.restoreSavepoint` loads the checkpoint AND adds it to ZK
4. A failure happens before any new checkpoint is added to ZK; one JM is lost
5. A new JM takes over and loads checkpoints from ZK; it will NOT call `CheckpointCoordinator.restoreSavepoint` this time because the ZK store is not empty
So we need to make sure in step (5) that we don't discard the shared state of checkpoints loaded from ZK, not only of those loaded from FS.
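The five steps above can be sketched as a small simulation, assuming ZK is modelled as an in-memory list that survives the JM failover and the registry as a set of protected checkpoint IDs that every new JM rebuilds from scratch. `FailoverScenario`, `startJobManager` and the `protectOnZkLoad` flag are hypothetical; the flag only contrasts protecting restored state on the `restoreSavepoint` path alone with also protecting checkpoints loaded from ZK.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of the five-step NO_CLAIM failover scenario (names illustrative). */
class FailoverScenario {

    /** Stand-in for the ZK-backed checkpoint store; it survives a JM failover. */
    final List<Long> zkStore = new ArrayList<>();

    /** Checkpoints whose shared state the current JM's registry must never delete. */
    private Set<Long> protectedCheckpoints = new HashSet<>();

    /** true: checkpoints loaded from ZK are protected as well (the behaviour argued for). */
    private final boolean protectOnZkLoad;

    FailoverScenario(boolean protectOnZkLoad) {
        this.protectOnZkLoad = protectOnZkLoad;
    }

    /** What a (new) JM does on start-up when a NO_CLAIM restore from the checkpoint is requested. */
    void startJobManager(long externalCheckpointId) {
        protectedCheckpoints = new HashSet<>(); // a new JM starts with an empty registry
        if (zkStore.isEmpty()) {
            // Steps 2-3: the restoreSavepoint path loads the checkpoint, protects it, adds it to ZK.
            protectedCheckpoints.add(externalCheckpointId);
            zkStore.add(externalCheckpointId);
        } else if (protectOnZkLoad) {
            // Step 5 with the fix: checkpoints loaded from ZK are protected too.
            protectedCheckpoints.addAll(zkStore);
        }
        // Step 5 without the fix: ZK is not empty, restoreSavepoint is skipped, nothing is protected.
    }

    /** Whether this JM would discard the checkpoint's shared state once it becomes unused. */
    boolean wouldDiscard(long checkpointId) {
        return !protectedCheckpoints.contains(checkpointId);
    }
}
```

Calling `startJobManager` twice with the same checkpoint ID (steps 2-3, then the failover of steps 4-5) leaves that checkpoint unprotected when `protectOnZkLoad` is false and protected when it is true.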
Does this make sense, or am I missing something?