fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r853004206
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -211,13 +220,87 @@ public long getStateSize() {
* @param restoreMode the mode in which this checkpoint was restored from
*/
public void registerSharedStatesAfterRestored(
- SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+ SharedStateRegistry sharedStateRegistry,
+ RestoreMode restoreMode,
+ boolean changelogEnabled) {
// in claim mode we should not register any shared handles
if (!props.isUnclaimed()) {
+ if (changelogEnabled) {
+ for (OperatorState operatorState : operatorStates.values()) {
+ for (Map.Entry<Integer, OperatorSubtaskState> entry :
+ operatorState.getSubtaskStates().entrySet()) {
+ List<KeyedStateHandle> changelogStateBackendHandles =
+
entry.getValue().getManagedKeyedState().stream()
+ .map(x ->
getChangelogStateBackendHandle(x))
+ .collect(Collectors.toList());
+ StateObjectCollection<KeyedStateHandle> stateHandles =
+ new
StateObjectCollection<>(changelogStateBackendHandles);
+ operatorState.putState(
+ entry.getKey(),
+ entry.getValue()
+ .toBuilder()
+ .setManagedKeyedState(stateHandles)
+ .build());
Review Comment:
Thanks for your suggestion!
Keeping `CompletedCheckpoint` immutable is a better approach, so I moved the
code of rebuilding checkpoint to `Checkpoints.loadAndValidateCheckpoint`, and
put the cast logic in
`ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl`.
> I think registering all KeyedStateHandles with the SharedStateRegistry on
recovery in CLAIM mode would also solve the problem, wouldn't it?
For this suggestion, I think it may not work as well, because the
`discardState()` of some KeyedStateHandles are **not empty**, the state would
be discarded on checkpoint subsuming.
and I also left a comment under
[FLINK-25872](https://issues.apache.org/jira/browse/FLINK-25872), maybe we can
discuss in the ticket.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]