Repository: flink Updated Branches: refs/heads/release-1.5 f84a1644f -> a809a2650
[FLINK-9201] Add trigger tests for late-window merging This closes #5917 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a809a265 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a809a265 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a809a265 Branch: refs/heads/release-1.5 Commit: a809a2650b44c00e4c25058fce439b1051033cf3 Parents: 038eb1d Author: Aljoscha Krettek <[email protected]> Authored: Mon May 14 10:08:11 2018 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon May 14 10:10:42 2018 +0200 ---------------------------------------------------------------------- .../windowing/EventTimeTriggerTest.java | 39 ++++++++++++++++++++ .../windowing/ProcessingTimeTriggerTest.java | 38 +++++++++++++++++++ 2 files changed, 77 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a809a265/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java index f54367b..eb14561 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java @@ -153,4 +153,43 @@ public class EventTimeTriggerTest { assertEquals(0, testHarness.numProcessingTimeTimers()); assertEquals(0, testHarness.numEventTimeTimers()); } + + /** + * Merging a late window should not register a timer, otherwise we would get two firings: + * one from onElement() on the merged window and one from the timer. + */ + @Test + public void testMergingLateWindows() throws Exception { + + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer()); + + assertTrue(EventTimeTrigger.create().canMerge()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.advanceWatermark(10); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 4))); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a809a265/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java index 7e78854..7336c88 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java @@ -134,4 +134,42 @@ public class ProcessingTimeTriggerTest { assertEquals(0, testHarness.numProcessingTimeTimers()); assertEquals(0, testHarness.numEventTimeTimers()); } + + /** + * Merging a late window should not register a timer, otherwise we would get two firings: + * one from onElement() on the merged window and one from the timer. + */ + @Test + public void testMergingLateWindows() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer()); + + assertTrue(ProcessingTimeTrigger.create().canMerge()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(2, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + testHarness.advanceProcessingTime(10); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 4))); + } }
