[FLINK-4907] Add Test for Timers/State Provided by AbstractStreamOperator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa664e5b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa664e5b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa664e5b Branch: refs/heads/master Commit: fa664e5b9527ec82dae1f18746f7b2f0bbd7a3ba Parents: 94c65fb Author: Aljoscha Krettek <[email protected]> Authored: Tue Oct 25 12:25:30 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 26 23:26:29 2016 +0200 ---------------------------------------------------------------------- .../api/operators/AbstractStreamOperator.java | 2 +- .../operators/AbstractStreamOperatorTest.java | 400 +++++++++++++++++++ .../util/AbstractStreamOperatorTestHarness.java | 3 +- .../util/OneInputStreamOperatorTestHarness.java | 9 + 4 files changed, 411 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index b3da6b2..5f0dd85 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -263,7 +263,7 @@ public abstract class AbstractStreamOperator<OUT> KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(), container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(), - 
container.getIndexInSubtaskGroup()); + container.getEnvironment().getTaskInfo().getIndexOfThisSubtask()); this.keyedStateBackend = container.createKeyedStateBackend( keySerializer, http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java new file mode 100644 index 0000000..21f426b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; + + +/** + * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly + * tests timers and state and whether they are correctly checkpointed/restored + * with key-group reshuffling. 
+ */ +public class AbstractStreamOperatorTest { + + @Test + public void testStateDoesNotInterfere() throws Exception { + TestOperator testOperator = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); + + testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0); + testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0); + + testHarness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0); + testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0); + + assertThat( + extractResult(testHarness), + contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO")); + } + + /** + * Verify that firing event-time timers see the state of the key that was active + * when the timer was set. + */ + @Test + public void testEventTimeTimersDontInterfere() throws Exception { + TestOperator testOperator = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); + + testHarness.processWatermark(0L); + + testHarness.processElement(new Tuple2<>(1, "SET_EVENT_TIME_TIMER:20"), 0); + + testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0); + testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0); + + testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:10"), 0); + + testHarness.processWatermark(10L); + + assertThat( + extractResult(testHarness), + contains("ON_EVENT_TIME:HELLO")); + + testHarness.processWatermark(20L); + + assertThat( + extractResult(testHarness), + contains("ON_EVENT_TIME:CIAO")); + } + + /** + * Verify that firing processing-time timers see the state of the key that was active + * when the timer was set. 
+ */ + @Test + public void testProcessingTimeTimersDontInterfere() throws Exception { + TestOperator testOperator = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0); + + testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0); + testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0); + + testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0); + + testHarness.setProcessingTime(10L); + + assertThat( + extractResult(testHarness), + contains("ON_PROC_TIME:HELLO")); + + testHarness.setProcessingTime(20L); + + assertThat( + extractResult(testHarness), + contains("ON_PROC_TIME:CIAO")); + } + + /** + * Verify that timers for the different time domains don't clash. 
+ */ + @Test + public void testProcessingTimeAndEventTimeDontInterfere() throws Exception { + TestOperator testOperator = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); + + testHarness.setProcessingTime(0L); + testHarness.processWatermark(0L); + + testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0); + testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:20"), 0); + + testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0); + + testHarness.processWatermark(20L); + + assertThat( + extractResult(testHarness), + contains("ON_EVENT_TIME:HELLO")); + + testHarness.setProcessingTime(10L); + + assertThat( + extractResult(testHarness), + contains("ON_PROC_TIME:HELLO")); + } + + /** + * Verify that state and timers are checkpointed per key group and that they are correctly + * assigned to operator subtasks when restoring. 
+ */ + @Test + public void testStateAndTimerStateShuffling() throws Exception { + final int MAX_PARALLELISM = 10; + + // first get two keys that will fall into different key-group ranges that go + // to different operator subtasks when we restore + + // get two sub key-ranges so that we can restore two ranges separately + KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (MAX_PARALLELISM / 2) - 1); + KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1); + + // get two different keys, one per sub range + int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM); + int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM); + + TestOperator testOperator = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + testOperator, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + MAX_PARALLELISM, + 1, /* num subtasks */ + 0 /* subtask index */); + + testHarness.open(); + + testHarness.processWatermark(0L); + testHarness.setProcessingTime(0L); + + testHarness.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:10"), 0); + testHarness.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:20"), 0); + + testHarness.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:10"), 0); + testHarness.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:20"), 0); + + testHarness.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0); + testHarness.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0); + + assertTrue(extractResult(testHarness).isEmpty()); + + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + + // now, restore in two operators, first operator 1 + + TestOperator testOperator1 = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 = + new KeyedOneInputStreamOperatorTestHarness<>( + 
testOperator1, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + MAX_PARALLELISM, + 2, /* num subtasks */ + 0 /* subtask index */); + + testHarness1.setup(); + testHarness1.initializeState(snapshot); + testHarness1.open(); + + testHarness1.processWatermark(10L); + + assertThat(extractResult(testHarness1), contains("ON_EVENT_TIME:HELLO")); + + assertTrue(extractResult(testHarness1).isEmpty()); + + // this should not trigger anything, the trigger for WM=20 should sit in the + // other operator subtask + testHarness1.processWatermark(20L); + + assertTrue(extractResult(testHarness1).isEmpty()); + + + testHarness1.setProcessingTime(10L); + + assertThat(extractResult(testHarness1), contains("ON_PROC_TIME:HELLO")); + + assertTrue(extractResult(testHarness1).isEmpty()); + + // this should not trigger anything, the trigger for TIME=20 should sit in the + // other operator subtask + testHarness1.setProcessingTime(20L); + + assertTrue(extractResult(testHarness1).isEmpty()); + + // now, for the second operator + TestOperator testOperator2 = new TestOperator(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 = + new KeyedOneInputStreamOperatorTestHarness<>( + testOperator2, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + MAX_PARALLELISM, + 2, /* num subtasks */ + 1 /* subtask index */); + + testHarness2.setup(); + testHarness2.initializeState(snapshot); + testHarness2.open(); + + testHarness2.processWatermark(10L); + + // nothing should happen because this timer is in the other subtask + assertTrue(extractResult(testHarness2).isEmpty()); + + testHarness2.processWatermark(20L); + + assertThat(extractResult(testHarness2), contains("ON_EVENT_TIME:CIAO")); + + testHarness2.setProcessingTime(10L); + + // nothing should happen because this timer is in the other subtask + assertTrue(extractResult(testHarness2).isEmpty()); + + testHarness2.setProcessingTime(20L); + + assertThat(extractResult(testHarness2), 
contains("ON_PROC_TIME:CIAO")); + + assertTrue(extractResult(testHarness2).isEmpty()); + } + + /** + * Extracts the result values from the test harness and clears the output queue. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> testHarness) { + List<StreamRecord<? extends T>> streamRecords = testHarness.extractOutputStreamRecords(); + List<T> result = new ArrayList<>(); + for (Object in : streamRecords) { + if (in instanceof StreamRecord) { + result.add((T) ((StreamRecord) in).getValue()); + } + } + testHarness.getOutput().clear(); + return result; + } + + + private static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Tuple2<Integer, String> value) throws Exception { + return value.f0; + } + } + + /** + * Testing operator that can respond to commands by either setting/deleting state, emitting + * state or setting timers. 
+ */ + private static class TestOperator + extends AbstractStreamOperator<String> + implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> { + + private static final long serialVersionUID = 1L; + + private transient InternalTimerService<VoidNamespace> timerService; + + private final ValueStateDescriptor<String> stateDescriptor = + new ValueStateDescriptor<>("state", StringSerializer.INSTANCE, null); + + @Override + public void open() throws Exception { + super.open(); + + this.timerService = getInternalTimerService( + "test-timers", + IntSerializer.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + this); + } + + @Override + public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception { + String[] command = element.getValue().f1.split(":"); + switch (command[0]) { + case "SET_STATE": + getPartitionedState(stateDescriptor).update(command[1]); + break; + case "DELETE_STATE": + getPartitionedState(stateDescriptor).clear(); + break; + case "SET_EVENT_TIME_TIMER": + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1])); + break; + case "SET_PROC_TIME_TIMER": + timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1])); + break; + case "EMIT_STATE": + String stateValue = getPartitionedState(stateDescriptor).value(); + output.collect(new StreamRecord<>("ON_ELEMENT:" + element.getValue().f0 + ":" + stateValue)); + break; + default: + throw new IllegalArgumentException(); + } + } + + @Override + public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception { + String stateValue = getPartitionedState(stateDescriptor).value(); + output.collect(new StreamRecord<>("ON_EVENT_TIME:" + stateValue)); + } + + @Override + public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception { + String stateValue = getPartitionedState(stateDescriptor).value(); + output.collect(new 
StreamRecord<>("ON_PROC_TIME:" + stateValue)); + } + } + + private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) { + Random rand = new Random(System.currentTimeMillis()); + int result = rand.nextInt(); + while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism))) { + result = rand.nextInt(); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index af1a7ba..03f3bce 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -214,8 +214,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { } /** - * Get all the output from the task and clear the output buffer. - * This contains only StreamRecords. + * Get only the {@link StreamRecord StreamRecords} emitted by the operator. */ @SuppressWarnings("unchecked") public List<StreamRecord<? 
extends OUT>> extractOutputStreamRecords() { http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 105922b..7468d9a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -49,6 +49,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> this.oneInputOperator = operator; } + public void processElement(IN value, long timestamp) throws Exception { + processElement(new StreamRecord<>(value, timestamp)); + } + + public void processElement(StreamRecord<IN> element) throws Exception { operator.setKeyContextElement1(element); oneInputOperator.processElement(element); @@ -61,6 +66,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> } } + public void processWatermark(long watermark) throws Exception { + oneInputOperator.processWatermark(new Watermark(watermark)); + } + public void processWatermark(Watermark mark) throws Exception { oneInputOperator.processWatermark(mark); }
