Repository: flink Updated Branches: refs/heads/master e33243729 -> 45762162f
[FLINK-4862] fix Timer register in ContinuousEventTimeTrigger This closes #2671 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45762162 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45762162 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45762162 Branch: refs/heads/master Commit: 45762162fe7c23fd921db1e0f826b2906bfe1dcd Parents: e332437 Author: manuzhang <[email protected]> Authored: Thu Oct 20 15:06:01 2016 +0800 Committer: Maximilian Michels <[email protected]> Committed: Mon Oct 24 15:49:18 2016 +0200 ---------------------------------------------------------------------- .../triggers/ContinuousEventTimeTrigger.java | 14 ++-- .../operators/windowing/WindowOperatorTest.java | 71 ++++++++++++++++++++ 2 files changed, 81 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/45762162/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 46080ff..fa5bb2f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -88,9 +88,11 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } } @Override @@ -99,8 +101,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object } @Override - public TriggerResult onMerge(W window, OnMergeContext ctx) { + public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); + Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get(); + if (nextFireTimestamp != null) { + ctx.registerEventTimeTimer(nextFireTimestamp); + } return TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/45762162/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index b8a764e..9e50aaa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -569,6 +569,77 @@ public class WindowOperatorTest extends TestLogger { testHarness.close(); } + /** + * This tests whether merging works correctly with the ContinuousEventTimeTrigger. + * @throws Exception + */ + @Test + @SuppressWarnings("unchecked") + public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception { + closeCalled.set(0); + + final int SESSION_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + ContinuousEventTimeTrigger.of(Time.seconds(2)), + 0); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + // add elements out-of-order and first trigger time is 2000 + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 1500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000)); + + // triggers emit and next trigger time is 4000 + testHarness.processWatermark(new Watermark(2500)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-1", 1500L, 4500L), 4499)); + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499)); + expectedOutput.add(new Watermark(2500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 4000)); + testHarness.processWatermark(new Watermark(3000)); + expectedOutput.add(new Watermark(3000)); + + // do a snapshot, close and restore again + StreamStateHandle snapshot = testHarness.snapshotLegacy(0L, 0L); + testHarness.close(); + testHarness.setup(); + testHarness.restore(snapshot); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 4000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500)); + // triggers emit and next trigger time is 6000 + testHarness.processWatermark(new Watermark(4000)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 1500L, 7000L), 6999)); + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-15", 0L, 7000L), 6999)); + expectedOutput.add(new Watermark(4000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + testHarness.close(); + } + @Test public void testMergeAndEvictor() throws Exception { // verify that merging WindowAssigner and Evictor cannot be used together
