[FLINK-9201] Add trigger tests for late-window merging. This closes #5917.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64f32f92 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64f32f92 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64f32f92 Branch: refs/heads/master Commit: 64f32f9299bfdb7e7b1fcd759618c37638a6a5b7 Parents: 3afb7b3 Author: Aljoscha Krettek <[email protected]> Authored: Mon May 14 10:08:11 2018 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon May 14 10:09:56 2018 +0200 ---------------------------------------------------------------------- .../windowing/EventTimeTriggerTest.java | 39 ++++++++++++++++++++ .../windowing/ProcessingTimeTriggerTest.java | 38 +++++++++++++++++++ 2 files changed, 77 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/64f32f92/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java index f54367b..eb14561 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java @@ -153,4 +153,43 @@ public class EventTimeTriggerTest { assertEquals(0, testHarness.numProcessingTimeTimers()); assertEquals(0, testHarness.numEventTimeTimers()); } + + /** + * Merging a late window should not register a timer, otherwise we would get two firings: + * one from onElement() on the merged window and one from the timer. 
+ */ + @Test + public void testMergingLateWindows() throws Exception { + + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer()); + + assertTrue(EventTimeTrigger.create().canMerge()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.advanceWatermark(10); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 4))); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/64f32f92/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java index 7e78854..7336c88 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java @@ -134,4 +134,42 @@ public class ProcessingTimeTriggerTest { assertEquals(0, testHarness.numProcessingTimeTimers()); assertEquals(0, testHarness.numEventTimeTimers()); } + + /** + * Merging a late window should not register a timer, otherwise we would get two firings: + * one from onElement() on the merged window and one from the timer. + */ + @Test + public void testMergingLateWindows() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer()); + + assertTrue(ProcessingTimeTrigger.create().canMerge()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(2, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + testHarness.advanceProcessingTime(10); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + 
assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 4))); + } }
