Repository: flink Updated Branches: refs/heads/release-1.1 5731672e5 -> 05a5f460b
[FLINK-4862] fix Timer register in ContinuousEventTimeTrigger Backported to release-1.1. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05a5f460 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05a5f460 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05a5f460 Branch: refs/heads/release-1.1 Commit: 05a5f460b33828cc8a1e6a45d37b555facc7133f Parents: 5731672 Author: manuzhang <[email protected]> Authored: Thu Oct 20 15:06:01 2016 +0800 Committer: Maximilian Michels <[email protected]> Committed: Mon Oct 24 15:54:16 2016 +0200 ---------------------------------------------------------------------- .../triggers/ContinuousEventTimeTrigger.java | 14 ++-- .../operators/windowing/WindowOperatorTest.java | 76 ++++++++++++++++++++ 2 files changed, 86 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05a5f460/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 02613f6..cb8cdf5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -89,9 +89,11 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } } @Override @@ -100,8 +102,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object } @Override - public TriggerResult onMerge(W window, OnMergeContext ctx) { + public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); + Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get(); + if (nextFireTimestamp != null) { + ctx.registerEventTimeTimer(nextFireTimestamp); + } return TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/05a5f460/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 62266c4..dfa353c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -595,6 +596,81 @@ public class WindowOperatorTest { testHarness.close(); } + /** + * This tests whether merging works correctly with the ContinuousEventTimeTrigger. + * @throws Exception + */ + @Test + @SuppressWarnings("unchecked") + public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception { + closeCalled.set(0); + + final int SESSION_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + ContinuousEventTimeTrigger.of(Time.seconds(2)), + 0); + + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + // add elements out-of-order and first trigger time is 2000 + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 1500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000)); + + // triggers emit and next trigger time is 4000 + testHarness.processWatermark(new Watermark(2500)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-1", 1500L, 4500L), 4499)); + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499)); + expectedOutput.add(new Watermark(2500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 4000)); + testHarness.processWatermark(new Watermark(3000)); + expectedOutput.add(new Watermark(3000)); + + // do a snapshot, close and restore again + StreamTaskState snapshot = testHarness.snapshot(0L, 0L); + testHarness.close(); + testHarness.setup(); + testHarness.restore(snapshot, 0); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 4000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500)); + // triggers emit and next trigger time is 6000 + testHarness.processWatermark(new Watermark(4000)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 1500L, 7000L), 6999)); + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-15", 0L, 7000L), 6999)); + expectedOutput.add(new Watermark(4000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + testHarness.close(); + } + @Test public void testMergeAndEvictor() throws Exception { // verify that merging WindowAssigner and Evictor cannot be used together
