This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 12a2ade31b4ba3beb9eb3fe1d26064223fc02fec
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Fri Oct 23 19:39:57 2020 +0800

    [FLINK-19748] [test] Adjust raw keyed state test to only write some key 
groups
    
    This closes #13773.
---
 .../api/operators/AbstractStreamOperatorTest.java  | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 6b035df..65f1bce 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
 
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -443,10 +444,11 @@ public class AbstractStreamOperatorTest {
                final int maxParallelism = 10;
                final int numSubtasks = 1;
                final int subtaskIndex = 0;
-               final KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 
maxParallelism - 1);
+               final List<Integer> keyGroupsToWrite = Arrays.asList(2, 3, 8);
 
                final byte[] testSnapshotData = "TEST".getBytes();
-               final CustomRawKeyedStateTestOperator testOperator = new 
CustomRawKeyedStateTestOperator(testSnapshotData);
+               final CustomRawKeyedStateTestOperator testOperator =
+                       new CustomRawKeyedStateTestOperator(testSnapshotData, 
keyGroupsToWrite);
 
                // snapshot and then restore
                OperatorSubtaskState snapshot;
@@ -474,7 +476,7 @@ public class AbstractStreamOperatorTest {
                        testHarness.open();
                }
 
-               assertThat(testOperator.restoredRawKeyedState, 
hasRestoredKeyGroupsWith(testSnapshotData, keyGroupRange));
+               assertThat(testOperator.restoredRawKeyedState, 
hasRestoredKeyGroupsWith(testSnapshotData, keyGroupsToWrite));
        }
 
        /**
@@ -578,11 +580,13 @@ public class AbstractStreamOperatorTest {
                private static final long serialVersionUID = 1L;
 
                private final byte[] snapshotBytes;
+               private final List<Integer> keyGroupsToWrite;
 
                private Map<Integer, byte[]> restoredRawKeyedState;
 
-               CustomRawKeyedStateTestOperator(byte[] snapshotBytes) {
+               CustomRawKeyedStateTestOperator(byte[] snapshotBytes, 
List<Integer> keyGroupsToWrite) {
                        this.snapshotBytes = Arrays.copyOf(snapshotBytes, 
snapshotBytes.length);
+                       this.keyGroupsToWrite = 
Preconditions.checkNotNull(keyGroupsToWrite);
                }
 
                @Override
@@ -599,7 +603,7 @@ public class AbstractStreamOperatorTest {
                public void snapshotState(StateSnapshotContext context) throws 
Exception {
                        super.snapshotState(context);
                        KeyedStateCheckpointOutputStream rawKeyedStateStream = 
context.getRawKeyedOperatorStateOutput();
-                       for (int keyGroupId : 
rawKeyedStateStream.getKeyGroupList()) {
+                       for (int keyGroupId : keyGroupsToWrite) {
                                
rawKeyedStateStream.startNewKeyGroup(keyGroupId);
                                rawKeyedStateStream.write(snapshotBytes);
                        }
@@ -628,15 +632,15 @@ public class AbstractStreamOperatorTest {
                return result;
        }
 
-       private static Matcher<Map<Integer, byte[]>> 
hasRestoredKeyGroupsWith(byte[] testSnapshotData, KeyGroupRange range) {
+       private static Matcher<Map<Integer, byte[]>> 
hasRestoredKeyGroupsWith(byte[] testSnapshotData, List<Integer> 
writtenKeyGroups) {
                return new TypeSafeMatcher<Map<Integer, byte[]>>() {
                        @Override
                        protected boolean matchesSafely(Map<Integer, byte[]> 
restored) {
-                               if (restored.size() != 
range.getNumberOfKeyGroups()) {
+                               if (restored.size() != writtenKeyGroups.size()) 
{
                                        return false;
                                }
 
-                               for (int writtenKeyGroupId : range) {
+                               for (int writtenKeyGroupId : writtenKeyGroups) {
                                        if 
(!Arrays.equals(restored.get(writtenKeyGroupId), testSnapshotData)) {
                                                return false;
                                        }
@@ -647,7 +651,7 @@ public class AbstractStreamOperatorTest {
 
                        @Override
                        public void describeTo(Description description) {
-                               description.appendText("Key groups: " + range + 
" with snapshot data " + Arrays.toString(testSnapshotData));
+                               description.appendText("Key groups: " + 
writtenKeyGroups + " with snapshot data " + Arrays.toString(testSnapshotData));
                        }
                };
        }

Reply via email to