StephanEwen commented on a change in pull request #11555: [FLINK-16576][state 
backends] Correct the logic of KeyGroupStateHandle#getIntersection
URL: https://github.com/apache/flink/pull/11555#discussion_r407750244
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ##########
 @@ -2944,98 +2945,134 @@ public void testSnapshotNonAccessedState() throws 
Exception {
         * This test verifies that state is correctly assigned to key groups 
and that restore
         * restores the relevant key groups in the backend.
         *
-        * <p>We have ten key groups. Initially, one backend is responsible for 
all ten key groups.
-        * Then we snapshot, split up the state and restore in to backends 
where each is responsible
-        * for five key groups. Then we make sure that the state is only 
available in the correct
-        * backend.
-        * @throws Exception
+        * <p>We have 128 key groups. Initially, four backends with different 
states are responsible for all the key groups equally.
+        * Different backends for the same operator may contains different 
states if we create the state in runtime (such as {@link DeltaTrigger#onElement}
+        * Then we snapshot, split up the state and restore into 2 backends 
where each is responsible
+        * for 64 key groups. Then we make sure that the state is only 
available in the correct backend.
         */
        @Test
-       public void testKeyGroupSnapshotRestore() throws Exception {
-               final int MAX_PARALLELISM = 10;
+       public void testKeyGroupSnapshotRestoreScaleDown() throws Exception {
+               testKeyGroupSnapshotRestore(4, 2, 128);
+       }
 
-               CheckpointStreamFactory streamFactory = createStreamFactory();
-               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-               final AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(
-                               IntSerializer.INSTANCE,
-                               MAX_PARALLELISM,
-                               new KeyGroupRange(0, MAX_PARALLELISM - 1),
-                               env);
+       /**
+        * This test verifies that state is correctly assigned to key groups 
and that restore
+        * restores the relevant key groups in the backend.
+        *
+        * <p>We have 128 key groups. Initially, two backends with different 
states are responsible for all the key groups equally.
+        * Different backends for the same operator may contains different 
states if we create the state in runtime (such as {@link DeltaTrigger#onElement}
+        * Then we snapshot, split up the state and restore into 4 backends 
where each is responsible
+        * for 32 key groups. Then we make sure that the state is only 
available in the correct backend.
+        */
+       @Test
+       public void testKeyGroupSnapshotRestoreScaleUp() throws Exception {
+               testKeyGroupSnapshotRestore(2, 4, 128);
+       }
 
-               ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
+       /**
+        * This test verifies that state is correctly assigned to key groups 
and that restore
+        * restores the relevant key groups in the backend.
+        *
+        * <p>We have 128 key groups. Initially, two backends with different 
states are responsible for all the key groups equally.
+        * Different backends for the same operator may contains different 
states if we create the state in runtime (such as {@link DeltaTrigger#onElement}
+        * Then we snapshot, split up the state and restore into 2 backends 
where each is responsible
+        * for 64 key groups. Then we make sure that the state is only 
available in the correct backend.
+        */
+       @Test
+       public void testKeyGroupsSnapshotRestoreNoRescale() throws Exception {
+               testKeyGroupSnapshotRestore(2, 2, 128);
+       }
 
-               ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+       private void testKeyGroupSnapshotRestore(int sourceParallelism, int 
targetParallelism, int maxParallelism) throws Exception {
+               checkArgument(maxParallelism % sourceParallelism == 0, "Source 
parallelism should be a factor of maxParallelism, " + "sourParallelism =" + 
sourceParallelism + ", maxParallelsim=" + maxParallelism);
 
 Review comment:
   In the `checkArgument` helpers, we should always avoid string concatenation, 
because the message is then built eagerly on every call, even when the check 
passes, and that is costly. This is only a test, so performance is not critical, 
but we should still do it correctly, because this code is also an example that 
other contributors follow.
   
   ```suggestion
                checkArgument(maxParallelism % sourceParallelism == 0, "Source 
parallelism should be a factor of maxParallelism, sourParallelism=%s, 
maxParallelsim=%s", sourceParallelism, maxParallelism);
   ```
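
To illustrate the eager-versus-lazy difference, here is a minimal, self-contained sketch. The `checkArgument` below is a hypothetical local stand-in modeled on template-style precondition helpers, not Flink's actual implementation, and `Tracked` and `RENDER_COUNT` are names invented for this example only. It counts how often an argument is rendered to a string when the check passes.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyMessageDemo {

    // Counts how often a Tracked value is rendered to a string.
    static final AtomicInteger RENDER_COUNT = new AtomicInteger();

    // Hypothetical value type whose toString() we can observe.
    static final class Tracked {
        private final int value;

        Tracked(int value) {
            this.value = value;
        }

        @Override
        public String toString() {
            RENDER_COUNT.incrementAndGet();
            return Integer.toString(value);
        }
    }

    // Hypothetical stand-in for a template-style precondition helper:
    // the message is formatted only when the condition is false.
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    // Concatenation: the message string is built before the call,
    // so toString() runs even though the check passes.
    static int renderCountWithConcatenation() {
        RENDER_COUNT.set(0);
        Tracked source = new Tracked(4);
        checkArgument(128 % 4 == 0, "sourceParallelism=" + source);
        return RENDER_COUNT.get();
    }

    // Template: the argument is passed as an object and is not rendered
    // because the check passes.
    static int renderCountWithTemplate() {
        RENDER_COUNT.set(0);
        Tracked source = new Tracked(4);
        checkArgument(128 % 4 == 0, "sourceParallelism=%s", source);
        return RENDER_COUNT.get();
    }

    public static void main(String[] args) {
        System.out.println("concatenation renders: " + renderCountWithConcatenation());
        System.out.println("template renders: " + renderCountWithTemplate());
    }
}
```

In this sketch the template variant does no formatting work on the success path, which is the reason for preferring it even in test code.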
