StephanEwen commented on a change in pull request #11555: [FLINK-16576][state backends] Correct the logic of KeyGroupStateHandle#getIntersection
URL: https://github.com/apache/flink/pull/11555#discussion_r407750244
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
##########
@@ -2944,98 +2945,134 @@ public void testSnapshotNonAccessedState() throws Exception {
* This test verifies that state is correctly assigned to key groups and that restore
* restores the relevant key groups in the backend.
*
- * <p>We have ten key groups. Initially, one backend is responsible for all ten key groups.
- * Then we snapshot, split up the state and restore in to backends where each is responsible
- * for five key groups. Then we make sure that the state is only available in the correct
- * backend.
- * @throws Exception
+ * <p>We have 128 key groups. Initially, four backends with different states are responsible for all the key groups equally.
+ * Different backends for the same operator may contains different states if we create the state in runtime (such as {@link DeltaTrigger#onElement}
+ * Then we snapshot, split up the state and restore into 2 backends where each is responsible
+ * for 64 key groups. Then we make sure that the state is only available in the correct backend.
*/
@Test
- public void testKeyGroupSnapshotRestore() throws Exception {
- final int MAX_PARALLELISM = 10;
+ public void testKeyGroupSnapshotRestoreScaleDown() throws Exception {
+ testKeyGroupSnapshotRestore(4, 2, 128);
+ }
- CheckpointStreamFactory streamFactory = createStreamFactory();
- SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
- final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(
- IntSerializer.INSTANCE,
- MAX_PARALLELISM,
- new KeyGroupRange(0, MAX_PARALLELISM - 1),
- env);
+ /**
+ * This test verifies that state is correctly assigned to key groups and that restore
+ * restores the relevant key groups in the backend.
+ *
+ * <p>We have 128 key groups. Initially, two backends with different states are responsible for all the key groups equally.
+ * Different backends for the same operator may contains different states if we create the state in runtime (such as {@link DeltaTrigger#onElement}
+ * Then we snapshot, split up the state and restore into 4 backends where each is responsible
+ * for 32 key groups. Then we make sure that the state is only available in the correct backend.
+ */
+ @Test
+ public void testKeyGroupSnapshotRestoreScaleUp() throws Exception {
+ testKeyGroupSnapshotRestore(2, 4, 128);
+ }
- ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+ /**
+ * This test verifies that state is correctly assigned to key groups and that restore
+ * restores the relevant key groups in the backend.
+ *
+ * <p>We have 128 key groups. Initially, two backends with different states are responsible for all the key groups equally.
+ * Different backends for the same operator may contains different states if we create the state in runtime (such as {@link DeltaTrigger#onElement}
+ * Then we snapshot, split up the state and restore into 2 backends where each is responsible
+ * for 64 key groups. Then we make sure that the state is only available in the correct backend.
+ */
+ @Test
+ public void testKeyGroupsSnapshotRestoreNoRescale() throws Exception {
+ testKeyGroupSnapshotRestore(2, 2, 128);
+ }
- ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ private void testKeyGroupSnapshotRestore(int sourceParallelism, int targetParallelism, int maxParallelism) throws Exception {
+ checkArgument(maxParallelism % sourceParallelism == 0, "Source parallelism should be a factor of maxParallelism, " + "sourParallelism =" + sourceParallelism + ", maxParallelsim=" + maxParallelism);
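The Javadoc in this hunk assumes that every backend owns an equal, contiguous block of the 128 key groups, and that on restore each target backend reads the state of every source backend whose block overlaps its own. A minimal sketch of that split and of the range intersection follows. `KeyGroupSplitSketch`, `rangeFor`, `intersect`, and `sourcesFor` are hypothetical names, and the split formula is assumed to match the even assignment Flink performs in `KeyGroupRangeAssignment#computeKeyGroupRangeForOperatorIndex`; it is a re-implementation for illustration, not the Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: even key-group split plus range intersection on restore. */
public class KeyGroupSplitSketch {

    /** Returns {start, end} (both inclusive) of the key groups owned by operatorIndex.
     *  Assumed to mirror Flink's even split; not copied from Flink. */
    static int[] rangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Intersection of two inclusive ranges, or null if they do not overlap. */
    static int[] intersect(int[] a, int[] b) {
        int start = Math.max(a[0], b[0]);
        int end = Math.min(a[1], b[1]);
        return start <= end ? new int[] {start, end} : null;
    }

    /** Lists the source backends whose key groups overlap the given target backend. */
    static List<Integer> sourcesFor(int maxParallelism, int sourceParallelism,
                                    int targetParallelism, int targetIndex) {
        int[] target = rangeFor(maxParallelism, targetParallelism, targetIndex);
        List<Integer> sources = new ArrayList<>();
        for (int s = 0; s < sourceParallelism; s++) {
            if (intersect(rangeFor(maxParallelism, sourceParallelism, s), target) != null) {
                sources.add(s);
            }
        }
        return sources;
    }

    public static void main(String[] args) {
        // Scale down from 4 to 2 backends over 128 key groups. Prints:
        //   target 0 owns [0, 63] and reads sources [0, 1]
        //   target 1 owns [64, 127] and reads sources [2, 3]
        for (int t = 0; t < 2; t++) {
            int[] r = rangeFor(128, 2, t);
            System.out.println("target " + t + " owns [" + r[0] + ", " + r[1]
                    + "] and reads sources " + sourcesFor(128, 4, 2, t));
        }
    }
}
```

In the scale-down case each target combines the state of two sources; in the scale-up case (2 to 4) each target reads only a strict sub-range of a single source, so the intersection, not the whole source handle, is what must be restored.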
Review comment:
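In the `checkArgument` helpers, we should always avoid string concatenation: the message is built eagerly on every call, even when the check passes, which is costly. Pass a template with `%s` placeholders plus the arguments instead, so the message is only formatted when the check fails. This is only a test, so performance is not critical, but we should still do it correctly, because this code also serves as an example that other contributors follow.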
```suggestion
checkArgument(maxParallelism % sourceParallelism == 0, "Source parallelism should be a factor of maxParallelism, sourceParallelism=%s, maxParallelism=%s", sourceParallelism, maxParallelism);
```
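To make the difference concrete, here is a minimal, self-contained sketch. The two `checkArgument` overloads are hypothetical stand-ins modeled on the shape of Flink's `Preconditions.checkArgument` (the real implementation may substitute placeholders differently than `String.format`), and `Tracked` is a hypothetical helper that counts how often an argument is rendered into a message.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch contrasting eager concatenation with a lazy message template. */
public class LazyMessageSketch {

    /** Hypothetical helper: counts how often this value is rendered via toString(). */
    static final class Tracked {
        final AtomicInteger renders = new AtomicInteger();
        final int value;

        Tracked(int value) {
            this.value = value;
        }

        @Override
        public String toString() {
            renders.incrementAndGet();
            return Integer.toString(value);
        }
    }

    /** Stand-in for checkArgument(boolean, String): the caller has already built the message. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Stand-in for checkArgument(boolean, String, Object...): formats only on failure. */
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    public static void main(String[] args) {
        Tracked source = new Tracked(4);
        Tracked max = new Tracked(128);

        // Eager: the concatenation runs before the call, although the check passes.
        checkArgument(128 % 4 == 0, "sourceParallelism=" + source + ", maxParallelism=" + max);
        System.out.println("eager renders: " + source.renders.get()); // eager renders: 1

        // Lazy: the check passes, so the template is never formatted.
        checkArgument(128 % 4 == 0, "sourceParallelism=%s, maxParallelism=%s", source, max);
        System.out.println("lazy renders: " + source.renders.get()); // lazy renders: 1

        // On failure, the template is formatted exactly once.
        try {
            checkArgument(128 % 3 == 0, "sourceParallelism=%s, maxParallelism=%s", new Tracked(3), max);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // sourceParallelism=3, maxParallelism=128
        }
    }
}
```

The template form still allocates the varargs array (and boxes `int` arguments such as `sourceParallelism`) on every call; what it defers is building the string.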
----------------------------------------------------------------
This is an automated message from the Apache Git Service.