[FLINK-4907] Add Test for Timers/State Provided by AbstractStreamOperator

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa664e5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa664e5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa664e5b

Branch: refs/heads/master
Commit: fa664e5b9527ec82dae1f18746f7b2f0bbd7a3ba
Parents: 94c65fb
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Oct 25 12:25:30 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 26 23:26:29 2016 +0200

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   2 +-
 .../operators/AbstractStreamOperatorTest.java   | 400 +++++++++++++++++++
 .../util/AbstractStreamOperatorTestHarness.java |   3 +-
 .../util/OneInputStreamOperatorTestHarness.java |   9 +
 4 files changed, 411 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index b3da6b2..5f0dd85 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -263,7 +263,7 @@ public abstract class AbstractStreamOperator<OUT>
                                KeyGroupRange subTaskKeyGroupRange = 
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                                                
container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
                                                
container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
-                                               
container.getIndexInSubtaskGroup());
+                                               
container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
 
                                this.keyedStateBackend = 
container.createKeyedStateBackend(
                                                keySerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
new file mode 100644
index 0000000..21f426b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+
+/**
+ * Tests for the facilities provided by {@link AbstractStreamOperator}. This 
mostly
+ * tests timers and state and whether they are correctly checkpointed/restored
+ * with key-group reshuffling.
+ */
+public class AbstractStreamOperatorTest {
+
+       @Test
+       public void testStateDoesNotInterfere() throws Exception {
+               TestOperator testOperator = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), 
BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.open();
+
+               testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 
0);
+               testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 
0);
+
+               testHarness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0);
+               testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0);
+
+               assertThat(
+                               extractResult(testHarness),
+                               contains("ON_ELEMENT:1:CIAO", 
"ON_ELEMENT:0:HELLO"));
+       }
+
+       /**
+        * Verify that firing event-time timers see the state of the key that 
was active
+        * when the timer was set.
+        */
+       @Test
+       public void testEventTimeTimersDontInterfere() throws Exception {
+               TestOperator testOperator = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), 
BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.open();
+
+               testHarness.processWatermark(0L);
+
+               testHarness.processElement(new Tuple2<>(1, 
"SET_EVENT_TIME_TIMER:20"), 0);
+
+               testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 
0);
+               testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 
0);
+
+               testHarness.processElement(new Tuple2<>(0, 
"SET_EVENT_TIME_TIMER:10"), 0);
+
+               testHarness.processWatermark(10L);
+
+               assertThat(
+                               extractResult(testHarness),
+                               contains("ON_EVENT_TIME:HELLO"));
+
+               testHarness.processWatermark(20L);
+
+               assertThat(
+                               extractResult(testHarness),
+                               contains("ON_EVENT_TIME:CIAO"));
+       }
+
+       /**
+        * Verify that firing processing-time timers see the state of the key 
that was active
+        * when the timer was set.
+        */
+       @Test
+       public void testProcessingTimeTimersDontInterfere() throws Exception {
+               TestOperator testOperator = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), 
BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.open();
+
+               testHarness.setProcessingTime(0L);
+
+               testHarness.processElement(new Tuple2<>(1, 
"SET_PROC_TIME_TIMER:20"), 0);
+
+               testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 
0);
+               testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 
0);
+
+               testHarness.processElement(new Tuple2<>(0, 
"SET_PROC_TIME_TIMER:10"), 0);
+
+               testHarness.setProcessingTime(10L);
+
+               assertThat(
+                               extractResult(testHarness),
+                               contains("ON_PROC_TIME:HELLO"));
+
+               testHarness.setProcessingTime(20L);
+
+               assertThat(
+                               extractResult(testHarness),
+                               contains("ON_PROC_TIME:CIAO"));
+       }
+
+       /**
+        * Verify that timers for the different time domains don't clash.
+        */
+       @Test
+       public void testProcessingTimeAndEventTimeDontInterfere() throws 
Exception {
+               TestOperator testOperator = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), 
BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.open();
+
+               testHarness.setProcessingTime(0L);
+               testHarness.processWatermark(0L);
+
+               testHarness.processElement(new Tuple2<>(0, 
"SET_PROC_TIME_TIMER:10"), 0);
+               testHarness.processElement(new Tuple2<>(0, 
"SET_EVENT_TIME_TIMER:20"), 0);
+
+               testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 
0);
+
+               testHarness.processWatermark(20L);
+
+               assertThat(
+                               extractResult(testHarness),
+                               contains("ON_EVENT_TIME:HELLO"));
+
+               testHarness.setProcessingTime(10L);
+
+               assertThat(
+                               extractResult(testHarness),
+                               contains("ON_PROC_TIME:HELLO"));
+       }
+
+       /**
+        * Verify that state and timers are checkpointed per key group and that 
they are correctly
+        * assigned to operator subtasks when restoring.
+        */
+       @Test
+       public void testStateAndTimerStateShuffling() throws Exception {
+               final int MAX_PARALLELISM = 10;
+
+               // first get two keys that will fall into different key-group 
ranges that go
+               // to different operator subtasks when we restore
+
+               // get two sub key-ranges so that we can restore two ranges 
separately
+               KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, 
(MAX_PARALLELISM / 2) - 1);
+               KeyGroupRange subKeyGroupRange2 = new 
KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1);
+
+               // get two different keys, one per sub range
+               int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, 
MAX_PARALLELISM);
+               int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, 
MAX_PARALLELISM);
+
+               TestOperator testOperator = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               testOperator,
+                                               new TestKeySelector(),
+                                               BasicTypeInfo.INT_TYPE_INFO,
+                                               MAX_PARALLELISM,
+                                               1, /* num subtasks */
+                                               0 /* subtask index */);
+
+               testHarness.open();
+
+               testHarness.processWatermark(0L);
+               testHarness.setProcessingTime(0L);
+
+               testHarness.processElement(new Tuple2<>(key1, 
"SET_EVENT_TIME_TIMER:10"), 0);
+               testHarness.processElement(new Tuple2<>(key2, 
"SET_EVENT_TIME_TIMER:20"), 0);
+
+               testHarness.processElement(new Tuple2<>(key1, 
"SET_PROC_TIME_TIMER:10"), 0);
+               testHarness.processElement(new Tuple2<>(key2, 
"SET_PROC_TIME_TIMER:20"), 0);
+
+               testHarness.processElement(new Tuple2<>(key1, 
"SET_STATE:HELLO"), 0);
+               testHarness.processElement(new Tuple2<>(key2, 
"SET_STATE:CIAO"), 0);
+
+               assertTrue(extractResult(testHarness).isEmpty());
+
+               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+               // now, restore in two operators, first operator 1
+
+               TestOperator testOperator1 = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness1 =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               testOperator1,
+                                               new TestKeySelector(),
+                                               BasicTypeInfo.INT_TYPE_INFO,
+                                               MAX_PARALLELISM,
+                                               2, /* num subtasks */
+                                               0 /* subtask index */);
+
+               testHarness1.setup();
+               testHarness1.initializeState(snapshot);
+               testHarness1.open();
+
+               testHarness1.processWatermark(10L);
+
+               assertThat(extractResult(testHarness1), 
contains("ON_EVENT_TIME:HELLO"));
+
+               assertTrue(extractResult(testHarness1).isEmpty());
+
+               // this should not trigger anything, the trigger for WM=20 
should sit in the
+               // other operator subtask
+               testHarness1.processWatermark(20L);
+
+               assertTrue(extractResult(testHarness1).isEmpty());
+
+
+               testHarness1.setProcessingTime(10L);
+
+               assertThat(extractResult(testHarness1), 
contains("ON_PROC_TIME:HELLO"));
+
+               assertTrue(extractResult(testHarness1).isEmpty());
+
+               // this should not trigger anything, the trigger for TIME=20 
should sit in the
+               // other operator subtask
+               testHarness1.setProcessingTime(20L);
+
+               assertTrue(extractResult(testHarness1).isEmpty());
+
+               // now, for the second operator
+               TestOperator testOperator2 = new TestOperator();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String> testHarness2 =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               testOperator2,
+                                               new TestKeySelector(),
+                                               BasicTypeInfo.INT_TYPE_INFO,
+                                               MAX_PARALLELISM,
+                                               2, /* num subtasks */
+                                               1 /* subtask index */);
+
+               testHarness2.setup();
+               testHarness2.initializeState(snapshot);
+               testHarness2.open();
+
+               testHarness2.processWatermark(10L);
+
+               // nothing should happen because this timer is in the other 
subtask
+               assertTrue(extractResult(testHarness2).isEmpty());
+
+               testHarness2.processWatermark(20L);
+
+               assertThat(extractResult(testHarness2), 
contains("ON_EVENT_TIME:CIAO"));
+
+               testHarness2.setProcessingTime(10L);
+
+               // nothing should happen because this timer is in the other 
subtask
+               assertTrue(extractResult(testHarness2).isEmpty());
+
+               testHarness2.setProcessingTime(20L);
+
+               assertThat(extractResult(testHarness2), 
contains("ON_PROC_TIME:CIAO"));
+
+               assertTrue(extractResult(testHarness2).isEmpty());
+       }
+
+       /**
+        * Extracts the result values form the test harness and clear the 
output queue.
+        */
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, 
T> testHarness) {
+               List<StreamRecord<? extends T>> streamRecords = 
testHarness.extractOutputStreamRecords();
+               List<T> result = new ArrayList<>();
+               for (Object in : streamRecords) {
+                       if (in instanceof StreamRecord) {
+                               result.add((T) ((StreamRecord) in).getValue());
+                       }
+               }
+               testHarness.getOutput().clear();
+               return result;
+       }
+
+
+       private static class TestKeySelector implements 
KeySelector<Tuple2<Integer, String>, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer getKey(Tuple2<Integer, String> value) throws 
Exception {
+                       return value.f0;
+               }
+       }
+
+       /**
+        * Testing operator that can respond to commands by either 
setting/deleting state, emitting
+        * state or setting timers.
+        */
+       private static class TestOperator
+                       extends AbstractStreamOperator<String>
+                       implements OneInputStreamOperator<Tuple2<Integer, 
String>, String>, Triggerable<Integer, VoidNamespace> {
+
+               private static final long serialVersionUID = 1L;
+
+               private transient InternalTimerService<VoidNamespace> 
timerService;
+
+               private final ValueStateDescriptor<String> stateDescriptor =
+                               new ValueStateDescriptor<>("state", 
StringSerializer.INSTANCE, null);
+
+               @Override
+               public void open() throws Exception {
+                       super.open();
+
+                       this.timerService = getInternalTimerService(
+                                       "test-timers",
+                                       IntSerializer.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       this);
+               }
+
+               @Override
+               public void processElement(StreamRecord<Tuple2<Integer, 
String>> element) throws Exception {
+                       String[] command = element.getValue().f1.split(":");
+                       switch (command[0]) {
+                               case "SET_STATE":
+                                       
getPartitionedState(stateDescriptor).update(command[1]);
+                                       break;
+                               case "DELETE_STATE":
+                                       
getPartitionedState(stateDescriptor).clear();
+                                       break;
+                               case "SET_EVENT_TIME_TIMER":
+                                       
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 
Long.parseLong(command[1]));
+                                       break;
+                               case "SET_PROC_TIME_TIMER":
+                                       
timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 
Long.parseLong(command[1]));
+                                       break;
+                               case "EMIT_STATE":
+                                       String stateValue = 
getPartitionedState(stateDescriptor).value();
+                                       output.collect(new 
StreamRecord<>("ON_ELEMENT:" + element.getValue().f0 + ":" + stateValue));
+                                       break;
+                               default:
+                                       throw new IllegalArgumentException();
+                       }
+               }
+
+               @Override
+               public void onEventTime(InternalTimer<Integer, VoidNamespace> 
timer) throws Exception {
+                       String stateValue = 
getPartitionedState(stateDescriptor).value();
+                       output.collect(new StreamRecord<>("ON_EVENT_TIME:" + 
stateValue));
+               }
+
+               @Override
+               public void onProcessingTime(InternalTimer<Integer, 
VoidNamespace> timer) throws Exception {
+                       String stateValue = 
getPartitionedState(stateDescriptor).value();
+                       output.collect(new StreamRecord<>("ON_PROC_TIME:" + 
stateValue));
+               }
+       }
+
+       private static int getKeyInKeyGroupRange(KeyGroupRange range, int 
maxParallelism) {
+               Random rand = new Random(System.currentTimeMillis());
+               int result = rand.nextInt();
+               while 
(!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, 
maxParallelism))) {
+                       result = rand.nextInt();
+               }
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index af1a7ba..03f3bce 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -214,8 +214,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
        }
 
        /**
-        * Get all the output from the task and clear the output buffer.
-        * This contains only StreamRecords.
+        * Get only the {@link StreamRecord StreamRecords} emitted by the 
operator.
         */
        @SuppressWarnings("unchecked")
        public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 105922b..7468d9a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -49,6 +49,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
                this.oneInputOperator = operator;
        }
 
+       public void processElement(IN value, long timestamp) throws Exception {
+               processElement(new StreamRecord<>(value, timestamp));
+       }
+
+
        public void processElement(StreamRecord<IN> element) throws Exception {
                operator.setKeyContextElement1(element);
                oneInputOperator.processElement(element);
@@ -61,6 +66,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
                }
        }
 
+       public void processWatermark(long watermark) throws Exception {
+               oneInputOperator.processWatermark(new Watermark(watermark));
+       }
+
        public void processWatermark(Watermark mark) throws Exception {
                oneInputOperator.processWatermark(mark);
        }

Reply via email to