Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3587#discussion_r107473789
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
 ---
    @@ -0,0 +1,2654 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators.windowing;
    +
    +
    +import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.*;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Mockito.*;
    +
    +import com.google.common.collect.Lists;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.state.AppendingState;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
    +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +import org.apache.flink.streaming.api.windowing.windows.Window;
    +import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
    +import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
    +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
    +import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.OutputTag;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.ExpectedException;
    +import org.mockito.Matchers;
    +import org.mockito.Mockito;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.mockito.verification.VerificationMode;
    +
    +/**
    + * These tests verify that {@link WindowOperator} correctly interacts with 
the other windowing
    + * components: {@link WindowAssigner},
    + * {@link Trigger}.
    + * {@link 
org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window 
state.
    + *
    + * <p>These tests document the implicit contract that exists between the 
windowing components.
    + *
    + * <p><b>Important:</b>This test must always be kept up-to-date with
    + * {@link WindowOperatorContractTest}.
    + */
    +public class EvictingWindowOperatorContractTest extends TestLogger {
    +
    +   @Rule
    +   public ExpectedException expectedException = ExpectedException.none();
    +
    +   private static ValueStateDescriptor<String> valueStateDescriptor =
    +                   new ValueStateDescriptor<>("string-state", 
StringSerializer.INSTANCE, null);
    +
    +   private static ListStateDescriptor<StreamRecord<Integer>> 
intListDescriptor =
    +                   new ListStateDescriptor<>("int-list", 
(TypeSerializer<StreamRecord<Integer>>) new 
StreamElementSerializer(IntSerializer.INSTANCE));
    +
    +   static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, 
KEY, W> mockWindowFunction() throws Exception {
    +           @SuppressWarnings("unchecked")
    +           InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = 
mock(InternalWindowFunction.class);
    +
    +           return mockWindowFunction;
    +   }
    +
    +   static <T, W extends Window> Trigger<T, W> mockTrigger() throws 
Exception {
    +           @SuppressWarnings("unchecked")
    +           Trigger<T, W> mockTrigger = mock(Trigger.class);
    +
    +           when(mockTrigger.onElement(Matchers.<T>any(), anyLong(), 
Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +           when(mockTrigger.onEventTime(anyLong(), Matchers.<W>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +           when(mockTrigger.onProcessingTime(anyLong(), Matchers.<W>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +
    +           return mockTrigger;
    +   }
    +
    +   static <T> WindowAssigner<T, TimeWindow> mockTimeWindowAssigner() 
throws Exception {
    +           @SuppressWarnings("unchecked")
    +           WindowAssigner<T, TimeWindow> mockAssigner = 
mock(WindowAssigner.class);
    +
    +           
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
 TimeWindow.Serializer());
    +           when(mockAssigner.isEventTime()).thenReturn(true);
    +
    +           return mockAssigner;
    +   }
    +
    +   static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() 
throws Exception {
    +           @SuppressWarnings("unchecked")
    +           WindowAssigner<T, GlobalWindow> mockAssigner = 
mock(WindowAssigner.class);
    +
    +           
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
 GlobalWindow.Serializer());
    +           when(mockAssigner.isEventTime()).thenReturn(true);
    +           when(mockAssigner.assignWindows(Mockito.<T>any(), anyLong(), 
anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get()));
    +
    +           return mockAssigner;
    +   }
    +
    +
    +   static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() 
throws Exception {
    +           @SuppressWarnings("unchecked")
    +           MergingWindowAssigner<T, TimeWindow> mockAssigner = 
mock(MergingWindowAssigner.class);
    +
    +           
when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new
 TimeWindow.Serializer());
    +           when(mockAssigner.isEventTime()).thenReturn(true);
    +
    +           return mockAssigner;
    +   }
    +
    +
    +   static WindowAssigner.WindowAssignerContext anyAssignerContext() {
    +           return Mockito.any();
    +   }
    +
    +   static Trigger.TriggerContext anyTriggerContext() {
    +           return Mockito.any();
    +   }
    +
    +   static <T> Collector<T> anyCollector() {
    +           return Mockito.any();
    +   }
    +
    +   static Iterable<Integer> anyIntIterable() {
    +           return Mockito.any();
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   static Iterable<Integer> intIterable(Integer... values) {
    +           return (Iterable<Integer>) argThat(containsInAnyOrder(values));
    +   }
    +
    +   static TimeWindow anyTimeWindow() {
    +           return Mockito.any();
    +   }
    +
    +   static Trigger.OnMergeContext anyOnMergeContext() {
    +           return Mockito.any();
    +   }
    +
    +   static MergingWindowAssigner.MergeCallback anyMergeCallback() {
    +           return Mockito.any();
    +   }
    +
    +
    +   static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, 
TimeWindow> mockTrigger, final long timestamp) throws Exception {
    +           doAnswer(new Answer<TriggerResult>() {
    +                   @Override
    +                   public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
    +                           @SuppressWarnings("unchecked")
    +                           Trigger.TriggerContext context =
    +                                           (Trigger.TriggerContext) 
invocation.getArguments()[3];
    +                           context.registerEventTimeTimer(timestamp);
    +                           return TriggerResult.CONTINUE;
    +                   }
    +           })
    +           .when(mockTrigger).onElement(Matchers.<T>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
    +   }
    +
    +   private static <T> void shouldDeleteEventTimeTimerOnElement(Trigger<T, 
TimeWindow> mockTrigger, final long timestamp) throws Exception {
    +           doAnswer(new Answer<TriggerResult>() {
    +                   @Override
    +                   public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
    +                           @SuppressWarnings("unchecked")
    +                           Trigger.TriggerContext context =
    +                                           (Trigger.TriggerContext) 
invocation.getArguments()[3];
    +                           context.deleteEventTimeTimer(timestamp);
    +                           return TriggerResult.CONTINUE;
    +                   }
    +           })
    +           .when(mockTrigger).onElement(Matchers.<T>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
    +   }
    +
    +   private static <T> void 
shouldRegisterProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, 
final long timestamp) throws Exception {
    +           doAnswer(new Answer<TriggerResult>() {
    +                   @Override
    +                   public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
    +                           @SuppressWarnings("unchecked")
    +                           Trigger.TriggerContext context =
    +                                           (Trigger.TriggerContext) 
invocation.getArguments()[3];
    +                           context.registerProcessingTimeTimer(timestamp);
    +                           return TriggerResult.CONTINUE;
    +                   }
    +           })
    +                           
.when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext());
    +   }
    +
    +   private static <T> void 
shouldDeleteProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, 
final long timestamp) throws Exception {
    +           doAnswer(new Answer<TriggerResult>() {
    +                   @Override
    +                   public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
    +                           @SuppressWarnings("unchecked")
    +                           Trigger.TriggerContext context =
    +                                           (Trigger.TriggerContext) 
invocation.getArguments()[3];
    +                           context.deleteProcessingTimeTimer(timestamp);
    +                           return TriggerResult.CONTINUE;
    +                   }
    +           })
    +                           
.when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext());
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T, W extends Window> void shouldMergeWindows(final 
MergingWindowAssigner<T, W> assigner, final Collection<? extends W> 
expectedWindows, final Collection<W> toMerge, final W mergeResult) {
    +           doAnswer(new Answer<Object>() {
    +                   @Override
    +                   public Object answer(InvocationOnMock invocation) 
throws Exception {
    +                           Collection<W> windows = (Collection<W>) 
invocation.getArguments()[0];
    +
    +                           MergingWindowAssigner.MergeCallback callback = 
(MergingWindowAssigner.MergeCallback) invocation.getArguments()[1];
    +
    +                           // verify the expected windows
    +                           assertThat(windows, 
containsInAnyOrder(expectedWindows.toArray()));
    +
    +                           callback.merge(toMerge, mergeResult);
    +                           return null;
    +                   }
    +           })
    +                           .when(assigner).mergeWindows(anyCollection(), 
Matchers.<MergingWindowAssigner.MergeCallback>anyObject());
    +   }
    +
    +   private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +   }
    +
    +   private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
    +   }
    +
    +   private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
    +   }
    +
    +   private static <T> void shouldFireAndPurgeOnElement(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
    +   }
    +
    +   private static <T> void shouldContinueOnEventTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +   }
    +
    +   private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
    +   }
    +
    +   private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
    +           when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
    +   }
    +
    +   private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
    +   }
    +
    +   private static <T> void shouldContinueOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
    +   }
    +
    +   private static <T> void shouldFireOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
    +   }
    +
    +   private static <T> void shouldPurgeOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
    +   }
    +
    +   private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
    +           when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
    +   }
    +
    +   /**
    +    * Verify that there is no late-date side output if the {@code 
WindowAssigner} does
    --- End diff --
    
    Fixing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to