rkhachatryan commented on a change in pull request #18391:
URL: https://github.com/apache/flink/pull/18391#discussion_r793549612



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -66,16 +71,36 @@ public ChangelogStateBackendHandleImpl(
 
         @Override
         public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
+            for (KeyedStateHandle keyedStateHandle : materialized) {
+                registerState(stateRegistry, checkpointID, MATERIALIZED_FLAG, keyedStateHandle);
+            }
+            for (ChangelogStateHandle stateHandle : nonMaterialized) {
+                registerState(stateRegistry, checkpointID, NON_MATERIALIZED_FLAG, stateHandle);
+            }
             stateRegistry.registerAll(materialized, checkpointID);
             stateRegistry.registerAll(nonMaterialized, checkpointID);
         }
 
+        private void registerState(
+                SharedStateRegistry stateRegistry,
+                long checkpointID,
+                String prefix,
+                KeyedStateHandle keyedStateHandle) {
+            stateRegistry.registerReference(
+                    new SharedStateRegistryKey(
+                            prefix,
+                            // here use hash code as registry key identifier.
+                            new StateHandleID(String.valueOf(keyedStateHandle.hashCode()))),
+                    new StreamStateHandleWrapper(keyedStateHandle),
+                    checkpointID);

Review comment:
       Thanks for updating the PR. Unfortunately, I have some further concerns regarding `SharedStateRegistryKey`.
   
   The updated version uses `backendIdentifier + "-" + materializationID` as a key.
   
   How would it handle the re-scaling case?
   Per my understanding, `backendIdentifier` will be different from what was used on JM while reading the checkpoints.
   Additionally, on down-scaling, `materializationID` can change for some handles.
   
   If that's a valid concern, we probably have to use option (3): random IDs per materialized handle, stored and passed inside `ChangelogStateBackendHandleImpl`, maybe as a map.
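
   To make option (3) concrete, here is a rough sketch of what I have in mind. It deliberately uses stand-in types rather than the real Flink classes: `RandomIdSketch`, `Handle`, `BackendHandle`, `create`, `rescaled` and `registryKey` are all hypothetical names, and the real change would need to fit `KeyedStateHandle` and `SharedStateRegistryKey`. The only point is that the ID is generated once per materialized handle and then travels with the handle, so it does not depend on `backendIdentifier` or `materializationID`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-ins for illustration only; none of these are Flink classes.
public class RandomIdSketch {

    /** Stand-in for a materialized keyed state handle. */
    public static final class Handle {
        final String name;

        public Handle(String name) {
            this.name = name;
        }
    }

    /** Stand-in for ChangelogStateBackendHandleImpl carrying one random ID per handle. */
    public static final class BackendHandle {
        private final Map<Handle, String> materializedIds;

        private BackendHandle(Map<Handle, String> ids) {
            this.materializedIds = new LinkedHashMap<>(ids);
        }

        /** Assigns a fresh random ID to every materialized handle, once, at creation. */
        public static BackendHandle create(List<Handle> materialized) {
            Map<Handle, String> ids = new LinkedHashMap<>();
            for (Handle h : materialized) {
                ids.put(h, UUID.randomUUID().toString());
            }
            return new BackendHandle(ids);
        }

        /** Models re-scaling: keeps a subset of handles and reuses their existing IDs. */
        public BackendHandle rescaled(List<Handle> kept) {
            Map<Handle, String> ids = new LinkedHashMap<>();
            for (Handle h : kept) {
                String id = materializedIds.get(h);
                if (id == null) {
                    throw new IllegalArgumentException("unknown handle: " + h.name);
                }
                ids.put(h, id);
            }
            return new BackendHandle(ids);
        }

        /** The string that would serve as the registry key for this handle. */
        public String registryKey(Handle h) {
            return materializedIds.get(h);
        }
    }

    public static void main(String[] args) {
        Handle a = new Handle("a");
        Handle b = new Handle("b");
        BackendHandle original = BackendHandle.create(List.of(a, b));
        BackendHandle afterRescale = original.rescaled(List.of(a));
        // The key for "a" is unchanged even though the owning backend handle differs.
        System.out.println(original.registryKey(a).equals(afterRescale.registryKey(a)));
    }
}
```

   With this shape, the key registered on JM and the key seen after re-scaling are the same string by construction. The cost is that the map has to be serialized together with the handle.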




