[FLINK-5972] Don't allow shrinking merging windows

This closes #3587.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68289b1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68289b1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68289b1a

Branch: refs/heads/table-retraction
Commit: 68289b1a52db7157d23085850ec947e78e729f01
Parents: 25d52e4
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Mar 21 14:58:45 2017 +0100
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Mar 23 23:29:02 2017 +0800

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       | 13 ++++
 .../operators/windowing/WindowOperator.java     | 15 +++-
 .../windowing/WindowOperatorContractTest.java   | 80 ++++++++++++++++++++
 3 files changed, 107 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 951f661..24c8d32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -121,6 +121,19 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
                                                        public void merge(W 
mergeResult,
                                                                        
Collection<W> mergedWindows, W stateWindowResult,
                                                                        
Collection<W> mergedStateWindows) throws Exception {
+
+                                                               if 
((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness 
<= internalTimerService.currentWatermark())) {
+                                                                       throw 
new UnsupportedOperationException("The end timestamp of an " +
+                                                                               
        "event-time window cannot become earlier than the current watermark " +
+                                                                               
        "by merging. Current watermark: " + 
internalTimerService.currentWatermark() +
+                                                                               
        " window: " + mergeResult);
+                                                               } else if 
(!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= 
internalTimerService.currentProcessingTime()) {
+                                                                       throw 
new UnsupportedOperationException("The end timestamp of a " +
+                                                                               
        "processing-time window cannot become earlier than the current 
processing time " +
+                                                                               
        "by merging. Current processing time: " + 
internalTimerService.currentProcessingTime() +
+                                                                               
        " window: " + mergeResult);
+                                                               }
+
                                                                context.key = 
key;
                                                                context.window 
= mergeResult;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b4283d8..3745659 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -131,7 +131,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
         *         {@code window.maxTimestamp + allowedLateness} landmark.
         * </ul>
         */
-       private final long allowedLateness;
+       protected final long allowedLateness;
 
        /**
         * {@link OutputTag} to use for late arriving events. Elements for which
@@ -352,6 +352,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
                                        public void merge(W mergeResult,
                                                        Collection<W> 
mergedWindows, W stateWindowResult,
                                                        Collection<W> 
mergedStateWindows) throws Exception {
+
+                                               if 
((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness 
<= internalTimerService.currentWatermark())) {
+                                                       throw new 
UnsupportedOperationException("The end timestamp of an " +
+                                                                       
"event-time window cannot become earlier than the current watermark " +
+                                                                       "by 
merging. Current watermark: " + internalTimerService.currentWatermark() +
+                                                                       " 
window: " + mergeResult);
+                                               } else if 
(!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= 
internalTimerService.currentProcessingTime()) {
+                                                       throw new 
UnsupportedOperationException("The end timestamp of a " +
+                                                                       
"processing-time window cannot become earlier than the current processing time 
" +
+                                                                       "by 
merging. Current processing time: " + 
internalTimerService.currentProcessingTime() +
+                                                                       " 
window: " + mergeResult);
+                                               }
+
                                                context.key = key;
                                                context.window = mergeResult;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index aaea8b1..8aae46a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1479,6 +1479,86 @@ public abstract class WindowOperatorContractTest extends TestLogger {
        }
 
        @Test
+       public void testRejectShrinkingMergingEventTimeWindows() throws 
Exception {
+               testRejectShrinkingMergingWindows(new EventTimeAdaptor());
+       }
+
+       @Test
+       public void testRejectShrinkingMergingProcessingTimeWindows() throws 
Exception {
+               testRejectShrinkingMergingWindows(new ProcessingTimeAdaptor());
+       }
+
+       /**
+        * A misbehaving {@code WindowAssigner} can cause a window to become 
late by merging if
+        * it moves the end-of-window time before the watermark. This verifies 
that we don't allow that.
+        */
+       void testRejectShrinkingMergingWindows(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
+               int allowedLateness = 10;
+
+               if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+                       // we don't have allowed lateness for processing time
+                       allowedLateness = 0;
+               }
+
+               MergingWindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
+               timeAdaptor.setIsEventTime(mockAssigner);
+               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+               InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
+                               createWindowOperator(mockAssigner, mockTrigger, 
allowedLateness, mockWindowFunction);
+
+               testHarness.open();
+
+               timeAdaptor.advanceTime(testHarness, 0);
+
+               assertEquals(0, 
testHarness.extractOutputStreamRecords().size());
+               assertEquals(0, testHarness.numKeyedStateEntries());
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(0, 
22)));
+
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+
+               assertEquals(2, testHarness.numKeyedStateEntries()); // window 
contents and merging window set
+               assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup 
timer
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(0, 
25)));
+
+               timeAdaptor.advanceTime(testHarness, 20);
+
+               // our window should still be there
+               assertEquals(2, testHarness.numKeyedStateEntries()); // window 
contents and merging window set
+               assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup 
timer
+
+               // the result timestamp is ... + 2 because a watermark t says 
no element with
+               // timestamp <= t will come in the future and because window 
ends are exclusive:
+               // a window (0, 12) will have 11 as maxTimestamp. With the 
watermark at 20, 10 would
+               // already be considered late
+               shouldMergeWindows(
+                               mockAssigner,
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
22), new TimeWindow(0, 25))),
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
22), new TimeWindow(0, 25))),
+                               new TimeWindow(0, 20 - allowedLateness + 2));
+
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+
+               // now merge it to a window that is just late
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(0, 
25)));
+
+               shouldMergeWindows(
+                               mockAssigner,
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
20 - allowedLateness + 2), new TimeWindow(0, 25))),
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
20 - allowedLateness + 2), new TimeWindow(0, 25))),
+                               new TimeWindow(0, 20 - allowedLateness + 1));
+
+               expectedException.expect(UnsupportedOperationException.class);
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+       }
+
+       @Test
        public void testMergingOfExistingEventTimeWindows() throws Exception {
                testMergingOfExistingWindows(new EventTimeAdaptor());
        }

Reply via email to