[ 
https://issues.apache.org/jira/browse/FLINK-5972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936701#comment-15936701
 ] 

ASF GitHub Bot commented on FLINK-5972:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3587#discussion_r107474350
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
 ---
    @@ -0,0 +1,2654 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators.windowing;
    +
    +
    +import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.*;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Mockito.*;
    +
    +import com.google.common.collect.Lists;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.state.AppendingState;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
    +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +import org.apache.flink.streaming.api.windowing.windows.Window;
    +import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
    +import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
    +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
    +import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.OutputTag;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.ExpectedException;
    +import org.mockito.Matchers;
    +import org.mockito.Mockito;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.mockito.verification.VerificationMode;
    +
    +/**
    + * These tests verify that {@link WindowOperator} correctly interacts with 
the other windowing
    + * components: {@link WindowAssigner},
    + * {@link Trigger},
    + * {@link 
org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window 
state.
    + *
    + * <p>These tests document the implicit contract that exists between the 
windowing components.
    + *
    + * <p><b>Important:</b>This test must always be kept up-to-date with
    + * {@link WindowOperatorContractTest}.
    + */
    +public class EvictingWindowOperatorContractTest extends TestLogger {
    +
    +   @Rule
    +   public ExpectedException expectedException = ExpectedException.none();
    +
    +   private static ValueStateDescriptor<String> valueStateDescriptor =
    +                   new ValueStateDescriptor<>("string-state", 
StringSerializer.INSTANCE, null);
    +
    +   private static ListStateDescriptor<StreamRecord<Integer>> 
intListDescriptor =
    +                   new ListStateDescriptor<>("int-list", 
(TypeSerializer<StreamRecord<Integer>>) new 
StreamElementSerializer(IntSerializer.INSTANCE));
    +
    +   static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, 
KEY, W> mockWindowFunction() throws Exception {
    +           @SuppressWarnings("unchecked")
    +           InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = 
mock(InternalWindowFunction.class);
    +
    +           return mockWindowFunction;
    +   }
    +
    +   static <T, W extends Window> Trigger<T, W> mockTrigger() throws 
Exception {
    +           @SuppressWarnings("unchecked")
    +           Trigger<T, W> mockTrigger = mock(Trigger.class);
    +
    +           when(mockTrigger.onElement(Matchers.<T>any(), anyLong(), 
Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +           when(mockTrigger.onEventTime(anyLong(), Matchers.<W>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +           when(mockTrigger.onProcessingTime(anyLong(), Matchers.<W>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +
    +           return mockTrigger;
    +   }
    +
    +   static <T> WindowAssigner<T, TimeWindow> mockTimeWindowAssigner() 
throws Exception {
    +           @SuppressWarnings("unchecked")
    +           WindowAssigner<T, TimeWindow> mockAssigner = 
mock(WindowAssigner.class);
    +
    +           
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
 TimeWindow.Serializer());
    +           when(mockAssigner.isEventTime()).thenReturn(true);
    +
    +           return mockAssigner;
    +   }
    +
    +   static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() 
throws Exception {
    +           @SuppressWarnings("unchecked")
    +           WindowAssigner<T, GlobalWindow> mockAssigner = 
mock(WindowAssigner.class);
    +
    +           
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
 GlobalWindow.Serializer());
    +           when(mockAssigner.isEventTime()).thenReturn(true);
    +           when(mockAssigner.assignWindows(Mockito.<T>any(), anyLong(), 
anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get()));
    +
    +           return mockAssigner;
    +   }
    +
    +
    +   static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() 
throws Exception {
    +           @SuppressWarnings("unchecked")
    +           MergingWindowAssigner<T, TimeWindow> mockAssigner = 
mock(MergingWindowAssigner.class);
    +
    +           
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
 TimeWindow.Serializer());
    +           when(mockAssigner.isEventTime()).thenReturn(true);
    +
    +           return mockAssigner;
    +   }
    +
    +
    +   static WindowAssigner.WindowAssignerContext anyAssignerContext() {
    +           return Mockito.any();
    +   }
    +
    +   static Trigger.TriggerContext anyTriggerContext() {
    +           return Mockito.any();
    +   }
    +
    +   static <T> Collector<T> anyCollector() {
    +           return Mockito.any();
    +   }
    +
    +   static Iterable<Integer> anyIntIterable() {
    +           return Mockito.any();
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   static Iterable<Integer> intIterable(Integer... values) {
    +           return (Iterable<Integer>) argThat(containsInAnyOrder(values));
    +   }
    +
    +   static TimeWindow anyTimeWindow() {
    +           return Mockito.any();
    +   }
    +
    +   static Trigger.OnMergeContext anyOnMergeContext() {
    +           return Mockito.any();
    +   }
    +
    +   static MergingWindowAssigner.MergeCallback anyMergeCallback() {
    +           return Mockito.any();
    +   }
    +
    +
    +   static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, 
TimeWindow> mockTrigger, final long timestamp) throws Exception {
    +           doAnswer(new Answer<TriggerResult>() {
    +                   @Override
    +                   public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
    +                           @SuppressWarnings("unchecked")
    +                           Trigger.TriggerContext context =
    +                                           (Trigger.TriggerContext) 
invocation.getArguments()[3];
    +                           context.registerEventTimeTimer(timestamp);
    +                           return TriggerResult.CONTINUE;
    +                   }
    +           })
    +           .when(mockTrigger).onElement(Matchers.<T>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
    +   }
    +
    +   private static <T> void shouldDeleteEventTimeTimerOnElement(Trigger<T, 
TimeWindow> mockTrigger, final long timestamp) throws Exception {
    +           doAnswer(new Answer<TriggerResult>() {
    +                   @Override
    +                   public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
    +                           @SuppressWarnings("unchecked")
    +                           Trigger.TriggerContext context =
    +                                           (Trigger.TriggerContext) 
invocation.getArguments()[3];
    +                           context.deleteEventTimeTimer(timestamp);
    +                           return TriggerResult.CONTINUE;
    +                   }
    +           })
    +           .when(mockTrigger).onElement(Matchers.<T>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
    +   }
    +
    +   private static <T> void 
shouldRegisterProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, 
final long timestamp) throws Exception {
    +           doAnswer(new Answer<TriggerResult>() {
    +                   @Override
    +                   public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
    +                           @SuppressWarnings("unchecked")
    +                           Trigger.TriggerContext context =
    +                                           (Trigger.TriggerContext) 
invocation.getArguments()[3];
    +                           context.registerProcessingTimeTimer(timestamp);
    +                           return TriggerResult.CONTINUE;
    +                   }
    +           })
    +                           
.when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext());
    +   }
    +
    +   private static <T> void 
shouldDeleteProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, 
final long timestamp) throws Exception {
    +           doAnswer(new Answer<TriggerResult>() {
    +                   @Override
    +                   public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
    +                           @SuppressWarnings("unchecked")
    +                           Trigger.TriggerContext context =
    +                                           (Trigger.TriggerContext) 
invocation.getArguments()[3];
    +                           context.deleteProcessingTimeTimer(timestamp);
    +                           return TriggerResult.CONTINUE;
    +                   }
    +           })
    +                           
.when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext());
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T, W extends Window> void shouldMergeWindows(final 
MergingWindowAssigner<T, W> assigner, final Collection<? extends W> 
expectedWindows, final Collection<W> toMerge, final W mergeResult) {
    +           doAnswer(new Answer<Object>() {
    +                   @Override
    +                   public Object answer(InvocationOnMock invocation) 
throws Exception {
    +                           Collection<W> windows = (Collection<W>) 
invocation.getArguments()[0];
    +
    +                           MergingWindowAssigner.MergeCallback callback = 
(MergingWindowAssigner.MergeCallback) invocation.getArguments()[1];
    +
    +                           // verify the expected windows
    +                           assertThat(windows, 
containsInAnyOrder(expectedWindows.toArray()));
    +
    +                           callback.merge(toMerge, mergeResult);
    +                           return null;
    +                   }
    +           })
    +                           .when(assigner).mergeWindows(anyCollection(), 
Matchers.<MergingWindowAssigner.MergeCallback>anyObject());
    +   }
    +
    +   private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +   }
    +
    +   private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
    +   }
    +
    +   private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
    +   }
    +
    +   private static <T> void shouldFireAndPurgeOnElement(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
    +   }
    +
    +   private static <T> void shouldContinueOnEventTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +   }
    +
    +   private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
    +   }
    +
    +   private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
    +   }
    +
    +   private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
    +   }
    +
    +   private static <T> void shouldContinueOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +   }
    +
    +   private static <T> void shouldFireOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
    +   }
    +
    +   private static <T> void shouldPurgeOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
    +   }
    +
    +   private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
    +   }
    +
    +   /**
    +    * Verify that there is no late-data side output if the {@code 
WindowAssigner} does
    +    * not assign any windows.
    +    */
    +   @Test
    +   public void testNoLateSideOutputForSkippedWindows() throws Exception {
    +
    +           OutputTag<Integer> lateOutputTag = new 
OutputTag<Integer>("late"){};
    +
    +           WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
    +           Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
    +           InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
    +
    +           OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
    +                           createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction, lateOutputTag);
    +
    +           testHarness.open();
    +
    +           when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
    +                           
.thenReturn(Collections.<TimeWindow>emptyList());
    +
    +           testHarness.processWatermark(0);
    +           testHarness.processElement(new StreamRecord<>(0, 5L));
    +
    +           verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), 
anyAssignerContext());
    +
    +           assertTrue(testHarness.getSideOutput(lateOutputTag) == null || 
testHarness.getSideOutput(lateOutputTag).isEmpty());
    +   }
    +
    +   @Test
    +   public void testLateSideOutput() throws Exception {
    +
    +           OutputTag<Integer> lateOutputTag = new 
OutputTag<Integer>("late"){};
    +
    +           WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
    +           Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
    +           InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
    +
    +           OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
    +                           createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction, lateOutputTag);
    +
    +           testHarness.open();
    +
    +           when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
    +                           .thenReturn(Collections.singletonList(new 
TimeWindow(0, 0)));
    +
    +           testHarness.processWatermark(20);
    +           testHarness.processElement(new StreamRecord<>(0, 5L));
    +
    +           verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), 
anyAssignerContext());
    +
    +           assertThat(testHarness.getSideOutput(lateOutputTag),
    +                           containsInAnyOrder(isStreamRecord(0, 5L)));
    +
    +           // we should also see side output if the WindowAssigner assigns 
no windows
    +           when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
    +                           
.thenReturn(Collections.<TimeWindow>emptyList());
    +
    +           testHarness.processElement(new StreamRecord<>(0, 10L));
    +
    +           verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), 
anyAssignerContext());
    +           verify(mockAssigner, times(1)).assignWindows(eq(0), eq(10L), 
anyAssignerContext());
    +
    +           assertThat(testHarness.getSideOutput(lateOutputTag),
    +                           containsInAnyOrder(isStreamRecord(0, 5L), 
isStreamRecord(0, 10L)));
    --- End diff --
    
    Fixing


> Don't allow shrinking merging windows
> -------------------------------------
>
>                 Key: FLINK-5972
>                 URL: https://issues.apache.org/jira/browse/FLINK-5972
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.1.0, 1.2.0, 1.3.0
>            Reporter: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.3.0, 1.2.1
>
>
> A misbehaving {{MergingWindowAssigner}} can cause a merge that results in a 
> window that is smaller than the span of all the merged windows. This, in 
> itself, is not problematic. It becomes problematic when the end timestamp of a 
> window that was not late before merging is now earlier than the watermark 
> (the timestamp is smaller than the watermark).
> There are two choices:
>  - immediately process the window
>  - drop the window
> Processing the window will lead to late data downstream.
> The current behaviour is to silently drop the window but that logic has a 
> bug: we only remove the dropped window from the {{MergingWindowSet}} but we 
> don't properly clean up state and timers that the window still (possibly) 
> has. We should fix this bug in the process of resolving this issue.
> We should either just fix the bug and still silently drop windows or add a 
> check and throw an exception when the end timestamp falls below the watermark.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to