Repository: flink
Updated Branches:
  refs/heads/release-1.5 f84a1644f -> a809a2650


[FLINK-9201] Add trigger tests for late-window merging

This closes #5917


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a809a265
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a809a265
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a809a265

Branch: refs/heads/release-1.5
Commit: a809a2650b44c00e4c25058fce439b1051033cf3
Parents: 038eb1d
Author: Aljoscha Krettek <[email protected]>
Authored: Mon May 14 10:08:11 2018 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon May 14 10:10:42 2018 +0200

----------------------------------------------------------------------
 .../windowing/EventTimeTriggerTest.java         | 39 ++++++++++++++++++++
 .../windowing/ProcessingTimeTriggerTest.java    | 38 +++++++++++++++++++
 2 files changed, 77 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a809a265/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
index f54367b..eb14561 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
@@ -153,4 +153,43 @@ public class EventTimeTriggerTest {
                assertEquals(0, testHarness.numProcessingTimeTimers());
                assertEquals(0, testHarness.numEventTimeTimers());
        }
+
+       /**
+        * Merging a late window should not register a timer, otherwise we would get two firings:
+        * one from onElement() on the merged window and one from the timer.
+        */
+       @Test
+       public void testMergingLateWindows() throws Exception {
+
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                       new TriggerTestHarness<>(EventTimeTrigger.create(), new 
TimeWindow.Serializer());
+
+               assertTrue(EventTimeTrigger.create().canMerge());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(2, testHarness.numEventTimeTimers());
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.advanceWatermark(10);
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.mergeWindows(new TimeWindow(0, 4), 
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(2, 4)));
+               assertEquals(0, testHarness.numEventTimeTimers(new 
TimeWindow(0, 4)));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a809a265/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
index 7e78854..7336c88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
@@ -134,4 +134,42 @@ public class ProcessingTimeTriggerTest {
                assertEquals(0, testHarness.numProcessingTimeTimers());
                assertEquals(0, testHarness.numEventTimeTimers());
        }
+
+       /**
+        * Merging a late window should not register a timer, otherwise we would get two firings:
+        * one from onElement() on the merged window and one from the timer.
+        */
+       @Test
+       public void testMergingLateWindows() throws Exception {
+               TriggerTestHarness<Object, TimeWindow> testHarness =
+                       new 
TriggerTestHarness<>(ProcessingTimeTrigger.create(), new 
TimeWindow.Serializer());
+
+               assertTrue(ProcessingTimeTrigger.create().canMerge());
+
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+               assertEquals(TriggerResult.CONTINUE, 
testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(2, testHarness.numProcessingTimeTimers());
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(1, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.advanceProcessingTime(10);
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+
+               testHarness.mergeWindows(new TimeWindow(0, 4), 
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+               assertEquals(0, testHarness.numStateEntries());
+               assertEquals(0, testHarness.numEventTimeTimers());
+               assertEquals(0, testHarness.numProcessingTimeTimers());
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 2)));
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(2, 4)));
+               assertEquals(0, testHarness.numProcessingTimeTimers(new 
TimeWindow(0, 4)));
+       }
 }

Reply via email to