Repository: flink Updated Branches: refs/heads/master 788b83921 -> daf0ccda4
[FLINK-5420] [cep] Make the CEP operators rescalable Introduces the KeyRegistry in the TimeServiceHandler which allows to specify a callback and register keys for which we want this callback to be invoked on each watermark. Given this service, now the CEP operator has only keyed state, and the non-keyed one (keys) are handled by the KeyRegistry. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daf0ccda Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daf0ccda Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daf0ccda Branch: refs/heads/master Commit: daf0ccda4dc60a267be7b8074d40e48d22ccb13f Parents: 788b839 Author: kl0u <[email protected]> Authored: Fri Feb 24 10:34:43 2017 +0100 Committer: kl0u <[email protected]> Committed: Tue Feb 28 19:04:27 2017 +0100 ---------------------------------------------------------------------- .../AbstractKeyedCEPPatternOperator.java | 132 +++--- .../flink/cep/operator/CEPOperatorUtils.java | 1 - .../flink/cep/operator/CEPOperatorTest.java | 26 +- .../flink/cep/operator/CEPRescalingTest.java | 417 +++++++++++++++++++ .../api/operators/AbstractStreamOperator.java | 133 +++--- .../operators/InternalTimeServiceManager.java | 191 +++++++++ .../InternalWatermarkCallbackService.java | 239 +++++++++++ .../api/operators/OnWatermarkCallback.java | 41 ++ .../operators/AbstractStreamOperatorTest.java | 234 ++++++++++- 9 files changed, 1245 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 90ee846..f534bec 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -24,14 +24,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.operators.InternalWatermarkCallbackService; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OnWatermarkCallback; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; @@ -40,10 +38,8 @@ import org.apache.flink.util.Preconditions; import java.io.IOException; import java.io.Serializable; -import java.util.HashSet; import java.util.Objects; import java.util.PriorityQueue; -import java.util.Set; /** * Abstract CEP pattern operator for a keyed input stream. 
For each key, the operator creates @@ -58,7 +54,7 @@ import java.util.Set; */ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator { + implements OneInputStreamOperator<IN, OUT> { private static final long serialVersionUID = -4166778210774160757L; @@ -76,11 +72,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> /////////////// State ////////////// - // stores the keys we've already seen to trigger execution upon receiving a watermark - // this can be problematic, since it is never cleared - // TODO: fix once the state refactoring is completed - private transient Set<KEY> keys; - private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState"; private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName"; @@ -109,25 +100,31 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> } @Override - @SuppressWarnings("unchecked") - public void open() throws Exception { - super.open(); + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); - if (keys == null) { - keys = new HashSet<>(); - } + // we have to call initializeState here and in the migration restore() + // method because the restore() (from legacy) is called before the + // initializeState(). 
+ + initializeState(); + } + + private void initializeState() { if (nfaOperatorState == null) { - nfaOperatorState = getPartitionedState( - new ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.Serializer<IN>())); + nfaOperatorState = getRuntimeContext().getState( + new ValueStateDescriptor<>( + NFA_OPERATOR_STATE_NAME, + new NFA.Serializer<IN>())); } @SuppressWarnings("unchecked,rawtypes") TypeSerializer<StreamRecord<IN>> streamRecordSerializer = - (TypeSerializer) new StreamElementSerializer<>(getInputSerializer()); + (TypeSerializer) new StreamElementSerializer<>(inputSerializer); if (priorityQueueOperatorState == null) { - priorityQueueOperatorState = getPartitionedState( + priorityQueueOperatorState = getRuntimeContext().getState( new ValueStateDescriptor<>( PRIORITY_QUEUE_STATE_NAME, new PriorityQueueSerializer<>( @@ -139,6 +136,39 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> } } + @Override + public void open() throws Exception { + super.open(); + + InternalWatermarkCallbackService<KEY> watermarkCallbackService = getInternalWatermarkCallbackService(); + + watermarkCallbackService.setWatermarkCallback( + new OnWatermarkCallback<KEY>() { + + @Override + public void onWatermark(KEY key, Watermark watermark) throws IOException { + setCurrentKey(key); + + PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); + NFA<IN> nfa = getNFA(); + + if (priorityQueue.isEmpty()) { + advanceTime(nfa, watermark.getTimestamp()); + } else { + while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) { + StreamRecord<IN> streamRecord = priorityQueue.poll(); + processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); + } + } + + updateNFA(nfa); + updatePriorityQueue(priorityQueue); + } + }, + keySerializer + ); + } + private NFA<IN> getNFA() throws IOException { NFA<IN> nfa = nfaOperatorState.value(); @@ -173,7 +203,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> @Override public void processElement(StreamRecord<IN> element) throws Exception { - keys.add(keySelector.getKey(element.getValue())); + getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue())); if (isProcessingTime) { // there can be no out of order elements in processing time @@ -195,35 +225,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> } } - @Override - public void processWatermark(Watermark mark) throws Exception { - // we do our own watermark handling, no super call. we will never be able to use - // the timer service like this, however. - - // iterate over all keys to trigger the execution of the buffered elements - for (KEY key: keys) { - setCurrentKey(key); - - PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); - NFA<IN> nfa = getNFA(); - - if (priorityQueue.isEmpty()) { - advanceTime(nfa, mark.getTimestamp()); - } else { - while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) { - StreamRecord<IN> streamRecord = priorityQueue.poll(); - - processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); - } - } - - updateNFA(nfa); - updatePriorityQueue(priorityQueue); - } - - output.emitWatermark(mark); - } - /** * Process the given event by giving it to the NFA and outputting the produced set of matched * event sequences. 
@@ -243,31 +244,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> */ protected abstract void advanceTime(NFA<IN> nfa, long timestamp); - @Override - public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - DataOutputView ov = new DataOutputViewStreamWrapper(out); - ov.writeInt(keys.size()); - - for (KEY key: keys) { - keySerializer.serialize(key, ov); - } - } - - @Override - public void restoreState(FSDataInputStream state) throws Exception { - DataInputView inputView = new DataInputViewStreamWrapper(state); - - if (keys == null) { - keys = new HashSet<>(); - } - - int numberEntries = inputView.readInt(); - - for (int i = 0; i <numberEntries; i++) { - keys.add(keySerializer.deserialize(inputView)); - } - } - ////////////////////// Utility Classes ////////////////////// /** http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index 36f2e7a..56ecb17 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -137,7 +137,6 @@ public class CEPOperatorUtils { } else { KeySelector<T, Byte> keySelector = new NullByteKeySelector<>(); - TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE; patternStream = inputStream.keyBy(keySelector).transform( http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index f90b670..1899cb4 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -31,11 +31,11 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.types.Either; @@ -123,7 +123,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); // simulate snapshot/restore with some elements in internal sorting queue - StreamStateHandle snapshot = harness.snapshotLegacy(0, 0); + OperatorStateHandles snapshot = harness.snapshot(0, 0); harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( @@ -137,7 +137,7 @@ public class CEPOperatorTest extends TestLogger { BasicTypeInfo.INT_TYPE_INFO); harness.setup(); - harness.restore(snapshot); + harness.initializeState(snapshot); harness.open(); harness.processWatermark(new Watermark(Long.MIN_VALUE)); @@ -148,8 +148,12 @@ public class CEPOperatorTest extends TestLogger { // a pruning time underflow exception in NFA 
harness.processWatermark(new Watermark(2)); + harness.processElement(new StreamRecord<Event>(middleEvent, 3)); + harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord<Event>(endEvent, 5)); + // simulate snapshot/restore with empty element queue but NFA state - StreamStateHandle snapshot2 = harness.snapshotLegacy(1, 1); + OperatorStateHandles snapshot2 = harness.snapshot(1, 1); harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( @@ -163,13 +167,9 @@ public class CEPOperatorTest extends TestLogger { BasicTypeInfo.INT_TYPE_INFO); harness.setup(); - harness.restore(snapshot2); + harness.initializeState(snapshot2); harness.open(); - harness.processElement(new StreamRecord<Event>(middleEvent, 3)); - harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4)); - harness.processElement(new StreamRecord<Event>(endEvent, 5)); - harness.processWatermark(new Watermark(Long.MAX_VALUE)); ConcurrentLinkedQueue<Object> result = harness.getOutput(); @@ -230,7 +230,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); // simulate snapshot/restore with some elements in internal sorting queue - StreamStateHandle snapshot = harness.snapshotLegacy(0, 0); + OperatorStateHandles snapshot = harness.snapshot(0, 0); harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( @@ -248,7 +248,7 @@ public class CEPOperatorTest extends TestLogger { harness.setStateBackend(rocksDBStateBackend); harness.setup(); - harness.restore(snapshot); + harness.initializeState(snapshot); harness.open(); harness.processWatermark(new Watermark(Long.MIN_VALUE)); @@ -260,7 +260,7 @@ public class CEPOperatorTest extends TestLogger { harness.processWatermark(new Watermark(2)); // simulate snapshot/restore with empty element queue but NFA state - StreamStateHandle snapshot2 = harness.snapshotLegacy(1, 1); + 
OperatorStateHandles snapshot2 = harness.snapshot(1, 1); harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( @@ -277,7 +277,7 @@ public class CEPOperatorTest extends TestLogger { rocksDBStateBackend.setDbStoragePath(rocksDbPath); harness.setStateBackend(rocksDBStateBackend); harness.setup(); - harness.restore(snapshot2); + harness.initializeState(snapshot2); harness.open(); harness.processElement(new StreamRecord<Event>(middleEvent, 3)); http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java new file mode 100644 index 0000000..78765c0 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Test; + +import java.util.Map; +import java.util.Queue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CEPRescalingTest { + + @Test + public void testCEPFunctionScalingUp() throws Exception { + int maxParallelism = 10; + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + // valid pattern events belong to different keygroups + // that will be shipped to different tasks when changing parallelism. 
+ + Event startEvent1 = new Event(7, "start", 1.0); + SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0, 10.0); + Event endEvent1= new Event(7, "end", 1.0); + + int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism); + assertEquals(1, keygroup); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup)); + + Event startEvent2 = new Event(10, "start", 1.0); // this will go to task index 2 + SubEvent middleEvent2 = new SubEvent(10, "foo", 1.0, 10.0); + Event endEvent2 = new Event(10, "end", 1.0); + + keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent2), maxParallelism); + assertEquals(9, keygroup); + assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup)); + + // now we start the test, we go from parallelism 1 to 2. + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + getTestHarness(maxParallelism, 1, 0); + harness.open(); + + harness.processElement(new StreamRecord<>(startEvent1, 1)); // valid element + harness.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2)); + + harness.processElement(new StreamRecord<>(startEvent2, 3)); // valid element + harness.processElement(new StreamRecord<Event>(middleEvent2, 4)); // valid element + + // take a snapshot with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(0, 0); + harness.close(); + + // initialize two sub-tasks with the previously snapshotted state to simulate scaling up + + // we know that the valid element will go to index 0, + // so we initialize the two tasks and we put the rest of + // the valid elements for the pattern on task 0. 
+ + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness1 = + getTestHarness(maxParallelism, 2, 0); + + harness1.setup(); + harness1.initializeState(snapshot); + harness1.open(); + + // if element timestamps are not correctly checkpointed/restored this will lead to + // a pruning time underflow exception in NFA + harness1.processWatermark(new Watermark(2)); + + harness1.processElement(new StreamRecord<Event>(middleEvent1, 3)); // valid element + harness1.processElement(new StreamRecord<>(endEvent1, 5)); // valid element + + harness1.processWatermark(new Watermark(Long.MAX_VALUE)); + + // watermarks and the result + assertEquals(3, harness1.getOutput().size()); + verifyWatermark(harness1.getOutput().poll(), 2); + verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness2 = + getTestHarness(maxParallelism, 2, 1); + + harness2.setup(); + harness2.initializeState(snapshot); + harness2.open(); + + // now we move to the second parallel task + harness2.processWatermark(new Watermark(2)); + + harness2.processElement(new StreamRecord<>(endEvent2, 5)); + harness2.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); + + harness2.processWatermark(new Watermark(Long.MAX_VALUE)); + + assertEquals(3, harness2.getOutput().size()); + verifyWatermark(harness2.getOutput().poll(), 2); + verifyPattern(harness2.getOutput().poll(), startEvent2, middleEvent2, endEvent2); + + harness.close(); + harness1.close(); + harness2.close(); + } + + @Test + public void testCEPFunctionScalingDown() throws Exception { + int maxParallelism = 10; + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + // create some valid pattern events on predetermined key groups and task 
indices + + Event startEvent1 = new Event(7, "start", 1.0); // this will go to task index 0 + SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0, 10.0); + Event endEvent1 = new Event(7, "end", 1.0); + + // verification of the key choice + int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism); + assertEquals(1, keygroup); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keygroup)); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup)); + + Event startEvent2 = new Event(45, "start", 1.0); // this will go to task index 1 + SubEvent middleEvent2 = new SubEvent(45, "foo", 1.0, 10.0); + Event endEvent2 = new Event(45, "end", 1.0); + + keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent2), maxParallelism); + assertEquals(6, keygroup); + assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keygroup)); + assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup)); + + Event startEvent3 = new Event(90, "start", 1.0); // this will go to task index 0 + SubEvent middleEvent3 = new SubEvent(90, "foo", 1.0, 10.0); + Event endEvent3 = new Event(90, "end", 1.0); + + keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent3), maxParallelism); + assertEquals(2, keygroup); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keygroup)); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup)); + + Event startEvent4 = new Event(10, "start", 1.0); // this will go to task index 2 + SubEvent middleEvent4 = new SubEvent(10, "foo", 1.0, 10.0); + Event endEvent4 = new Event(10, "end", 1.0); + + keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent4), maxParallelism); + assertEquals(9, keygroup); + 
assertEquals(2, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keygroup)); + assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup)); + + // starting the test, we will go from parallelism of 3 to parallelism of 2 + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness1 = + getTestHarness(maxParallelism, 3, 0); + harness1.open(); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness2 = + getTestHarness(maxParallelism, 3, 1); + harness2.open(); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness3 = + getTestHarness(maxParallelism, 3, 2); + harness3.open(); + + harness1.processWatermark(Long.MIN_VALUE); + harness2.processWatermark(Long.MIN_VALUE); + harness3.processWatermark(Long.MIN_VALUE); + + harness1.processElement(new StreamRecord<>(startEvent1, 1)); // valid element + harness1.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2)); + harness1.processElement(new StreamRecord<Event>(middleEvent1, 3)); // valid element + harness1.processElement(new StreamRecord<>(endEvent1, 5)); // valid element + + // till here we have a valid sequence, so after creating the + // new instance and sending it a watermark, we expect it to fire, + // even with no new elements. 
+ + harness1.processElement(new StreamRecord<>(startEvent3, 10)); + harness1.processElement(new StreamRecord<>(startEvent1, 10)); + + harness2.processElement(new StreamRecord<>(startEvent2, 7)); + harness2.processElement(new StreamRecord<Event>(middleEvent2, 8)); + + harness3.processElement(new StreamRecord<>(startEvent4, 15)); + harness3.processElement(new StreamRecord<Event>(middleEvent4, 16)); + harness3.processElement(new StreamRecord<>(endEvent4, 17)); + + // so far we only have the initial watermark + assertEquals(1, harness1.getOutput().size()); + verifyWatermark(harness1.getOutput().poll(), Long.MIN_VALUE); + + assertEquals(1, harness2.getOutput().size()); + verifyWatermark(harness2.getOutput().poll(), Long.MIN_VALUE); + + assertEquals(1, harness3.getOutput().size()); + verifyWatermark(harness3.getOutput().poll(), Long.MIN_VALUE); + + // we take a snapshot and make it look as a single operator + // this will be the initial state of all downstream tasks. + OperatorStateHandles snapshot = AbstractStreamOperatorTestHarness.repackageState( + harness2.snapshot(0, 0), + harness1.snapshot(0, 0), + harness3.snapshot(0, 0) + ); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness4 = + getTestHarness(maxParallelism, 2, 0); + harness4.setup(); + harness4.initializeState(snapshot); + harness4.open(); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness5 = + getTestHarness(maxParallelism, 2, 1); + harness5.setup(); + harness5.initializeState(snapshot); + harness5.open(); + + harness5.processElement(new StreamRecord<>(endEvent2, 11)); + harness5.processWatermark(new Watermark(12)); + + verifyPattern(harness5.getOutput().poll(), startEvent2, middleEvent2, endEvent2); + verifyWatermark(harness5.getOutput().poll(), 12); + + // if element timestamps are not correctly checkpointed/restored this will lead to + // a pruning time underflow exception in NFA + harness4.processWatermark(new Watermark(12)); + + assertEquals(2, 
harness4.getOutput().size()); + verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1); + verifyWatermark(harness4.getOutput().poll(), 12); + + harness4.processElement(new StreamRecord<Event>(middleEvent3, 15)); // valid element + harness4.processElement(new StreamRecord<>(endEvent3, 16)); // valid element + + harness4.processElement(new StreamRecord<Event>(middleEvent1, 15)); // valid element + harness4.processElement(new StreamRecord<>(endEvent1, 16)); // valid element + + harness4.processWatermark(new Watermark(Long.MAX_VALUE)); + harness5.processWatermark(new Watermark(Long.MAX_VALUE)); + + // verify result + assertEquals(3, harness4.getOutput().size()); + + // check the order of the events in the output + Queue<Object> output = harness4.getOutput(); + StreamRecord<?> resultRecord = (StreamRecord<?>) output.peek(); + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + if (patternMap.get("start").getId() == 7) { + verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1); + verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3); + } else { + verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3); + verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1); + } + + // after scaling down this should end up here + assertEquals(2, harness5.getOutput().size()); + verifyPattern(harness5.getOutput().poll(), startEvent4, middleEvent4, endEvent4); + + harness1.close(); + harness2.close(); + harness3.close(); + harness4.close(); + harness5.close(); + } + + private void verifyWatermark(Object outputObject, long timestamp) { + assertTrue(outputObject instanceof Watermark); + assertEquals(timestamp, ((Watermark) outputObject).getTimestamp()); + } + + private void verifyPattern(Object outputObject, Event start, SubEvent middle, Event end) 
{ + assertTrue(outputObject instanceof StreamRecord); + + StreamRecord<?> resultRecord = (StreamRecord<?>) outputObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + assertEquals(start, patternMap.get("start")); + assertEquals(middle, patternMap.get("middle")); + assertEquals(end, patternMap.get("end")); + } + + private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, Event>> getTestHarness( + int maxParallelism, + int taskParallelism, + int subtaskIdx) throws Exception { + + KeySelector<Event, Integer> keySelector = new TestKeySelector(); + return new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()), + new NFAFactory()), + keySelector, + BasicTypeInfo.INT_TYPE_INFO, + maxParallelism, + taskParallelism, + subtaskIdx); + } + + private static class NFAFactory implements NFACompiler.NFAFactory<Event> { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private NFAFactory() { + this(false); + } + + private NFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA<Event> createNFA() { + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }) + .followedBy("end").where(new 
FilterFunction<Event>() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }) + // add a window timeout to test whether timestamps of elements in the + // priority queue in CEP operator are correctly checkpointed/restored + .within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } + + /** + * A simple {@link KeySelector} that returns as key the id of the {@link Event} + * provided as argument in the {@link #getKey(Event)}. + * */ + private static class TestKeySelector implements KeySelector<Event, Integer> { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 05fda28..a81056f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -69,6 +69,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.Collection; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -82,8 +83,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * * <p>For concrete implementations, one of 
the following two interfaces must also be implemented, to * mark the operator as unary or binary: - * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or - * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator}. + * {@link OneInputStreamOperator} or {@link TwoInputStreamOperator}. * * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using * the timer service, timer callbacks are also guaranteed not to be called concurrently with @@ -93,7 +93,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ @PublicEvolving public abstract class AbstractStreamOperator<OUT> - implements StreamOperator<OUT>, java.io.Serializable, KeyContext { + implements StreamOperator<OUT>, Serializable, KeyContext { private static final long serialVersionUID = 1L; @@ -146,10 +146,9 @@ public abstract class AbstractStreamOperator<OUT> protected transient LatencyGauge latencyGauge; - // ---------------- timers ------------------ - - private transient Map<String, HeapInternalTimerService<?, ?>> timerServices; + // ---------------- time handler ------------------ + private transient InternalTimeServiceManager<?, ?> timeServiceManager; // ---------------- two-input operator watermarks ------------------ @@ -206,6 +205,14 @@ public abstract class AbstractStreamOperator<OUT> initKeyedState(); //TODO we should move the actual initialization of this from StreamTask to this class + if (getKeyedStateBackend() != null && timeServiceManager == null) { + timeServiceManager = new InternalTimeServiceManager<>( + getKeyedStateBackend().getNumberOfKeyGroups(), + getKeyedStateBackend().getKeyGroupRange(), + this, + getRuntimeContext().getProcessingTimeService()); + } + if (restoring) { restoreStreamCheckpointed(stateHandles); @@ -268,11 +275,7 @@ public abstract class AbstractStreamOperator<OUT> * @throws Exception An exception in this method causes the operator to fail. 
*/ @Override - public void open() throws Exception { - if (timerServices == null) { - timerServices = new HashMap<>(); - } - } + public void open() throws Exception {} private void initKeyedState() { try { @@ -308,9 +311,9 @@ public abstract class AbstractStreamOperator<OUT> /** * This method is called after all records have been added to the operators via the methods - * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or - * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and - * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}. + * {@link OneInputStreamOperator#processElement(StreamRecord)}, or + * {@link TwoInputStreamOperator#processElement1(StreamRecord)} and + * {@link TwoInputStreamOperator#processElement2(StreamRecord)}. * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing * of buffered should be propagated, in order to cause the operation to be recognized asa failed, @@ -408,16 +411,8 @@ public abstract class AbstractStreamOperator<OUT> for (int keyGroupIdx : allKeyGroups) { out.startNewKeyGroup(keyGroupIdx); - DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out); - dov.writeInt(timerServices.size()); - - for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) { - String serviceName = entry.getKey(); - HeapInternalTimerService<?, ?> timerService = entry.getValue(); - - dov.writeUTF(serviceName); - timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx); - } + timeServiceManager.snapshotStateForKeyGroup( + new DataOutputViewStreamWrapper(out), keyGroupIdx); } } catch (Exception exception) { throw new Exception("Could not write timer service of " + getOperatorName() + @@ -465,35 +460,17 @@ public abstract class AbstractStreamOperator<OUT> */ public void initializeState(StateInitializationContext context) 
throws Exception { if (getKeyedStateBackend() != null) { - int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups(); KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange(); - // initialize the map with the timer services - this.timerServices = new HashMap<>(); - // and then initialize the timer services for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) { - DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream()); - int keyGroupIdx = streamProvider.getKeyGroupId(); checkArgument(localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range."); - int noOfTimerServices = div.readInt(); - for (int i = 0; i < noOfTimerServices; i++) { - String serviceName = div.readUTF(); - - HeapInternalTimerService<?, ?> timerService = this.timerServices.get(serviceName); - if (timerService == null) { - timerService = new HeapInternalTimerService<>( - totalKeyGroups, - localKeyGroupRange, - this, - getRuntimeContext().getProcessingTimeService()); - this.timerServices.put(serviceName, timerService); - } - timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader()); - } + timeServiceManager.restoreStateForKeyGroup( + new DataInputViewStreamWrapper(streamProvider.getStream()), + keyGroupIdx, getUserCodeClassloader()); } } } @@ -888,6 +865,20 @@ public abstract class AbstractStreamOperator<OUT> // ------------------------------------------------------------------------ /** + * Returns an {@link InternalWatermarkCallbackService} which allows to register a + * {@link OnWatermarkCallback} and multiple keys, for which + * the callback will be invoked every time a new {@link Watermark} is received. + * <p> + * <b>NOTE: </b> This service is only available to <b>keyed</b> operators. 
+ */ + public <K> InternalWatermarkCallbackService<K> getInternalWatermarkCallbackService() { + checkTimerServiceInitialization(); + + InternalTimeServiceManager<K, ?> keyedTimeServiceHandler = (InternalTimeServiceManager<K, ?>) timeServiceManager; + return keyedTimeServiceHandler.getWatermarkCallbackService(); + } + + /** * Returns a {@link InternalTimerService} that can be used to query current processing time * and event time and to set timers. An operator can have several timer services, where * each has its own namespace serializer. Timer services are differentiated by the string @@ -908,38 +899,34 @@ public abstract class AbstractStreamOperator<OUT> * * @param <N> The type of the timer namespace. */ - public <N> InternalTimerService<N> getInternalTimerService( + public <K, N> InternalTimerService<N> getInternalTimerService( String name, TypeSerializer<N> namespaceSerializer, - Triggerable<?, N> triggerable) { - if (getKeyedStateBackend() == null) { - throw new UnsupportedOperationException("Timers can only be used on keyed operators."); - } + Triggerable<K, N> triggerable) { - @SuppressWarnings("unchecked") - HeapInternalTimerService<Object, N> timerService = (HeapInternalTimerService<Object, N>) timerServices.get(name); + checkTimerServiceInitialization(); - if (timerService == null) { - timerService = new HeapInternalTimerService<>( - getKeyedStateBackend().getNumberOfKeyGroups(), - getKeyedStateBackend().getKeyGroupRange(), - this, - getRuntimeContext().getProcessingTimeService()); - timerServices.put(name, timerService); - } - @SuppressWarnings({"unchecked", "rawtypes"}) - Triggerable rawTriggerable = (Triggerable) triggerable; - timerService.startTimerService(getKeyedStateBackend().getKeySerializer(), namespaceSerializer, rawTriggerable); - return timerService; + // the following casting is to overcome type restrictions. 
+ TypeSerializer<K> keySerializer = (TypeSerializer<K>) getKeyedStateBackend().getKeySerializer(); + InternalTimeServiceManager<K, N> keyedTimeServiceHandler = (InternalTimeServiceManager<K, N>) timeServiceManager; + return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable); } public void processWatermark(Watermark mark) throws Exception { - for (HeapInternalTimerService<?, ?> service : timerServices.values()) { - service.advanceWatermark(mark.getTimestamp()); + if (timeServiceManager != null) { + timeServiceManager.advanceWatermark(mark); } output.emitWatermark(mark); } + private void checkTimerServiceInitialization() { + if (getKeyedStateBackend() == null) { + throw new UnsupportedOperationException("Timers can only be used on keyed operators."); + } else if (timeServiceManager == null) { + throw new RuntimeException("The timer service has not been initialized."); + } + } + public void processWatermark1(Watermark mark) throws Exception { input1Watermark = mark.getTimestamp(); long newMin = Math.min(input1Watermark, input2Watermark); @@ -960,19 +947,13 @@ public abstract class AbstractStreamOperator<OUT> @VisibleForTesting public int numProcessingTimeTimers() { - int count = 0; - for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) { - count += timerService.numProcessingTimeTimers(); - } - return count; + return timeServiceManager == null ? 0 : + timeServiceManager.numProcessingTimeTimers(); } @VisibleForTesting public int numEventTimeTimers() { - int count = 0; - for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) { - count += timerService.numEventTimeTimers(); - } - return count; + return timeServiceManager == null ? 
0 : + timeServiceManager.numEventTimeTimers(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java new file mode 100644 index 0000000..71ffbd2 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * An entity keeping all the time-related services available to all operators extending the + * {@link AbstractStreamOperator}. These are the different {@link HeapInternalTimerService timer services} + * and the {@link InternalWatermarkCallbackService}. + * + * <b>NOTE:</b> These services are only available to keyed operators. + * + * @param <K> The type of keys used for the timers and the registry. + * @param <N> The type of namespace used for the timers. 
+ */ +@Internal +class InternalTimeServiceManager<K, N> { + + private final int totalKeyGroups; + private final KeyGroupsList localKeyGroupRange; + private final KeyContext keyContext; + + private final ProcessingTimeService processingTimeService; + + private final Map<String, HeapInternalTimerService<K, N>> timerServices; + private final InternalWatermarkCallbackService<K> watermarkCallbackService; + + InternalTimeServiceManager( + int totalKeyGroups, + KeyGroupsList localKeyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + + Preconditions.checkArgument(totalKeyGroups > 0); + this.totalKeyGroups = totalKeyGroups; + this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange); + + this.keyContext = Preconditions.checkNotNull(keyContext); + this.processingTimeService = Preconditions.checkNotNull(processingTimeService); + + this.timerServices = new HashMap<>(); + this.watermarkCallbackService = new InternalWatermarkCallbackService<>(totalKeyGroups, localKeyGroupRange, keyContext); + } + + /** + * Returns an {@link InternalWatermarkCallbackService} which allows to register a + * {@link OnWatermarkCallback} and multiple keys, for which + * the callback will be invoked every time a new {@link Watermark} is received. + */ + public InternalWatermarkCallbackService<K> getWatermarkCallbackService() { + return watermarkCallbackService; + } + + /** + * Returns a {@link InternalTimerService} that can be used to query current processing time + * and event time and to set timers. An operator can have several timer services, where + * each has its own namespace serializer. Timer services are differentiated by the string + * key that is given when requesting them, if you call this method with the same key + * multiple times you will get the same timer service instance in subsequent requests. + * + * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation. 
+ * When a timer fires, this key will also be set as the currently active key. + * + * <p>Each timer has attached metadata, the namespace. Different timer services + * can have a different namespace type. If you don't need namespace differentiation you + * can use {@link VoidNamespaceSerializer} as the namespace serializer. + * + * @param name The name of the requested timer service. If no service exists under the given + * name a new one will be created and returned. + * @param keySerializer {@code TypeSerializer} for the timer keys. + * @param namespaceSerializer {@code TypeSerializer} for the timer namespace. + * @param triggerable The {@link Triggerable} that should be invoked when timers fire + */ + public InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) { + + HeapInternalTimerService<K, N> timerService = timerServices.get(name); + if (timerService == null) { + timerService = new HeapInternalTimerService<>(totalKeyGroups, + localKeyGroupRange, keyContext, processingTimeService); + timerServices.put(name, timerService); + } + timerService.startTimerService(keySerializer, namespaceSerializer, triggerable); + return timerService; + } + + public void advanceWatermark(Watermark watermark) throws Exception { + for (HeapInternalTimerService<?, ?> service : timerServices.values()) { + service.advanceWatermark(watermark.getTimestamp()); + } + watermarkCallbackService.invokeOnWatermarkCallback(watermark); + } + + ////////////////// Fault Tolerance Methods /////////////////// + + public void snapshotStateForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception { + stream.writeInt(timerServices.size()); + + for (Map.Entry<String, HeapInternalTimerService<K, N>> entry : timerServices.entrySet()) { + String serviceName = entry.getKey(); + HeapInternalTimerService<?, ?> timerService = entry.getValue(); + + stream.writeUTF(serviceName); 
+ timerService.snapshotTimersForKeyGroup(stream, keyGroupIdx); + } + + // write a byte indicating if there was a key + // registry service instantiated (1) or not (0). + if (watermarkCallbackService != null) { + stream.writeByte(1); + watermarkCallbackService.snapshotKeysForKeyGroup(stream, keyGroupIdx); + } else { + stream.writeByte(0); + } + } + + public void restoreStateForKeyGroup(DataInputViewStreamWrapper stream, int keyGroupIdx, + ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { + + int noOfTimerServices = stream.readInt(); + for (int i = 0; i < noOfTimerServices; i++) { + String serviceName = stream.readUTF(); + + HeapInternalTimerService<K, N> timerService = timerServices.get(serviceName); + if (timerService == null) { + timerService = new HeapInternalTimerService<>( + totalKeyGroups, + localKeyGroupRange, + keyContext, + processingTimeService); + timerServices.put(serviceName, timerService); + } + timerService.restoreTimersForKeyGroup(stream, keyGroupIdx, userCodeClassLoader); + } + + byte hadKeyRegistry = stream.readByte(); + if (hadKeyRegistry == 1) { + watermarkCallbackService.restoreKeysForKeyGroup(stream, keyGroupIdx, userCodeClassLoader); + } + } + + //////////////////// Methods used ONLY IN TESTS //////////////////// + + @VisibleForTesting + public int numProcessingTimeTimers() { + int count = 0; + for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) { + count += timerService.numProcessingTimeTimers(); + } + return count; + } + + @VisibleForTesting + public int numEventTimeTimers() { + int count = 0; + for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) { + count += timerService.numEventTimeTimers(); + } + return count; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java ---------------------------------------------------------------------- diff 
--git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java new file mode 100644 index 0000000..a4263e4 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The watermark callback service allows to register a {@link OnWatermarkCallback OnWatermarkCallback} + * and multiple keys, for which the callback will be invoked every time a new {@link Watermark} is received + * (after the registration of the key). + * <p> + * <b>NOTE: </b> This service is only available to <b>keyed</b> operators. + * + * @param <K> The type of key returned by the {@code KeySelector}. + */ +@Internal +public class InternalWatermarkCallbackService<K> { + + //////////// Information about the keyed state ////////// + + private final KeyGroupsList localKeyGroupRange; + private final int totalKeyGroups; + private final int localKeyGroupRangeStartIdx; + + private final KeyContext keyContext; + + /** + * An array of sets of keys keeping the registered keys split + * by the key-group they belong to. Each key-group has one set. + */ + private final Set<K>[] keysByKeygroup; + + /** A serializer for the registered keys. */ + private TypeSerializer<K> keySerializer; + + /** + * The {@link OnWatermarkCallback} to be invoked for each + * registered key upon reception of the watermark. 
+ */ + private OnWatermarkCallback<K> callback; + + public InternalWatermarkCallbackService(int totalKeyGroups, KeyGroupsList localKeyGroupRange, KeyContext keyContext) { + + this.totalKeyGroups = totalKeyGroups; + this.localKeyGroupRange = checkNotNull(localKeyGroupRange); + this.keyContext = checkNotNull(keyContext); + + // find the starting index of the local key-group range + int startIdx = Integer.MAX_VALUE; + for (Integer keyGroupIdx : localKeyGroupRange) { + startIdx = Math.min(keyGroupIdx, startIdx); + } + this.localKeyGroupRangeStartIdx = startIdx; + + // the list of ids of the key-groups this task is responsible for + int localKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups(); + this.keysByKeygroup = new Set[localKeyGroups]; + } + + /** + * Registers a {@link OnWatermarkCallback} with the current {@link InternalWatermarkCallbackService} service. + * Before this method is called and the callback is set, the service is unusable. + * + * @param watermarkCallback The callback to be registered. + * @param keySerializer A serializer for the registered keys. + */ + public void setWatermarkCallback(OnWatermarkCallback<K> watermarkCallback, TypeSerializer<K> keySerializer) { + if (callback == null) { + this.keySerializer = keySerializer; + this.callback = watermarkCallback; + } else { + throw new RuntimeException("The watermark callback has already been initialized."); + } + } + + /** + * Registers a key with the service. This will lead to the {@link OnWatermarkCallback} + * being invoked for this key upon reception of each subsequent watermark. + * + * @param key The key to be registered. + */ + public boolean registerKeyForWatermarkCallback(K key) { + return getKeySetForKeyGroup(key).add(key); + } + + /** + * Unregisters the provided key from the service. + * + * @param key The key to be unregistered. 
+ */ + public boolean unregisterKeyFromWatermarkCallback(K key) { + Set<K> keys = getKeySetForKeyGroup(key); + boolean res = keys.remove(key); + + if (keys.isEmpty()) { + removeKeySetForKey(key); + } + return res; + } + + /** + * Invokes the registered callback for all the registered keys. + * + * @param watermark The watermark that triggered the invocation. + */ + public void invokeOnWatermarkCallback(Watermark watermark) throws IOException { + if (callback != null) { + for (Set<K> keySet : keysByKeygroup) { + if (keySet != null) { + for (K key : keySet) { + keyContext.setCurrentKey(key); + callback.onWatermark(key, watermark); + } + } + } + } + } + + /** + * Retrieve the set of keys for the key-group this key belongs to. + * + * @param key the key whose key-group we are searching. + * @return the set of registered keys for the key-group. + */ + private Set<K> getKeySetForKeyGroup(K key) { + checkArgument(localKeyGroupRange != null, "The operator has not been initialized."); + int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups); + return getKeySetForKeyGroup(keyGroupIdx); + } + + /** + * Retrieve the set of keys for the requested key-group. + * + * @param keyGroupIdx the index of the key group we are interested in. + * @return the set of keys for the key-group. + */ + private Set<K> getKeySetForKeyGroup(int keyGroupIdx) { + int localIdx = getIndexForKeyGroup(keyGroupIdx); + Set<K> keys = keysByKeygroup[localIdx]; + if (keys == null) { + keys = new HashSet<>(); + keysByKeygroup[localIdx] = keys; + } + return keys; + } + + private void removeKeySetForKey(K key) { + checkArgument(localKeyGroupRange != null, "The operator has not been initialized."); + int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups); + int localKeyGroupIdx = getIndexForKeyGroup(keyGroupIdx); + keysByKeygroup[localKeyGroupIdx] = null; + } + + /** + * Computes the index of the requested key-group in the local datastructures. 
+ * <li/> + * Currently we assume that each task is assigned a continuous range of key-groups, + * e.g. 1,2,3,4, and not 1,3,5. We leverage this to keep the different states + * key-grouped in arrays instead of maps, where the offset for each key-group is + * the key-group id (an int) minus the id of the first key-group in the local range. + * This is for performance reasons. + */ + private int getIndexForKeyGroup(int keyGroupIdx) { + checkArgument(localKeyGroupRange.contains(keyGroupIdx), + "Key Group " + keyGroupIdx + " does not belong to the local range."); + return keyGroupIdx - localKeyGroupRangeStartIdx; + } + + ////////////////// Fault Tolerance Methods /////////////////// + + public void snapshotKeysForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception { + Set<K> keySet = getKeySetForKeyGroup(keyGroupIdx); + if (keySet != null) { + stream.writeInt(keySet.size()); + + InstantiationUtil.serializeObject(stream, keySerializer); + for (K key : keySet) { + keySerializer.serialize(key, stream); + } + } else { + stream.writeInt(0); + } + } + + public void restoreKeysForKeyGroup(DataInputViewStreamWrapper stream, int keyGroupIdx, + ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { + + checkArgument(localKeyGroupRange.contains(keyGroupIdx), + "Key Group " + keyGroupIdx + " does not belong to the local range."); + + int numKeys = stream.readInt(); + if (numKeys > 0) { + + TypeSerializer<K> tmpKeyDeserializer = InstantiationUtil.deserializeObject(stream, userCodeClassLoader); + + if (keySerializer != null && !keySerializer.equals(tmpKeyDeserializer)) { + throw new IllegalArgumentException("Tried to restore timers " + + "for the same service with different serializers."); + } + + this.keySerializer = tmpKeyDeserializer; + + Set<K> keys = getKeySetForKeyGroup(keyGroupIdx); + for (int i = 0; i < numKeys; i++) { + keys.add(keySerializer.deserialize(stream)); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java new file mode 100644 index 0000000..bc317a9 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.watermark.Watermark; + +import java.io.IOException; + +/** + * A callback registered with the {@link InternalWatermarkCallbackService} service. This callback will + * be invoked for all keys registered with the service, upon reception of a watermark. + */ +@Internal +public interface OnWatermarkCallback<KEY> { + + /** + * The action to be triggered upon reception of a watermark. 
+ * + * @param key The current key. + * @param watermark The current watermark. + */ + void onWatermark(KEY key, Watermark watermark) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/daf0ccda/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 8507200..33def9e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -34,12 +34,14 @@ import static org.powermock.api.mockito.PowerMockito.spy; import static org.powermock.api.mockito.PowerMockito.when; import static org.powermock.api.mockito.PowerMockito.whenNew; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.RunnableFuture; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -55,12 +57,14 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import 
org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.internal.util.reflection.Whitebox; @@ -610,6 +614,208 @@ public class AbstractStreamOperatorTest { verify(futureKeyGroupStateHandle).cancel(anyBoolean()); } + @Test + public void testWatermarkCallbackServiceScalingUp() throws Exception { + final int MAX_PARALLELISM = 10; + + KeySelector<Tuple2<Integer, String>, Integer> keySelector = new TestKeySelector(); + + Tuple2<Integer, String> element1 = new Tuple2<>(7, "first"); + Tuple2<Integer, String> element2 = new Tuple2<>(10, "start"); + + int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(element1), MAX_PARALLELISM); + assertEquals(1, keygroup); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 2, keygroup)); + + keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(element2), MAX_PARALLELISM); + assertEquals(9, keygroup); + assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 2, keygroup)); + + // now we start the test, we go from parallelism 1 to 2. 
+ + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness1 = + getTestHarness(MAX_PARALLELISM, 1, 0); + testHarness1.open(); + + testHarness1.processElement(new StreamRecord<>(element1)); + testHarness1.processElement(new StreamRecord<>(element2)); + + assertEquals(0, testHarness1.getOutput().size()); + + // take a snapshot with some elements in internal sorting queue + OperatorStateHandles snapshot = testHarness1.snapshot(0, 0); + testHarness1.close(); + + // initialize two sub-tasks with the previously snapshotted state to simulate scaling up + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness2 = + getTestHarness(MAX_PARALLELISM, 2, 0); + + testHarness2.setup(); + testHarness2.initializeState(snapshot); + testHarness2.open(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness3 = + getTestHarness(MAX_PARALLELISM, 2, 1); + + testHarness3.setup(); + testHarness3.initializeState(snapshot); + testHarness3.open(); + + testHarness2.processWatermark(new Watermark(10)); + testHarness3.processWatermark(new Watermark(10)); + + assertEquals(2, testHarness2.getOutput().size()); + verifyElement(testHarness2.getOutput().poll(), 7); + verifyWatermark(testHarness2.getOutput().poll(), 10); + + assertEquals(2, testHarness3.getOutput().size()); + verifyElement(testHarness3.getOutput().poll(), 10); + verifyWatermark(testHarness3.getOutput().poll(), 10); + + testHarness1.close(); + testHarness2.close(); + testHarness3.close(); + } + + @Test + public void testWatermarkCallbackServiceScalingDown() throws Exception { + final int MAX_PARALLELISM = 10; + + KeySelector<Tuple2<Integer, String>, Integer> keySelector = new TestKeySelector(); + + Tuple2<Integer, String> element1 = new Tuple2<>(7, "first"); + Tuple2<Integer, String> element2 = new Tuple2<>(45, "start"); + Tuple2<Integer, String> element3 = new Tuple2<>(90, "start"); + Tuple2<Integer, String> 
element4 = new Tuple2<>(10, "start"); + + int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(element1), MAX_PARALLELISM); + assertEquals(1, keygroup); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 3, keygroup)); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 2, keygroup)); + + keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(element2), MAX_PARALLELISM); + assertEquals(6, keygroup); + assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 3, keygroup)); + assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 2, keygroup)); + + keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(element3), MAX_PARALLELISM); + assertEquals(2, keygroup); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 3, keygroup)); + assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 2, keygroup)); + + keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(element4), MAX_PARALLELISM); + assertEquals(9, keygroup); + assertEquals(2, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 3, keygroup)); + assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(MAX_PARALLELISM, 2, keygroup)); + + // starting the test, we will go from parallelism of 3 to parallelism of 2 + + // first operator + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness1 = + getTestHarness(MAX_PARALLELISM, 3, 0); + testHarness1.open(); + + // second operator + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness2 = + getTestHarness(MAX_PARALLELISM, 3, 1); + testHarness2.open(); + + // third operator + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness3 = + 
getTestHarness(MAX_PARALLELISM, 3, 2); + testHarness3.open(); + + testHarness1.processWatermark(Long.MIN_VALUE); + testHarness2.processWatermark(Long.MIN_VALUE); + testHarness3.processWatermark(Long.MIN_VALUE); + + testHarness1.processElement(new StreamRecord<>(element1)); + testHarness1.processElement(new StreamRecord<>(element3)); + + testHarness2.processElement(new StreamRecord<>(element2)); + testHarness3.processElement(new StreamRecord<>(element4)); + + // so far we only have the initial watermark + assertEquals(1, testHarness1.getOutput().size()); + verifyWatermark(testHarness1.getOutput().poll(), Long.MIN_VALUE); + + assertEquals(1, testHarness2.getOutput().size()); + verifyWatermark(testHarness2.getOutput().poll(), Long.MIN_VALUE); + + assertEquals(1, testHarness3.getOutput().size()); + verifyWatermark(testHarness3.getOutput().poll(), Long.MIN_VALUE); + + // we take a snapshot and make it look as a single operator + // this will be the initial state of all downstream tasks. + OperatorStateHandles snapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0), + testHarness3.snapshot(0, 0) + ); + + // first new operator + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness4 = + getTestHarness(MAX_PARALLELISM, 2, 0); + testHarness4.setup(); + testHarness4.initializeState(snapshot); + testHarness4.open(); + + // second new operator + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness5 = + getTestHarness(MAX_PARALLELISM, 2, 1); + testHarness5.setup(); + testHarness5.initializeState(snapshot); + testHarness5.open(); + + testHarness4.processWatermark(10); + testHarness5.processWatermark(10); + + assertEquals(3, testHarness4.getOutput().size()); + verifyElement(testHarness4.getOutput().poll(), 7); + verifyElement(testHarness4.getOutput().poll(), 90); + verifyWatermark(testHarness4.getOutput().poll(), 10); + + 
assertEquals(3, testHarness5.getOutput().size()); + verifyElement(testHarness5.getOutput().poll(), 45); + verifyElement(testHarness5.getOutput().poll(), 10); + verifyWatermark(testHarness5.getOutput().poll(), 10); + + testHarness1.close(); + testHarness2.close(); + testHarness3.close(); + testHarness4.close(); + testHarness5.close(); + } + + private KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> getTestHarness( + int maxParallelism, int noOfTasks, int taskIdx) throws Exception { + + return new KeyedOneInputStreamOperatorTestHarness<>( + new TestOperatorWithCallback(), + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + maxParallelism, + noOfTasks, /* num subtasks */ + taskIdx /* subtask index */); + } + + private void verifyWatermark(Object outputObject, long timestamp) { + Assert.assertTrue(outputObject instanceof Watermark); + assertEquals(timestamp, ((Watermark) outputObject).getTimestamp()); + } + + private void verifyElement(Object outputObject, int expected) { + Assert.assertTrue(outputObject instanceof StreamRecord); + + StreamRecord<?> resultRecord = (StreamRecord<?>) outputObject; + Assert.assertTrue(resultRecord.getValue() instanceof Integer); + + @SuppressWarnings("unchecked") + int actual = (Integer) resultRecord.getValue(); + assertEquals(expected, actual); + } + /** * Extracts the result values from the test harness and clears the output queue. 
*/ @@ -626,7 +832,6 @@ public class AbstractStreamOperatorTest { return result; } - private static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> { private static final long serialVersionUID = 1L; @@ -636,6 +841,33 @@ public class AbstractStreamOperatorTest { } } + private static class TestOperatorWithCallback + extends AbstractStreamOperator<Integer> + implements OneInputStreamOperator<Tuple2<Integer, String>, Integer> { + + private static final long serialVersionUID = 9215057823264582305L; + + @Override + public void open() throws Exception { + super.open(); + + InternalWatermarkCallbackService<Integer> callbackService = getInternalWatermarkCallbackService(); + + callbackService.setWatermarkCallback(new OnWatermarkCallback<Integer>() { + + @Override + public void onWatermark(Integer integer, Watermark watermark) throws IOException { + output.collect(new StreamRecord<>(integer)); + } + }, IntSerializer.INSTANCE); + } + + @Override + public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception { + getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(element.getValue().f0); + } + } + /** * Testing operator that can respond to commands by either setting/deleting state, emitting * state or setting timers.
