take-it-out commented on code in PR #20484:
URL: https://github.com/apache/flink/pull/20484#discussion_r939970237


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java:
##########
@@ -149,29 +151,58 @@ public static ChangelogStateBackendHandle 
getChangelogStateBackendHandle(
         private static KeyedStateHandle castToAbsolutePath(
                 KeyedStateHandle originKeyedStateHandle) {
             // For KeyedStateHandle, only KeyGroupsStateHandle and 
IncrementalKeyedStateHandle
-            // contain streamStateHandle, and the checkpointedStateScope of
-            // IncrementalKeyedStateHandle
-            // is shared, no need to
-            // cast. So, only KeyGroupsStateHandle need to cast.
-            if (!(originKeyedStateHandle instanceof KeyGroupsStateHandle)
-                    || originKeyedStateHandle instanceof 
KeyGroupsSavepointStateHandle) {
+            // contain streamStateHandle, and both of them need to be cast
+            // as they all have state handles of private checkpoint scope.
+            if (originKeyedStateHandle instanceof 
KeyGroupsSavepointStateHandle) {
                 return originKeyedStateHandle;
-            } else {
+            }
+            if (originKeyedStateHandle instanceof KeyGroupsStateHandle) {
                 StreamStateHandle streamStateHandle =
                         ((KeyGroupsStateHandle) 
originKeyedStateHandle).getDelegateStateHandle();
 
                 if (streamStateHandle instanceof FileStateHandle) {
-                    FileStateHandle fileStateHandle =
-                            new FileStateHandle(
-                                    ((FileStateHandle) 
streamStateHandle).getFilePath(),
-                                    streamStateHandle.getStateSize());
+                    StreamStateHandle fileStateHandle = 
restoreFileStateHandle(streamStateHandle);
                     return KeyGroupsStateHandle.restore(
                             ((KeyGroupsStateHandle) 
originKeyedStateHandle).getGroupRangeOffsets(),
                             fileStateHandle,
                             originKeyedStateHandle.getStateHandleId());
                 }
-                return originKeyedStateHandle;
             }
+            if (originKeyedStateHandle instanceof 
IncrementalRemoteKeyedStateHandle) {
+                IncrementalRemoteKeyedStateHandle 
incrementalRemoteKeyedStateHandle =
+                        (IncrementalRemoteKeyedStateHandle) 
originKeyedStateHandle;
+
+                StreamStateHandle castMetaStateHandle =
+                        restoreFileStateHandle(
+                                
incrementalRemoteKeyedStateHandle.getMetaStateHandle());
+                Map<StateHandleID, StreamStateHandle> castPrivateStates =
+                        
incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                Map.Entry::getKey,
+                                                e -> 
restoreFileStateHandle(e.getValue())));
+
+                return IncrementalRemoteKeyedStateHandle.restore(
+                        
incrementalRemoteKeyedStateHandle.getBackendIdentifier(),
+                        incrementalRemoteKeyedStateHandle.getKeyGroupRange(),
+                        incrementalRemoteKeyedStateHandle.getCheckpointId(),
+                        
incrementalRemoteKeyedStateHandle.getSharedStateHandles(),
+                        castPrivateStates,
+                        castMetaStateHandle,

Review Comment:
   Yes, you are right; the shared state should be cast too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to