[FLINK-5972] Don't allow shrinking merging windows

This closes #3587.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9988343
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9988343
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9988343

Branch: refs/heads/release-1.2
Commit: b99883430decef11f6893d9db8f77bc98458979b
Parents: 785ae63
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Mar 21 14:58:45 2017 +0100
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Mar 24 12:25:03 2017 +0800

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       | 13 ++++
 .../operators/windowing/WindowOperator.java     | 13 ++++
 .../windowing/WindowOperatorContractTest.java   | 80 ++++++++++++++++++++
 3 files changed, 106 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9988343/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 2d28c00..b0d36c2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -104,6 +104,19 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                                        public void merge(W 
mergeResult,
                                                                        
Collection<W> mergedWindows, W stateWindowResult,
                                                                        
Collection<W> mergedStateWindows) throws Exception {
+
+                                                               if 
((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness 
<= internalTimerService.currentWatermark())) {
+                                                                       throw 
new UnsupportedOperationException("The end timestamp of an " +
+                                                                               
        "event-time window cannot become earlier than the current watermark " +
+                                                                               
        "by merging. Current watermark: " + 
internalTimerService.currentWatermark() +
+                                                                               
        " window: " + mergeResult);
+                                                               } else if 
(!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= 
internalTimerService.currentProcessingTime()) {
+                                                                       throw 
new UnsupportedOperationException("The end timestamp of a " +
+                                                                               
        "processing-time window cannot become earlier than the current 
processing time " +
+                                                                               
        "by merging. Current processing time: " + 
internalTimerService.currentProcessingTime() +
+                                                                               
        " window: " + mergeResult);
+                                                               }
+
                                                                context.key = 
key;
                                                                context.window 
= mergeResult;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9988343/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3144b6d..d2e819f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -303,6 +303,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                        public void merge(W mergeResult,
                                                        Collection<W> 
mergedWindows, W stateWindowResult,
                                                        Collection<W> 
mergedStateWindows) throws Exception {
+
+                                               if 
((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness 
<= internalTimerService.currentWatermark())) {
+                                                       throw new 
UnsupportedOperationException("The end timestamp of an " +
+                                                                       
"event-time window cannot become earlier than the current watermark " +
+                                                                       "by 
merging. Current watermark: " + internalTimerService.currentWatermark() +
+                                                                       " 
window: " + mergeResult);
+                                               } else if 
(!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= 
internalTimerService.currentProcessingTime()) {
+                                                       throw new 
UnsupportedOperationException("The end timestamp of a " +
+                                                                       
"processing-time window cannot become earlier than the current processing time 
" +
+                                                                       "by 
merging. Current processing time: " + 
internalTimerService.currentProcessingTime() +
+                                                                       " 
window: " + mergeResult);
+                                               }
+
                                                context.key = key;
                                                context.window = mergeResult;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9988343/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index a206455..aee3133 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1409,6 +1409,86 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
        }
 
        @Test
+       public void testRejectShrinkingMergingEventTimeWindows() throws 
Exception {
+               testRejectShrinkingMergingWindows(new EventTimeAdaptor());
+       }
+
+       @Test
+       public void testRejectShrinkingMergingProcessingTimeWindows() throws 
Exception {
+               testRejectShrinkingMergingWindows(new ProcessingTimeAdaptor());
+       }
+
+       /**
+        * A misbehaving {@code WindowAssigner} can cause a window to become 
late by merging if
+        * it moves the end-of-window time before the watermark. This verifies 
that we don't allow that.
+        */
+       void testRejectShrinkingMergingWindows(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
+               int allowedLateness = 10;
+
+               if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+                       // we don't have allowed lateness for processing time
+                       allowedLateness = 0;
+               }
+
+               MergingWindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
+               timeAdaptor.setIsEventTime(mockAssigner);
+               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+               InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
+                               createWindowOperator(mockAssigner, mockTrigger, 
allowedLateness, mockWindowFunction);
+
+               testHarness.open();
+
+               timeAdaptor.advanceTime(testHarness, 0);
+
+               assertEquals(0, 
testHarness.extractOutputStreamRecords().size());
+               assertEquals(0, testHarness.numKeyedStateEntries());
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(0, 
22)));
+
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+
+               assertEquals(2, testHarness.numKeyedStateEntries()); // window 
contents and merging window set
+               assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup 
timer
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(0, 
25)));
+
+               timeAdaptor.advanceTime(testHarness, 20);
+
+               // our window should still be there
+               assertEquals(2, testHarness.numKeyedStateEntries()); // window 
contents and merging window set
+               assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup 
timer
+
+               // the merge result ends at "... + 2" because a merge is rejected once
+               // maxTimestamp + allowedLateness <= watermark and window ends are exclusive:
+               // with allowedLateness 10, (0, 12) has maxTimestamp 11 and is still valid at
+               // watermark 20, whereas (0, 11), with maxTimestamp 10, would already be late
+               shouldMergeWindows(
+                               mockAssigner,
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
22), new TimeWindow(0, 25))),
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
22), new TimeWindow(0, 25))),
+                               new TimeWindow(0, 20 - allowedLateness + 2));
+
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+
+               // now merge it to a window that is just late
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(0, 
25)));
+
+               shouldMergeWindows(
+                               mockAssigner,
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
20 - allowedLateness + 2), new TimeWindow(0, 25))),
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
20 - allowedLateness + 2), new TimeWindow(0, 25))),
+                               new TimeWindow(0, 20 - allowedLateness + 1));
+
+               expectedException.expect(UnsupportedOperationException.class);
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+       }
+
+       @Test
        public void testMergingOfExistingEventTimeWindows() throws Exception {
                testMergingOfExistingWindows(new EventTimeAdaptor());
        }

Reply via email to