Repository: flink
Updated Branches:
  refs/heads/master f4c336b16 -> c38589766


[FLINK-4960] Add AbstractStreamOperatorTestHarness.repackageState()

The new method makes it possible to test operator scale-in: it combines several
OperatorStateHandles (each the result of a TestHarness.snapshot() call) into one,
which can then be used to restore an operator with a lower parallelism.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3858976
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3858976
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3858976

Branch: refs/heads/master
Commit: c385897661f36122e34ffa6a11996b983e2dc14a
Parents: f4c336b
Author: kl0u <[email protected]>
Authored: Fri Oct 28 15:37:42 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Nov 4 10:24:40 2016 +0100

----------------------------------------------------------------------
 .../operators/AbstractStreamOperatorTest.java   | 106 ++++++++++++++++++-
 .../util/AbstractStreamOperatorTestHarness.java |  69 ++++++++++++
 2 files changed, 173 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c3858976/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 21f426b..fd05353 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.Test;
@@ -177,7 +178,7 @@ public class AbstractStreamOperatorTest {
         * assigned to operator subtasks when restoring.
         */
        @Test
-       public void testStateAndTimerStateShuffling() throws Exception {
+       public void testStateAndTimerStateShufflingScalingUp() throws Exception 
{
                final int MAX_PARALLELISM = 10;
 
                // first get two keys that will fall into different key-group 
ranges that go
@@ -249,7 +250,6 @@ public class AbstractStreamOperatorTest {
 
                assertTrue(extractResult(testHarness1).isEmpty());
 
-
                testHarness1.setProcessingTime(10L);
 
                assertThat(extractResult(testHarness1), 
contains("ON_PROC_TIME:HELLO"));
@@ -299,6 +299,108 @@ public class AbstractStreamOperatorTest {
                assertTrue(extractResult(testHarness2).isEmpty());
        }
 
+       @Test
+       public void testStateAndTimerStateShufflingScalingDown() throws 
Exception {
+               final int MAX_PARALLELISM = 10;
+
+               // first get two keys that fall into two different key-group 
ranges that go
+               // to different operator subtasks before the snapshot (both are restored into one subtask)
+
+               // get two sub key-ranges so that state can be registered on the two harnesses 
separately
+               KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, 
(MAX_PARALLELISM / 2) - 1);
+               KeyGroupRange subKeyGroupRange2 = new 
KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1);
+
+               // get two different keys, one per sub range
+               int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, 
MAX_PARALLELISM);
+               int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, 
MAX_PARALLELISM);
+
+               TestOperator testOperator1 = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness1 =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               testOperator1,
+                               new TestKeySelector(),
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               MAX_PARALLELISM,
+                               2, /* num subtasks */
+                               0 /* subtask index */);
+
+               testHarness1.setup();
+               testHarness1.open();
+
+               testHarness1.processWatermark(0L);
+               testHarness1.setProcessingTime(0L);
+
+               TestOperator testOperator2 = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness2 =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               testOperator2,
+                               new TestKeySelector(),
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               MAX_PARALLELISM,
+                               2, /* num subtasks */
+                               1 /* subtask index */);
+
+
+               testHarness2.setup();
+               testHarness2.open();
+
+               testHarness2.processWatermark(0L);
+               testHarness2.setProcessingTime(0L);
+
+               // register some state with both instances and scale down to 
parallelism 1
+
+               testHarness1.processElement(new Tuple2<>(key1, 
"SET_EVENT_TIME_TIMER:30"), 0);
+               testHarness1.processElement(new Tuple2<>(key1, 
"SET_PROC_TIME_TIMER:30"), 0);
+               testHarness1.processElement(new Tuple2<>(key1, 
"SET_STATE:HELLO"), 0);
+
+               testHarness2.processElement(new Tuple2<>(key2, 
"SET_EVENT_TIME_TIMER:40"), 0);
+               testHarness2.processElement(new Tuple2<>(key2, 
"SET_PROC_TIME_TIMER:40"), 0);
+               testHarness2.processElement(new Tuple2<>(key2, 
"SET_STATE:CIAO"), 0);
+
+               // take a snapshot from each one of the "parallel" instances of 
the operator
+               // and combine them into one so that we can scale down
+
+               OperatorStateHandles repackagedState =
+                       AbstractStreamOperatorTestHarness.repackageState(
+                               testHarness1.snapshot(0, 0),
+                               testHarness2.snapshot(0, 0)
+                       );
+
+               // now, for the third operator that scales down from 
parallelism of 2 to 1
+               TestOperator testOperator3 = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness3 =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               testOperator3,
+                               new TestKeySelector(),
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               MAX_PARALLELISM,
+                               1, /* num subtasks */
+                               0 /* subtask index */);
+
+               testHarness3.setup();
+               testHarness3.initializeState(repackagedState);
+               testHarness3.open();
+
+               testHarness3.processWatermark(30L);
+               assertThat(extractResult(testHarness3), 
contains("ON_EVENT_TIME:HELLO"));
+               assertTrue(extractResult(testHarness3).isEmpty());
+
+               testHarness3.processWatermark(40L);
+               assertThat(extractResult(testHarness3), 
contains("ON_EVENT_TIME:CIAO"));
+               assertTrue(extractResult(testHarness3).isEmpty());
+
+               testHarness3.setProcessingTime(30L);
+               assertThat(extractResult(testHarness3), 
contains("ON_PROC_TIME:HELLO"));
+               assertTrue(extractResult(testHarness3).isEmpty());
+
+               testHarness3.setProcessingTime(40L);
+               assertThat(extractResult(testHarness3), 
contains("ON_PROC_TIME:CIAO"));
+               assertTrue(extractResult(testHarness3).isEmpty());
+       }
+
        /**
         * Extracts the result values from the test harness and clears the 
output queue.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/c3858976/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 03f3bce..c923b17 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
@@ -309,6 +310,74 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                initializeCalled = true;
        }
 
+       /**
+        * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
+        * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
+        * and repacks them into a single {@link OperatorStateHandles}, so that the parallelism of the test
+        * can change arbitrarily (i.e. it is possible to scale both up and down).
+        *
+        * <p>
+        * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
+        * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
+        * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
+        *
+        * <p>
+        * The managed and raw keyed state and the managed and raw operator state of all {@code handles} are
+        * concatenated, in argument order, into four merged lists. A state collection that is {@code null}
+        * in a given handle is skipped. If exactly one handle is passed, that very instance is returned
+        * without being copied.
+        *
+        * <p>
+        * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
+        * operator (i.e. chain length of one). When more than one handle is passed, the result is created with
+        * {@code 0} and {@code null} as its first two constructor arguments (presumably the operator chain index
+        * and the legacy operator state -- confirm against {@link OperatorStateHandles}); only the four state
+        * collections mentioned above are read from the inputs.
+        *
+        * <p>
+        * For an example of how to use it, have a look at
+        * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
+        *
+        * @param handles the different states to be merged. A {@code null} array, or a {@code null} element when
+        *                more than one handle is passed, results in a {@link NullPointerException}.
+        * @return the resulting state, or {@code null} if no partial states are specified.
+        * @throws Exception declared by the signature. NOTE(review): nothing in this method visibly throws a
+        *                   checked exception -- confirm whether the declaration is still needed.
+        */
+       public static OperatorStateHandles 
repackageState(OperatorStateHandles... handles) throws Exception {
+
+               if (handles.length < 1) {
+                       return null;
+               } else if (handles.length == 1) {
+                       return handles[0];
+               }
+
+               List<OperatorStateHandle> mergedManagedOperatorState = new 
ArrayList<>(handles.length);
+               List<OperatorStateHandle> mergedRawOperatorState = new 
ArrayList<>(handles.length);
+
+               List<KeyGroupsStateHandle> mergedManagedKeyedState = new 
ArrayList<>(handles.length);
+               List<KeyGroupsStateHandle> mergedRawKeyedState = new 
ArrayList<>(handles.length);
+
+               for (OperatorStateHandles handle: handles) {
+
+                       Collection<OperatorStateHandle> managedOperatorState = 
handle.getManagedOperatorState();
+                       Collection<OperatorStateHandle> rawOperatorState = 
handle.getRawOperatorState();
+                       Collection<KeyGroupsStateHandle> managedKeyedState = 
handle.getManagedKeyedState();
+                       Collection<KeyGroupsStateHandle> rawKeyedState = 
handle.getRawKeyedState();
+
+                       if (managedOperatorState != null) {
+                               
mergedManagedOperatorState.addAll(managedOperatorState);
+                       }
+
+                       if (rawOperatorState != null) {
+                               mergedRawOperatorState.addAll(rawOperatorState);
+                       }
+
+                       if (managedKeyedState != null) {
+                               
mergedManagedKeyedState.addAll(managedKeyedState);
+                       }
+
+                       if (rawKeyedState != null) {
+                               mergedRawKeyedState.addAll(rawKeyedState);
+                       }
+               }
+
+               return new OperatorStateHandles(
+                       0,
+                       null,
+                       mergedManagedKeyedState,
+                       mergedRawKeyedState,
+                       mergedManagedOperatorState,
+                       mergedRawOperatorState);
+       }
 
        /**
         * Calls {@link StreamOperator#open()}. This also

Reply via email to