rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r849248992


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -211,13 +220,87 @@ public long getStateSize() {
      * @param restoreMode the mode in which this checkpoint was restored from
      */
     public void registerSharedStatesAfterRestored(
-            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+            SharedStateRegistry sharedStateRegistry,
+            RestoreMode restoreMode,
+            boolean changelogEnabled) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
+            if (changelogEnabled) {
+                for (OperatorState operatorState : operatorStates.values()) {
+                    for (Map.Entry<Integer, OperatorSubtaskState> entry :
+                            operatorState.getSubtaskStates().entrySet()) {
+                        List<KeyedStateHandle> changelogStateBackendHandles =
+                                entry.getValue().getManagedKeyedState().stream()
+                                        .map(x -> getChangelogStateBackendHandle(x))
+                                        .collect(Collectors.toList());
+                        StateObjectCollection<KeyedStateHandle> stateHandles =
+                                new StateObjectCollection<>(changelogStateBackendHandles);
+                        operatorState.putState(
+                                entry.getKey(),
+                                entry.getValue()
+                                        .toBuilder()
+                                        .setManagedKeyedState(stateHandles)
+                                        .build());

Review Comment:
   I have several concerns about placing all this logic here in
   `CompletedCheckpoint`:
   1. `CompletedCheckpoint` is intended to be immutable; here, its contents
      are changed after it has potentially been used.
   2. A programmer can make the mistake of registering this checkpoint with
      the registry using some other method (and so not rebuilding the handles).
   3. `CompletedCheckpoint` is made aware of Changelog and some of its
      details, like `ChangelogStateBackendHandleImpl`.
   
   How about moving all of this checkpoint-rebuilding code closer to where
   the checkpoint is loaded, i.e. `Checkpoints.loadAndValidateCheckpoint` and
   `DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoint`?
   
   Additionally, for (3), I think we cannot avoid **some** JM code being
   aware of changelog (unless we generalize this somehow, which is probably
   too early for now).
   But maybe this code can be put in a single changelog-related class, e.g.
   the existing `ChangelogStateBackendHandle` or some new utility class.
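   
   To illustrate the shape I have in mind, here is a rough sketch. It uses
   simplified stand-in types rather than the real Flink classes:
   `SketchCheckpoint`, `ChangelogHandleRebuilder`, and the string-based
   "handles" are all hypothetical, and I am assuming the rebuild amounts to
   wrapping each keyed handle that is not yet a changelog handle. The point
   is only that the rebuild returns a new object at load time and leaves the
   loaded checkpoint untouched.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical stand-in for CompletedCheckpoint; NOT the real Flink class.
   final class SketchCheckpoint {
       private final long id;
       // Strings stand in for KeyedStateHandle instances.
       private final List<String> keyedHandles;

       SketchCheckpoint(long id, List<String> keyedHandles) {
           this.id = id;
           // Defensive copy so the checkpoint stays immutable after construction.
           this.keyedHandles = Collections.unmodifiableList(new ArrayList<>(keyedHandles));
       }

       long getId() {
           return id;
       }

       List<String> getKeyedHandles() {
           return keyedHandles;
       }
   }

   // Hypothetical changelog-related utility: the only place that knows how
   // handles are wrapped. It would be called right after loading a checkpoint.
   final class ChangelogHandleRebuilder {
       private ChangelogHandleRebuilder() {}

       // Assumption: "rebuilding" means wrapping handles that are not wrapped yet.
       static String wrap(String handle) {
           return handle.startsWith("changelog:") ? handle : "changelog:" + handle;
       }

       // Returns a new checkpoint instead of mutating the loaded one.
       static SketchCheckpoint rebuild(SketchCheckpoint loaded, boolean changelogEnabled) {
           if (!changelogEnabled) {
               return loaded;
           }
           List<String> wrapped = new ArrayList<>();
           for (String handle : loaded.getKeyedHandles()) {
               wrapped.add(wrap(handle));
           }
           return new SketchCheckpoint(loaded.getId(), wrapped);
       }
   }
   ```

   With something like this, the registration method would not need the
   `changelogEnabled` flag, and any other registration path would see
   handles that have already been rebuilt.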



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
