[
https://issues.apache.org/jira/browse/FLINK-5972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936701#comment-15936701
]
ASF GitHub Bot commented on FLINK-5972:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3587#discussion_r107474350
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
---
@@ -0,0 +1,2654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import static
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+/**
+ * These tests verify that {@link WindowOperator} correctly interacts with
the other windowing
+ * components: {@link WindowAssigner},
+ * {@link Trigger},
+ * {@link
org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window
state.
+ *
+ * <p>These tests document the implicit contract that exists between the
windowing components.
+ *
+ * <p><b>Important:</b> This test must always be kept up-to-date with
+ * {@link WindowOperatorContractTest}.
+ */
+public class EvictingWindowOperatorContractTest extends TestLogger {
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private static ValueStateDescriptor<String> valueStateDescriptor =
+ new ValueStateDescriptor<>("string-state",
StringSerializer.INSTANCE, null);
+
+ private static ListStateDescriptor<StreamRecord<Integer>>
intListDescriptor =
+ new ListStateDescriptor<>("int-list",
(TypeSerializer<StreamRecord<Integer>>) new
StreamElementSerializer(IntSerializer.INSTANCE));
+
+ static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT,
KEY, W> mockWindowFunction() throws Exception {
+ @SuppressWarnings("unchecked")
+ InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction =
mock(InternalWindowFunction.class);
+
+ return mockWindowFunction;
+ }
+
+ static <T, W extends Window> Trigger<T, W> mockTrigger() throws
Exception {
+ @SuppressWarnings("unchecked")
+ Trigger<T, W> mockTrigger = mock(Trigger.class);
+
+ when(mockTrigger.onElement(Matchers.<T>any(), anyLong(),
Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ when(mockTrigger.onEventTime(anyLong(), Matchers.<W>any(),
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ when(mockTrigger.onProcessingTime(anyLong(), Matchers.<W>any(),
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+
+ return mockTrigger;
+ }
+
+ static <T> WindowAssigner<T, TimeWindow> mockTimeWindowAssigner()
throws Exception {
+ @SuppressWarnings("unchecked")
+ WindowAssigner<T, TimeWindow> mockAssigner =
mock(WindowAssigner.class);
+
+
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
TimeWindow.Serializer());
+ when(mockAssigner.isEventTime()).thenReturn(true);
+
+ return mockAssigner;
+ }
+
+ static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner()
throws Exception {
+ @SuppressWarnings("unchecked")
+ WindowAssigner<T, GlobalWindow> mockAssigner =
mock(WindowAssigner.class);
+
+
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
GlobalWindow.Serializer());
+ when(mockAssigner.isEventTime()).thenReturn(true);
+ when(mockAssigner.assignWindows(Mockito.<T>any(), anyLong(),
anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get()));
+
+ return mockAssigner;
+ }
+
+
+ static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner()
throws Exception {
+ @SuppressWarnings("unchecked")
+ MergingWindowAssigner<T, TimeWindow> mockAssigner =
mock(MergingWindowAssigner.class);
+
+
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
TimeWindow.Serializer());
+ when(mockAssigner.isEventTime()).thenReturn(true);
+
+ return mockAssigner;
+ }
+
+
+ static WindowAssigner.WindowAssignerContext anyAssignerContext() {
+ return Mockito.any();
+ }
+
+ static Trigger.TriggerContext anyTriggerContext() {
+ return Mockito.any();
+ }
+
+ static <T> Collector<T> anyCollector() {
+ return Mockito.any();
+ }
+
+ static Iterable<Integer> anyIntIterable() {
+ return Mockito.any();
+ }
+
+ @SuppressWarnings("unchecked")
+ static Iterable<Integer> intIterable(Integer... values) {
+ return (Iterable<Integer>) argThat(containsInAnyOrder(values));
+ }
+
+ static TimeWindow anyTimeWindow() {
+ return Mockito.any();
+ }
+
+ static Trigger.OnMergeContext anyOnMergeContext() {
+ return Mockito.any();
+ }
+
+ static MergingWindowAssigner.MergeCallback anyMergeCallback() {
+ return Mockito.any();
+ }
+
+
+ static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T,
TimeWindow> mockTrigger, final long timestamp) throws Exception {
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock
invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger.TriggerContext context =
+ (Trigger.TriggerContext)
invocation.getArguments()[3];
+ context.registerEventTimeTimer(timestamp);
+ return TriggerResult.CONTINUE;
+ }
+ })
+ .when(mockTrigger).onElement(Matchers.<T>anyObject(),
anyLong(), anyTimeWindow(), anyTriggerContext());
+ }
+
+ private static <T> void shouldDeleteEventTimeTimerOnElement(Trigger<T,
TimeWindow> mockTrigger, final long timestamp) throws Exception {
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock
invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger.TriggerContext context =
+ (Trigger.TriggerContext)
invocation.getArguments()[3];
+ context.deleteEventTimeTimer(timestamp);
+ return TriggerResult.CONTINUE;
+ }
+ })
+ .when(mockTrigger).onElement(Matchers.<T>anyObject(),
anyLong(), anyTimeWindow(), anyTriggerContext());
+ }
+
+ private static <T> void
shouldRegisterProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger,
final long timestamp) throws Exception {
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock
invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger.TriggerContext context =
+ (Trigger.TriggerContext)
invocation.getArguments()[3];
+ context.registerProcessingTimeTimer(timestamp);
+ return TriggerResult.CONTINUE;
+ }
+ })
+
.when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(),
anyTimeWindow(), anyTriggerContext());
+ }
+
+ private static <T> void
shouldDeleteProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger,
final long timestamp) throws Exception {
+ doAnswer(new Answer<TriggerResult>() {
+ @Override
+ public TriggerResult answer(InvocationOnMock
invocation) throws Exception {
+ @SuppressWarnings("unchecked")
+ Trigger.TriggerContext context =
+ (Trigger.TriggerContext)
invocation.getArguments()[3];
+ context.deleteProcessingTimeTimer(timestamp);
+ return TriggerResult.CONTINUE;
+ }
+ })
+
.when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(),
anyTimeWindow(), anyTriggerContext());
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T, W extends Window> void shouldMergeWindows(final
MergingWindowAssigner<T, W> assigner, final Collection<? extends W>
expectedWindows, final Collection<W> toMerge, final W mergeResult) {
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation)
throws Exception {
+ Collection<W> windows = (Collection<W>)
invocation.getArguments()[0];
+
+ MergingWindowAssigner.MergeCallback callback =
(MergingWindowAssigner.MergeCallback) invocation.getArguments()[1];
+
+ // verify the expected windows
+ assertThat(windows,
containsInAnyOrder(expectedWindows.toArray()));
+
+ callback.merge(toMerge, mergeResult);
+ return null;
+ }
+ })
+ .when(assigner).mergeWindows(anyCollection(),
Matchers.<MergingWindowAssigner.MergeCallback>anyObject());
+ }
+
+ private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow>
mockTrigger) throws Exception {
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ }
+
+ private static <T> void shouldFireOnElement(Trigger<T, TimeWindow>
mockTrigger) throws Exception {
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ }
+
+ private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow>
mockTrigger) throws Exception {
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ }
+
+ private static <T> void shouldFireAndPurgeOnElement(Trigger<T,
TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ }
+
+ private static <T> void shouldContinueOnEventTime(Trigger<T,
TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onEventTime(anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ }
+
+ private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow>
mockTrigger) throws Exception {
+ when(mockTrigger.onEventTime(anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ }
+
+ private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow>
mockTrigger) throws Exception {
+ when(mockTrigger.onEventTime(anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ }
+
+ private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T,
TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onEventTime(anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ }
+
+ private static <T> void shouldContinueOnProcessingTime(Trigger<T,
TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onProcessingTime(anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+ }
+
+ private static <T> void shouldFireOnProcessingTime(Trigger<T,
TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onProcessingTime(anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+ }
+
+ private static <T> void shouldPurgeOnProcessingTime(Trigger<T,
TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onProcessingTime(anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+ }
+
+ private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T,
TimeWindow> mockTrigger) throws Exception {
+ when(mockTrigger.onProcessingTime(anyLong(),
Matchers.<TimeWindow>any(),
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+ }
+
+ /**
+ * Verify that there is no late-data side output if the {@code
WindowAssigner} does
+ * not assign any windows.
+ */
+ @Test
+ public void testNoLateSideOutputForSkippedWindows() throws Exception {
+
+ OutputTag<Integer> lateOutputTag = new
OutputTag<Integer>("late"){};
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner =
mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer,
TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger,
0L, intListDescriptor, mockWindowFunction, lateOutputTag);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(),
anyAssignerContext()))
+
.thenReturn(Collections.<TimeWindow>emptyList());
+
+ testHarness.processWatermark(0);
+ testHarness.processElement(new StreamRecord<>(0, 5L));
+
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L),
anyAssignerContext());
+
+ assertTrue(testHarness.getSideOutput(lateOutputTag) == null ||
testHarness.getSideOutput(lateOutputTag).isEmpty());
+ }
+
+ @Test
+ public void testLateSideOutput() throws Exception {
+
+ OutputTag<Integer> lateOutputTag = new
OutputTag<Integer>("late"){};
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner =
mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer,
TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger,
0L, intListDescriptor, mockWindowFunction, lateOutputTag);
+
+ testHarness.open();
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(),
anyAssignerContext()))
+ .thenReturn(Collections.singletonList(new
TimeWindow(0, 0)));
+
+ testHarness.processWatermark(20);
+ testHarness.processElement(new StreamRecord<>(0, 5L));
+
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L),
anyAssignerContext());
+
+ assertThat(testHarness.getSideOutput(lateOutputTag),
+ containsInAnyOrder(isStreamRecord(0, 5L)));
+
+ // we should also see side output if the WindowAssigner assigns
no windows
+ when(mockAssigner.assignWindows(anyInt(), anyLong(),
anyAssignerContext()))
+
.thenReturn(Collections.<TimeWindow>emptyList());
+
+ testHarness.processElement(new StreamRecord<>(0, 10L));
+
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L),
anyAssignerContext());
+ verify(mockAssigner, times(1)).assignWindows(eq(0), eq(10L),
anyAssignerContext());
+
+ assertThat(testHarness.getSideOutput(lateOutputTag),
+ containsInAnyOrder(isStreamRecord(0, 5L),
isStreamRecord(0, 10L)));
--- End diff --
Fixing
> Don't allow shrinking merging windows
> -------------------------------------
>
> Key: FLINK-5972
> URL: https://issues.apache.org/jira/browse/FLINK-5972
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.1.0, 1.2.0, 1.3.0
> Reporter: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> A misbehaving {{MergingWindowAssigner}} can cause a merge that results in a
> window that is smaller than the span of all the merged windows. This, in
> itself, is not problematic. It becomes problematic when the end timestamp of a
> window that was not late before merging is now earlier than the watermark
> (the timestamp is smaller than the watermark).
> There are two choices:
> - immediately process the window
> - drop the window
> Processing the window will lead to late data downstream.
> The current behaviour is to silently drop the window but that logic has a
> bug: we only remove the dropped window from the {{MergingWindowSet}} but we
> don't properly clean up state and timers that the window still (possibly)
> has. We should fix this bug in the process of resolving this issue.
> We should either just fix the bug and still silently drop windows or add a
> check and throw an exception when the end timestamp falls below the watermark.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)