rkhachatryan commented on a change in pull request #18391:
URL: https://github.com/apache/flink/pull/18391#discussion_r786875320



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -130,14 +155,51 @@ public String toString() {
                     keyGroupRange, materialized.size(), nonMaterialized.size());
         }
 
-        private static Closeable asCloseable(KeyedStateHandle h) {
-            return () -> {
-                try {
-                    h.discardState();
-                } catch (Exception e) {
-                    ExceptionUtils.rethrowIOException(e);
+        private static class StreamStateHandleWrapper implements StreamStateHandle {
+            private static final long serialVersionUID = 1L;
+
+            private final KeyedStateHandle keyedStateHandle;
+
+            StreamStateHandleWrapper(KeyedStateHandle keyedStateHandle) {
+                this.keyedStateHandle = keyedStateHandle;
+            }
+
+            @Override
+            public void discardState() throws Exception {
+                keyedStateHandle.discardState();
+            }
+
+            @Override
+            public long getStateSize() {
+                return keyedStateHandle.getStateSize();
+            }
+
+            @Override
+            public FSDataInputStream openInputStream() throws IOException {
+                throw new UnsupportedOperationException("Should not call here.");
+            }
+
+            @Override
+            public Optional<byte[]> asBytesIfInMemory() {
+                throw new UnsupportedOperationException("Should not call here.");
+            }

Review comment:
       `SharedStateRegistry.registerReference` does return a `StreamStateHandle`, and it can be **this** wrapper. Therefore, it's not safe to leave these methods unimplemented.
   Or am I missing something?
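To illustrate the concern, here is a minimal, self-contained sketch. `Handle`, `WrapperHandle`, `BytesHandle` and `ToyRegistry` are hypothetical stand-ins, not Flink classes, and the toy registry assumes (as described above) that registering an existing key returns the handle the registry already holds. If the wrapper was registered first, a later caller gets the wrapper back and fails as soon as it tries to read.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for StreamStateHandle.
interface Handle {
    long getStateSize();

    byte[] read();
}

// Mirrors the wrapper under review: size is delegated, reading is unsupported.
final class WrapperHandle implements Handle {
    private final long size;

    WrapperHandle(long size) {
        this.size = size;
    }

    @Override
    public long getStateSize() {
        return size;
    }

    @Override
    public byte[] read() {
        throw new UnsupportedOperationException("Should not call here.");
    }
}

// A "real" handle whose data can actually be read.
final class BytesHandle implements Handle {
    private final byte[] data;

    BytesHandle(byte[] data) {
        this.data = data.clone();
    }

    @Override
    public long getStateSize() {
        return data.length;
    }

    @Override
    public byte[] read() {
        return data.clone();
    }
}

// Hypothetical registry: keeps the first handle registered under a key and
// returns that handle on every later registration of the same key.
final class ToyRegistry {
    private final Map<String, Handle> entries = new HashMap<>();

    Handle registerReference(String key, Handle handle) {
        return entries.computeIfAbsent(key, k -> handle);
    }
}

final class WrapperLeakDemo {
    // The wrapper is registered first; a later caller registering a readable
    // handle under the same key receives the wrapper instead.
    static Handle registerWrapperThenReal() {
        ToyRegistry registry = new ToyRegistry();
        registry.registerReference("materialized-1", new WrapperHandle(3));
        return registry.registerReference("materialized-1", new BytesHandle(new byte[] {1, 2, 3}));
    }

    // True if reading from the handle returned by the registry fails.
    static boolean readFails() {
        try {
            registerWrapperThenReal().read();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```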

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
##########
@@ -98,6 +106,77 @@ public void testUnregisterWithUnexistedKey() {
         sharedStateRegistry.unregisterUnusedState(Long.MAX_VALUE);
     }
 
+    @Test
+    public void testRegisterChangelogStateBackendHandles() throws InterruptedException {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        long materializationId1 = 1L;
+        IncrementalStateHandleWrapper incrementalTMStateHandle1 =
+                createDummyIncrementalStateHandle(materializationId1);
+
+        IncrementalStateHandleWrapper incrementalJMStateHandle11 = incrementalTMStateHandle1.copy();

Review comment:
       NIT: To me, the handle names aren't very informative or readable.
   While reviewing, I renamed them to:
   `matStateBase`
   `matState1`
   `nonMateState1`
   `backendState1`
   etc.
   That way, they are shorter and convey more information.
   WDYT about such names?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -49,6 +51,9 @@
     List<ChangelogStateHandle> getNonMaterializedStateHandles();
 
     class ChangelogStateBackendHandleImpl implements ChangelogStateBackendHandle {
+        static final String MATERIALIZED_FLAG = "materialized";
+        static final String NON_MATERIALIZED_FLAG = "nonMaterialized";

Review comment:
       nit: `private`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -66,16 +71,36 @@ public ChangelogStateBackendHandleImpl(
 
         @Override
         public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
+            for (KeyedStateHandle keyedStateHandle : materialized) {
+                registerState(stateRegistry, checkpointID, MATERIALIZED_FLAG, keyedStateHandle);
+            }
+            for (ChangelogStateHandle stateHandle : nonMaterialized) {
+                registerState(stateRegistry, checkpointID, NON_MATERIALIZED_FLAG, stateHandle);
+            }
             stateRegistry.registerAll(materialized, checkpointID);
             stateRegistry.registerAll(nonMaterialized, checkpointID);
         }
 
+        private void registerState(
+                SharedStateRegistry stateRegistry,
+                long checkpointID,
+                String prefix,
+                KeyedStateHandle keyedStateHandle) {
+            stateRegistry.registerReference(
+                    new SharedStateRegistryKey(
+                            prefix,
+                            // here use hash code as registry key identifier.
+                            new StateHandleID(String.valueOf(keyedStateHandle.hashCode()))),
+                    new StreamStateHandleWrapper(keyedStateHandle),
+                    checkpointID);

Review comment:
       I'm concerned about the use of `hashCode` as a key in `SharedStateRegistry`:
   1. According to the Java contract, it can be the same even for different state handles.
   2. State backends are not required to implement `hashCode` for the handles they return.
   3. RocksDB does, but explicitly discourages its use in production code:
   `This method should only be called in tests! This should never serve as key in a hash map` (`IncrementalRemoteKeyedStateHandle`).
   
   I see the following alternatives:
   1. Materialization ID (I'm adding it to `ChangelogStateBackendHandleImpl` in #18382)
   2. SequenceNumber upTo/from for the materialized/non-materialized parts
   3. A random ID generated by the backend and assigned to `ChangelogSnapshotState`
   (For 1 and 2, we'll need to add some backend-specific part.)
   
   As long as the API is private, I'm not very concerned about the non-materialized part. Otherwise, SQN seems the best choice.
   
   WDYT?
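A short sketch of point 1 and of alternatives 1 and 2. The `String` pair `"Aa"`/`"BB"` is a standard example of distinct objects with equal hash codes. `RegistryKeys`, the key format, and the `backendId` parameter (standing in for the "backend-specific part") are hypothetical and only illustrate the idea; they are not part of the PR.

```java
// Hypothetical helpers for building registry key strings; not Flink API.
final class RegistryKeys {
    private RegistryKeys() {}

    // Distinct strings with identical hash codes: "Aa" and "BB" both hash to 2112.
    static boolean hashCodesCollide() {
        String a = "Aa";
        String b = "BB";
        return !a.equals(b) && a.hashCode() == b.hashCode();
    }

    // Alternative 1: backend-specific part plus materialization ID.
    // Assumes backendId contains no '-', so keys stay unambiguous.
    static String materializedKey(String backendId, long materializationId) {
        return "materialized-" + backendId + "-" + materializationId;
    }

    // Alternative 2: backend-specific part plus the from/upTo sequence numbers.
    static String nonMaterializedKey(String backendId, long fromSqn, long upToSqn) {
        if (upToSqn < fromSqn) {
            throw new IllegalArgumentException("upTo must not be smaller than from");
        }
        return "nonMaterialized-" + backendId + "-" + fromSqn + "-" + upToSqn;
    }
}
```

Unlike `hashCode`, such keys are equal only when the underlying IDs or sequence numbers are equal.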

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
##########
@@ -98,6 +106,77 @@ public void testUnregisterWithUnexistedKey() {
         sharedStateRegistry.unregisterUnusedState(Long.MAX_VALUE);
     }
 
+    @Test
+    public void testRegisterChangelogStateBackendHandles() throws InterruptedException {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        long materializationId1 = 1L;
+        IncrementalStateHandleWrapper incrementalTMStateHandle1 =
+                createDummyIncrementalStateHandle(materializationId1);
+
+        IncrementalStateHandleWrapper incrementalJMStateHandle11 = incrementalTMStateHandle1.copy();
+        ChangelogStateHandleWrapper changelogStateHandle1 = createDummyChangelogStateHandle(1, 2);
+        long checkpointId1 = 41;
+        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle1 =
+                new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
+                        Collections.singletonList(incrementalJMStateHandle11),
+                        Collections.singletonList(changelogStateHandle1),
+                        incrementalTMStateHandle1.getKeyGroupRange());
+        changelogStateBackendHandle1.registerSharedStates(sharedStateRegistry, checkpointId1);
+        sharedStateRegistry.checkpointCompleted(checkpointId1);
+        sharedStateRegistry.unregisterUnusedState(checkpointId1);
+
+        IncrementalStateHandleWrapper incrementalJMStateHandle12 = incrementalTMStateHandle1.copy();

Review comment:
       Could you please explain the purpose of calling `copy` in the comments?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
##########
@@ -143,4 +222,87 @@ public boolean isDiscarded() {
             return discarded;
         }
     }
+
+    private static class KeyedStateHandleWrapper implements KeyedStateHandle {
+        private static final long serialVersionUID = 1L;
+        protected final KeyedStateHandle keyedStateHandle;
+        private volatile boolean isDiscarded;
+
+        KeyedStateHandleWrapper(KeyedStateHandle keyedStateHandle) {
+            this.keyedStateHandle = keyedStateHandle;
+            this.isDiscarded = false;
+        }
+
+        @Override
+        public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
+            keyedStateHandle.registerSharedStates(stateRegistry, checkpointID);
+        }
+
+        @Override
+        public KeyGroupRange getKeyGroupRange() {
+            return keyedStateHandle.getKeyGroupRange();
+        }
+
+        @Nullable
+        @Override
+        public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+            return keyedStateHandle.getIntersection(keyGroupRange);
+        }
+
+        @Override
+        public void discardState() throws Exception {
+            isDiscarded = true;
+            keyedStateHandle.discardState();
+        }
+
+        @Override
+        public long getStateSize() {
+            return keyedStateHandle.getStateSize();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            KeyedStateHandleWrapper that = (KeyedStateHandleWrapper) o;
+            return Objects.equals(keyedStateHandle, that.keyedStateHandle);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(keyedStateHandle);
+        }
+
+        boolean isDiscarded() {
+            return isDiscarded;
+        }
+    }
+
+    private static class IncrementalStateHandleWrapper extends KeyedStateHandleWrapper {

Review comment:
       Can you explain why we need two versions of `KeyedStateHandleWrapper`?
   Is it because of `copy`? If so, can't we instead expose the wrapped handle via a getter and call `copy` on it directly?
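One way this could look, sketched with hypothetical stand-in types (`ToyHandle`, `CopyableToyHandle`, `TrackingWrapper` are not the actual test classes): a single generic wrapper that tracks discards and exposes the wrapped handle through a getter, so the caller gets the concrete handle type back and can call `copy()` on it without a dedicated subclass.

```java
// Hypothetical, simplified stand-in for KeyedStateHandle.
interface ToyHandle {
    long getStateSize();

    void discardState();
}

// A handle that supports copy(), like the incremental handle used in the test.
final class CopyableToyHandle implements ToyHandle {
    private final long size;
    private boolean discarded;

    CopyableToyHandle(long size) {
        this.size = size;
    }

    CopyableToyHandle copy() {
        return new CopyableToyHandle(size);
    }

    @Override
    public long getStateSize() {
        return size;
    }

    @Override
    public void discardState() {
        discarded = true;
    }

    boolean isDiscarded() {
        return discarded;
    }
}

// Single wrapper: records discards, delegates to the wrapped handle, and
// exposes it with its concrete type via getWrapped().
final class TrackingWrapper<T extends ToyHandle> implements ToyHandle {
    private final T wrapped;
    private volatile boolean discarded;

    TrackingWrapper(T wrapped) {
        this.wrapped = wrapped;
    }

    T getWrapped() {
        return wrapped;
    }

    @Override
    public long getStateSize() {
        return wrapped.getStateSize();
    }

    @Override
    public void discardState() {
        discarded = true;
        wrapped.discardState();
    }

    boolean isDiscarded() {
        return discarded;
    }
}
```

A test would then write something like `wrapper.getWrapped().copy()` instead of relying on an `IncrementalStateHandleWrapper` subclass.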

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
##########
@@ -98,6 +106,77 @@ public void testUnregisterWithUnexistedKey() {
         sharedStateRegistry.unregisterUnusedState(Long.MAX_VALUE);
     }
 
+    @Test
+    public void testRegisterChangelogStateBackendHandles() throws InterruptedException {
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        long materializationId1 = 1L;
+        IncrementalStateHandleWrapper incrementalTMStateHandle1 =
+                createDummyIncrementalStateHandle(materializationId1);
+
+        IncrementalStateHandleWrapper incrementalJMStateHandle11 = incrementalTMStateHandle1.copy();
+        ChangelogStateHandleWrapper changelogStateHandle1 = createDummyChangelogStateHandle(1, 2);
+        long checkpointId1 = 41;
+        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle1 =
+                new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
+                        Collections.singletonList(incrementalJMStateHandle11),
+                        Collections.singletonList(changelogStateHandle1),
+                        incrementalTMStateHandle1.getKeyGroupRange());
+        changelogStateBackendHandle1.registerSharedStates(sharedStateRegistry, checkpointId1);
+        sharedStateRegistry.checkpointCompleted(checkpointId1);
+        sharedStateRegistry.unregisterUnusedState(checkpointId1);
+
+        IncrementalStateHandleWrapper incrementalJMStateHandle12 = incrementalTMStateHandle1.copy();
+        ChangelogStateHandleWrapper changelogStateHandle2 = createDummyChangelogStateHandle(2, 3);
+        long checkpointId2 = 42;
+        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle2 =
+                new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
+                        Collections.singletonList(incrementalJMStateHandle12),

Review comment:
       NIT: add static imports?
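For illustration, a static import removes the `Collections.` qualifier from the repeated `singletonList` calls; this minimal example uses `String` in place of the handle types.

```java
import static java.util.Collections.singletonList;

import java.util.List;

final class StaticImportExample {
    private StaticImportExample() {}

    // With the static import, the call site reads singletonList(...) rather than
    // Collections.singletonList(...).
    static List<String> wrap(String handle) {
        return singletonList(handle);
    }
}
```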



