This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 12a2ade31b4ba3beb9eb3fe1d26064223fc02fec
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Fri Oct 23 19:39:57 2020 +0800

    [FLINK-19748] [test] Adjust raw keyed state test to only write some key groups

    This closes #13773.
---
 .../api/operators/AbstractStreamOperatorTest.java | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 6b035df..65f1bce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
 
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -443,10 +444,11 @@ public class AbstractStreamOperatorTest {
 		final int maxParallelism = 10;
 		final int numSubtasks = 1;
 		final int subtaskIndex = 0;
-		final KeyGroupRange keyGroupRange = KeyGroupRange.of(0, maxParallelism - 1);
+		final List<Integer> keyGroupsToWrite = Arrays.asList(2, 3, 8);
 
 		final byte[] testSnapshotData = "TEST".getBytes();
-		final CustomRawKeyedStateTestOperator testOperator = new CustomRawKeyedStateTestOperator(testSnapshotData);
+		final CustomRawKeyedStateTestOperator testOperator =
+			new CustomRawKeyedStateTestOperator(testSnapshotData, keyGroupsToWrite);
 
 		// snapshot and then restore
 		OperatorSubtaskState snapshot;
@@ -474,7 +476,7 @@ public class AbstractStreamOperatorTest {
 			testHarness.open();
 		}
 
-		assertThat(testOperator.restoredRawKeyedState, hasRestoredKeyGroupsWith(testSnapshotData, keyGroupRange));
+		assertThat(testOperator.restoredRawKeyedState, hasRestoredKeyGroupsWith(testSnapshotData, keyGroupsToWrite));
 	}
 
 	/**
@@ -578,11 +580,13 @@ public class AbstractStreamOperatorTest {
 		private static final long serialVersionUID = 1L;
 
 		private final byte[] snapshotBytes;
+		private final List<Integer> keyGroupsToWrite;
 
 		private Map<Integer, byte[]> restoredRawKeyedState;
 
-		CustomRawKeyedStateTestOperator(byte[] snapshotBytes) {
+		CustomRawKeyedStateTestOperator(byte[] snapshotBytes, List<Integer> keyGroupsToWrite) {
 			this.snapshotBytes = Arrays.copyOf(snapshotBytes, snapshotBytes.length);
+			this.keyGroupsToWrite = Preconditions.checkNotNull(keyGroupsToWrite);
 		}
 
 		@Override
@@ -599,7 +603,7 @@ public class AbstractStreamOperatorTest {
 		public void snapshotState(StateSnapshotContext context) throws Exception {
 			super.snapshotState(context);
 			KeyedStateCheckpointOutputStream rawKeyedStateStream = context.getRawKeyedOperatorStateOutput();
-			for (int keyGroupId : rawKeyedStateStream.getKeyGroupList()) {
+			for (int keyGroupId : keyGroupsToWrite) {
 				rawKeyedStateStream.startNewKeyGroup(keyGroupId);
 				rawKeyedStateStream.write(snapshotBytes);
 			}
@@ -628,15 +632,15 @@ public class AbstractStreamOperatorTest {
 		return result;
 	}
 
-	private static Matcher<Map<Integer, byte[]>> hasRestoredKeyGroupsWith(byte[] testSnapshotData, KeyGroupRange range) {
+	private static Matcher<Map<Integer, byte[]>> hasRestoredKeyGroupsWith(byte[] testSnapshotData, List<Integer> writtenKeyGroups) {
 		return new TypeSafeMatcher<Map<Integer, byte[]>>() {
 			@Override
 			protected boolean matchesSafely(Map<Integer, byte[]> restored) {
-				if (restored.size() != range.getNumberOfKeyGroups()) {
+				if (restored.size() != writtenKeyGroups.size()) {
 					return false;
 				}
 
-				for (int writtenKeyGroupId : range) {
+				for (int writtenKeyGroupId : writtenKeyGroups) {
 					if (!Arrays.equals(restored.get(writtenKeyGroupId), testSnapshotData)) {
 						return false;
 					}
@@ -647,7 +651,7 @@ public class AbstractStreamOperatorTest {
 
 			@Override
 			public void describeTo(Description description) {
-				description.appendText("Key groups: " + range + " with snapshot data " + Arrays.toString(testSnapshotData));
+				description.appendText("Key groups: " + writtenKeyGroups + " with snapshot data " + Arrays.toString(testSnapshotData));
 			}
 		};
 	}
