rkhachatryan commented on code in PR #20268:
URL: https://github.com/apache/flink/pull/20268#discussion_r925622394


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java:
##########
@@ -26,16 +26,53 @@
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
 
 /** Test for {@link CopyOnWriteStateTable}. */
 public class CopyOnWriteStateTableTest {
 
+    /**
+     * This tests that Whether serializers are consistent between {@link StateTable} and {@link
+     * StateMap}.
+     */
+    @Test
+    public void testSerializerAfterMetaInfoChanged() {
+        RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> originalMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new TestType.V1TestTypeSerializer());
+        InternalKeyContext<Integer> mockKeyContext = new MockInternalKeyContext<>();
+        CopyOnWriteStateTable<Integer, Integer, TestType> table =
+                new CopyOnWriteStateTable<>(
+                        mockKeyContext, originalMetaInfo, IntSerializer.INSTANCE);
+
+        RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> newMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new TestType.V2TestTypeSerializer());
+        table.setMetaInfo(newMetaInfo);
+        long count =
+                Arrays.stream(table.getState())
+                        .filter(
+                                stateEntries ->
+                                        ((CopyOnWriteStateMap<?, ?, ?>) stateEntries)
+                                                .getStateSerializer()
+                                                .equals(table.getStateSerializer()))
+                        .count();
+        Assert.assertEquals(table.getState().length, count);

Review Comment:
   1. How about adding a check such as `checkState(table.getState().length > 0);`?
   That would fail the test if its configuration (the key group range, for example) is
   invalid. With an empty array, the count comparison would otherwise pass without
   checking anything.
   2. How about moving the assertion inside the loop:
   ```
           for (StateMap<?, ?, ?> stateEntries : table.getState()) {
               assertEquals(
                       table.getStateSerializer(),
                       ((CopyOnWriteStateMap<?, ?, ?>) stateEntries).getStateSerializer());
           }
   ```
   That would provide an informative error message if the assertion fails.
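
To illustrate both points outside of Flink, here is a minimal stand-alone sketch. It uses plain strings in place of serializers, and every name in it (`PerEntryAssertionSketch`, `countBasedCheck`, `perEntryCheck`) is hypothetical rather than part of Flink or JUnit. It only shows that a count comparison passes vacuously on empty input, while a guarded per-entry check can report what went wrong and where.

```java
import java.util.List;

/** Stand-alone illustration; none of these names exist in Flink or JUnit. */
final class PerEntryAssertionSketch {

    /** Count-based check, as in the test under review: vacuously true for empty input. */
    static boolean countBasedCheck(List<String> entries, String expected) {
        long matching = entries.stream().filter(expected::equals).count();
        return matching == entries.size();
    }

    /**
     * Guarded per-entry check: returns null on success, otherwise a message
     * describing the first problem found.
     */
    static String perEntryCheck(List<String> entries, String expected) {
        // Guard against a setup that produced nothing to verify.
        if (entries.isEmpty()) {
            return "no entries to check; the test setup is probably invalid";
        }
        for (int i = 0; i < entries.size(); i++) {
            if (!expected.equals(entries.get(i))) {
                return "entry " + i + ": expected <" + expected
                        + "> but was <" + entries.get(i) + ">";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> empty = List.of();
        List<String> mixed = List.of("v2", "v1", "v2");
        System.out.println("count-based, empty input: " + countBasedCheck(empty, "v2"));
        System.out.println("per-entry, empty input:   " + perEntryCheck(empty, "v2"));
        System.out.println("per-entry, mixed input:   " + perEntryCheck(mixed, "v2"));
    }
}
```

In the real test, `assertEquals` inside the loop plays the role of `perEntryCheck`: JUnit reports the expected and actual values of the failing entry instead of two mismatched counts.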



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java:
##########
@@ -26,16 +26,53 @@
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
 
 /** Test for {@link CopyOnWriteStateTable}. */
 public class CopyOnWriteStateTableTest {
 
+    /**
+     * This tests that Whether serializers are consistent between {@link StateTable} and {@link
+     * StateMap}.
+     */
+    @Test
+    public void testSerializerAfterMetaInfoChanged() {
+        RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> originalMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new TestType.V1TestTypeSerializer());
+        InternalKeyContext<Integer> mockKeyContext = new MockInternalKeyContext<>();

Review Comment:
   I think there's no need for mocking here; we can use `InternalKeyContextImpl` instead:
   ```
   InternalKeyContext<Integer> keyContext =
           new InternalKeyContextImpl<>(KeyGroupRange.of(0, 9), 10);
   ```


