Repository: flink Updated Branches: refs/heads/master f4c336b16 -> c38589766
[FLINK-4960] Add AbstractStreamOperatorTestHarness.repackageState() The new method allows testing operator scale-in by combining several OperatorStateHandles (that result from TestHarness.snapshot()) into one to allow restoring an operator with a lower parallelism. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3858976 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3858976 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3858976 Branch: refs/heads/master Commit: c385897661f36122e34ffa6a11996b983e2dc14a Parents: f4c336b Author: kl0u <[email protected]> Authored: Fri Oct 28 15:37:42 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Nov 4 10:24:40 2016 +0100 ---------------------------------------------------------------------- .../operators/AbstractStreamOperatorTest.java | 106 ++++++++++++++++++- .../util/AbstractStreamOperatorTestHarness.java | 69 ++++++++++++ 2 files changed, 173 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c3858976/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 21f426b..fd05353 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; @@ -177,7 +178,7 @@ public class AbstractStreamOperatorTest { * assigned to operator subtasks when restoring. */ @Test - public void testStateAndTimerStateShuffling() throws Exception { + public void testStateAndTimerStateShufflingScalingUp() throws Exception { final int MAX_PARALLELISM = 10; // first get two keys that will fall into different key-group ranges that go @@ -249,7 +250,6 @@ public class AbstractStreamOperatorTest { assertTrue(extractResult(testHarness1).isEmpty()); - testHarness1.setProcessingTime(10L); assertThat(extractResult(testHarness1), contains("ON_PROC_TIME:HELLO")); @@ -299,6 +299,108 @@ public class AbstractStreamOperatorTest { assertTrue(extractResult(testHarness2).isEmpty()); } + @Test + public void testStateAndTimerStateShufflingScalingDown() throws Exception { + final int MAX_PARALLELISM = 10; + + // first get two keys that will fall into different key-group ranges that go + // to different operator subtasks when we restore + + // get two sub key-ranges so that we can restore two ranges separately + KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (MAX_PARALLELISM / 2) - 1); + KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1); + + // get two different keys, one per sub range + int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM); + int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM); + + TestOperator testOperator1 = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 = + new KeyedOneInputStreamOperatorTestHarness<>( + testOperator1, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + MAX_PARALLELISM, + 2, /* num subtasks */ + 0 /* subtask index */); + + testHarness1.setup(); + testHarness1.open(); + + testHarness1.processWatermark(0L); + testHarness1.setProcessingTime(0L); + + TestOperator testOperator2 = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 = + new KeyedOneInputStreamOperatorTestHarness<>( + testOperator2, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + MAX_PARALLELISM, + 2, /* num subtasks */ + 1 /* subtask index */); + + + testHarness2.setup(); + testHarness2.open(); + + testHarness2.processWatermark(0L); + testHarness2.setProcessingTime(0L); + + // register some state with both instances and scale down to parallelism 1 + + testHarness1.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:30"), 0); + testHarness1.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:30"), 0); + testHarness1.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0); + + testHarness2.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:40"), 0); + testHarness2.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:40"), 0); + testHarness2.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0); + + // take a snapshot from each one of the "parallel" instances of the operator + // and combine them into one so that we can scale down + + OperatorStateHandles repackagedState = + AbstractStreamOperatorTestHarness.repackageState( + testHarness1.snapshot(0, 0), + testHarness2.snapshot(0, 0) + ); + + // now, for the third operator that scales down from parallelism of 2 to 1 + TestOperator testOperator3 = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness3 = + new KeyedOneInputStreamOperatorTestHarness<>( + testOperator3, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + MAX_PARALLELISM, + 1, /* num subtasks */ + 0 /* subtask index */); + + testHarness3.setup(); + testHarness3.initializeState(repackagedState); + testHarness3.open(); + + testHarness3.processWatermark(30L); + assertThat(extractResult(testHarness3), contains("ON_EVENT_TIME:HELLO")); + assertTrue(extractResult(testHarness3).isEmpty()); + + testHarness3.processWatermark(40L); + assertThat(extractResult(testHarness3), contains("ON_EVENT_TIME:CIAO")); + assertTrue(extractResult(testHarness3).isEmpty()); + + testHarness3.setProcessingTime(30L); + assertThat(extractResult(testHarness3), contains("ON_PROC_TIME:HELLO")); + assertTrue(extractResult(testHarness3).isEmpty()); + + testHarness3.setProcessingTime(40L); + assertThat(extractResult(testHarness3), contains("ON_PROC_TIME:CIAO")); + assertTrue(extractResult(testHarness3).isEmpty()); + } + /** * Extracts the result values form the test harness and clear the output queue. */ http://git-wip-us.apache.org/repos/asf/flink/blob/c3858976/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 03f3bce..c923b17 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest; import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; @@ -309,6 +310,74 @@ public class AbstractStreamOperatorTestHarness<OUT> { initializeCalled = true; } + /** + * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)} + * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask) + * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test + * can change arbitrarily (i.e. be able to scale both up and down). + * + * <p> + * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize + * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you + * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}. + * + * <p> + * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single + * operator (i.e. chain length of one). + * + * <p> + * For an example of how to use it, have a look at + * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}. + * + * @param handles the different states to be merged. + * @return the resulting state, or {@code null} if no partial states are specified. + */ + public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception { + + if (handles.length < 1) { + return null; + } else if (handles.length == 1) { + return handles[0]; + } + + List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length); + List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length); + + List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length); + List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length); + + for (OperatorStateHandles handle: handles) { + + Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState(); + Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState(); + Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState(); + Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState(); + + if (managedOperatorState != null) { + mergedManagedOperatorState.addAll(managedOperatorState); + } + + if (rawOperatorState != null) { + mergedRawOperatorState.addAll(rawOperatorState); + } + + if (managedKeyedState != null) { + mergedManagedKeyedState.addAll(managedKeyedState); + } + + if (rawKeyedState != null) { + mergedRawKeyedState.addAll(rawKeyedState); + } + } + + return new OperatorStateHandles( + 0, + null, + mergedManagedKeyedState, + mergedRawKeyedState, + mergedManagedOperatorState, + mergedRawOperatorState); + } /** * Calls {@link StreamOperator#open()}. This also
