[FLINK-5972] Don't allow shrinking merging windows

This closes #3587.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68289b1a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68289b1a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68289b1a Branch: refs/heads/table-retraction Commit: 68289b1a52db7157d23085850ec947e78e729f01 Parents: 25d52e4 Author: Aljoscha Krettek <[email protected]> Authored: Tue Mar 21 14:58:45 2017 +0100 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Mar 23 23:29:02 2017 +0800 ---------------------------------------------------------------------- .../windowing/EvictingWindowOperator.java | 13 ++++ .../operators/windowing/WindowOperator.java | 15 +++- .../windowing/WindowOperatorContractTest.java | 80 ++++++++++++++++++++ 3 files changed, 107 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 951f661..24c8d32 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -121,6 +121,19 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception { + + if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= 
internalTimerService.currentWatermark())) { + throw new UnsupportedOperationException("The end timestamp of an " + + "event-time window cannot become earlier than the current watermark " + + "by merging. Current watermark: " + internalTimerService.currentWatermark() + + " window: " + mergeResult); + } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) { + throw new UnsupportedOperationException("The end timestamp of a " + + "processing-time window cannot become earlier than the current processing time " + + "by merging. Current processing time: " + internalTimerService.currentProcessingTime() + + " window: " + mergeResult); + } + context.key = key; context.window = mergeResult; http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index b4283d8..3745659 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -131,7 +131,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * {@code window.maxTimestamp + allowedLateness} landmark. * </ul> */ - private final long allowedLateness; + protected final long allowedLateness; /** * {@link OutputTag} to use for late arriving events. 
Elements for which @@ -352,6 +352,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception { + + if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) { + throw new UnsupportedOperationException("The end timestamp of an " + + "event-time window cannot become earlier than the current watermark " + + "by merging. Current watermark: " + internalTimerService.currentWatermark() + + " window: " + mergeResult); + } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) { + throw new UnsupportedOperationException("The end timestamp of a " + + "processing-time window cannot become earlier than the current processing time " + + "by merging. Current processing time: " + internalTimerService.currentProcessingTime() + + " window: " + mergeResult); + } + context.key = key; context.window = mergeResult; http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index aaea8b1..8aae46a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -1479,6 +1479,86 @@ public abstract class WindowOperatorContractTest extends TestLogger { } @Test + public void 
testRejectShrinkingMergingEventTimeWindows() throws Exception { + testRejectShrinkingMergingWindows(new EventTimeAdaptor()); + } + + @Test + public void testRejectShrinkingMergingProcessingTimeWindows() throws Exception { + testRejectShrinkingMergingWindows(new ProcessingTimeAdaptor()); + } + + /** + * A misbehaving {@code WindowAssigner} can cause a window to become late by merging if + * it moves the end-of-window time before the watermark. This verifies that we don't allow that. + */ + void testRejectShrinkingMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception { + int allowedLateness = 10; + + if (timeAdaptor instanceof ProcessingTimeAdaptor) { + // we don't have allowed lateness for processing time + allowedLateness = 0; + } + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, 0); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 22))); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set + assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 25))); + + timeAdaptor.advanceTime(testHarness, 20); + + // our window should still be 
there + assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set + assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer + + // the result timestamp is ... + 2 because a watermark t says no element with + // timestamp <= t will come in the future and because window ends are exclusive: + // a window (0, 12) will have 11 as maxTimestamp. With the watermark at 20, 10 would + // already be considered late + shouldMergeWindows( + mockAssigner, + new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))), + new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))), + new TimeWindow(0, 20 - allowedLateness + 2)); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // now merge it to a window that is just late + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 25))); + + shouldMergeWindows( + mockAssigner, + new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))), + new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))), + new TimeWindow(0, 20 - allowedLateness + 1)); + + expectedException.expect(UnsupportedOperationException.class); + testHarness.processElement(new StreamRecord<>(0, 0L)); + } + + @Test public void testMergingOfExistingEventTimeWindows() throws Exception { testMergingOfExistingWindows(new EventTimeAdaptor()); }
