http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java new file mode 100644 index 0000000..7c4d711 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -0,0 +1,2572 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + + +import com.google.common.collect.Lists; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.AppendingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.verification.VerificationMode; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.*; + +/** + * These tests verify that {@link WindowOperator} correctly interacts with the other windowing + * components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}, + * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}. + * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state. + * + * <p>These tests document the implicit contract that exists between the windowing components. + */ +public class WindowOperatorContractTest extends TestLogger { + + private static ValueStateDescriptor<String> valueStateDescriptor = + new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE, null); + + private static ListStateDescriptor<Integer> intListDescriptor = + new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE); + + private static ReducingStateDescriptor<Integer> intReduceSumDescriptor = + new ReducingStateDescriptor<>("int-reduce", new Sum(), IntSerializer.INSTANCE); + + private static FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor = + new FoldingStateDescriptor<>("int-fold", 0, new FoldSum(), IntSerializer.INSTANCE); + + static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction() throws Exception { + @SuppressWarnings("unchecked") + InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = mock(InternalWindowFunction.class); + + return mockWindowFunction; + } + + static <T, W extends Window> Trigger<T, W> mockTrigger() throws Exception { + @SuppressWarnings("unchecked") + Trigger<T, W> mockTrigger = mock(Trigger.class); + + when(mockTrigger.onElement(Matchers.<T>any(), anyLong(), Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + when(mockTrigger.onEventTime(anyLong(), Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + when(mockTrigger.onProcessingTime(anyLong(), Matchers.<W>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + + return mockTrigger; + } + + static <T> WindowAssigner<T, TimeWindow> mockTimeWindowAssigner() throws Exception { + @SuppressWarnings("unchecked") + WindowAssigner<T, TimeWindow> mockAssigner = mock(WindowAssigner.class); + + when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new TimeWindow.Serializer()); + when(mockAssigner.isEventTime()).thenReturn(true); + + return mockAssigner; + } + + static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() throws Exception { + @SuppressWarnings("unchecked") + WindowAssigner<T, GlobalWindow> mockAssigner = mock(WindowAssigner.class); + + when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new GlobalWindow.Serializer()); + when(mockAssigner.isEventTime()).thenReturn(true); + when(mockAssigner.assignWindows(Mockito.<T>any(), anyLong(), anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get())); + + return mockAssigner; + } + + + static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() throws Exception { + @SuppressWarnings("unchecked") + MergingWindowAssigner<T, TimeWindow> mockAssigner = mock(MergingWindowAssigner.class); + + when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new TimeWindow.Serializer()); + when(mockAssigner.isEventTime()).thenReturn(true); + + return mockAssigner; + } + + + static WindowAssigner.WindowAssignerContext anyAssignerContext() { + return Mockito.any(); + } + + static Trigger.TriggerContext anyTriggerContext() { + return Mockito.any(); + } + + static <T> Collector<T> anyCollector() { + return Mockito.any(); + } + + static Iterable<Integer> anyIntIterable() { + return Mockito.any(); + } + + @SuppressWarnings("unchecked") + static Iterable<Integer> intIterable(Integer... values) { + return (Iterable<Integer>) argThat(containsInAnyOrder(values)); + } + + static TimeWindow anyTimeWindow() { + return Mockito.any(); + } + + static Trigger.OnMergeContext anyOnMergeContext() { + return Mockito.any(); + } + + static MergingWindowAssigner.MergeCallback anyMergeCallback() { + return Mockito.any(); + } + + + static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception { + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + @SuppressWarnings("unchecked") + Trigger.TriggerContext context = + (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(timestamp); + return TriggerResult.CONTINUE; + } + }) + .when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + } + + private static <T> void shouldDeleteEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception { + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + @SuppressWarnings("unchecked") + Trigger.TriggerContext context = + (Trigger.TriggerContext) invocation.getArguments()[3]; + context.deleteEventTimeTimer(timestamp); + return TriggerResult.CONTINUE; + } + }) + .when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + } + + private static <T> void shouldRegisterProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception { + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + @SuppressWarnings("unchecked") + Trigger.TriggerContext context = + (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerProcessingTimeTimer(timestamp); + return TriggerResult.CONTINUE; + } + }) + .when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + } + + private static <T> void shouldDeleteProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception { + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + @SuppressWarnings("unchecked") + Trigger.TriggerContext context = + (Trigger.TriggerContext) invocation.getArguments()[3]; + context.deleteProcessingTimeTimer(timestamp); + return TriggerResult.CONTINUE; + } + }) + .when(mockTrigger).onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + } + + @SuppressWarnings("unchecked") + private static <T, W extends Window> void shouldMergeWindows(final MergingWindowAssigner<T, W> assigner, final Collection<? extends W> expectedWindows, final Collection<W> toMerge, final W mergeResult) { + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Exception { + Collection<W> windows = (Collection<W>) invocation.getArguments()[0]; + + MergingWindowAssigner.MergeCallback callback = (MergingWindowAssigner.MergeCallback) invocation.getArguments()[1]; + + // verify the expected windows + assertThat(windows, containsInAnyOrder(expectedWindows.toArray())); + + callback.merge(toMerge, mergeResult); + return null; + } + }) + .when(assigner).mergeWindows(anyCollection(), Matchers.<MergingWindowAssigner.MergeCallback>anyObject()); + } + + private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + } + + private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + } + + private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + } + + private static <T> void shouldFireAndPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + } + + private static <T> void shouldContinueOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + } + + private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + } + + private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + } + + private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + } + + private static <T> void shouldContinueOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + } + + private static <T> void shouldFireOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + } + + private static <T> void shouldPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + } + + private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { + when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + } + + @Test + public void testAssignerIsInvokedOncePerElement() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + OneInputStreamOperatorTestHarness<Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Collections.singletonList(new TimeWindow(0, 0))); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockAssigner, times(1)).assignWindows(eq(0), eq(0L), anyAssignerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockAssigner, times(2)).assignWindows(eq(0), eq(0L), anyAssignerContext()); + + } + + @Test + public void testAssignerWithMultipleWindows() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + OneInputStreamOperatorTestHarness<Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + shouldFireOnElement(mockTrigger); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + } + + @Test + public void testWindowsDontInterfere() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Collections.singletonList(new TimeWindow(0, 2))); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Collections.singletonList(new TimeWindow(0, 1))); + + testHarness.processElement(new StreamRecord<>(1, 0L)); + + // no output so far + assertTrue(testHarness.extractOutputStreamRecords().isEmpty()); + + // state for two windows + assertEquals(2, testHarness.numKeyedStateEntries()); + assertEquals(2, testHarness.numEventTimeTimers()); + + // now we fire + shouldFireOnElement(mockTrigger); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Collections.singletonList(new TimeWindow(0, 1))); + + testHarness.processElement(new StreamRecord<>(1, 0L)); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Collections.singletonList(new TimeWindow(0, 2))); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockWindowFunction, times(2)).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 1)), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector()); + } + + @Test + public void testOnElementCalledPerWindow() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + OneInputStreamOperatorTestHarness<Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + testHarness.processElement(new StreamRecord<>(42, 1L)); + + verify(mockTrigger).onElement(eq(42), eq(1L), eq(new TimeWindow(2, 4)), anyTriggerContext()); + verify(mockTrigger).onElement(eq(42), eq(1L), eq(new TimeWindow(0, 2)), anyTriggerContext()); + + verify(mockTrigger, times(2)).onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()); + } + + @Test + public void testReducingWindow() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + // insert two elements without firing + testHarness.processElement(new StreamRecord<>(1, 0L)); + testHarness.processElement(new StreamRecord<>(1, 0L)); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.FIRE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(1, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers + } + + @Test + public void testFoldingWindow() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + // insert two elements without firing + testHarness.processElement(new StreamRecord<>(1, 0L)); + testHarness.processElement(new StreamRecord<>(1, 0L)); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.FIRE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(1, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers + } + + @Test + public void testEmittingFromWindowFunction() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Collections.singletonList(new TimeWindow(0, 2))); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + return TriggerResult.FIRE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Exception { + @SuppressWarnings("unchecked") + Collector<String> out = invocation.getArgumentAt(3, Collector.class); + out.collect("Hallo"); + out.collect("Ciao"); + return null; + } + }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); + + assertThat(testHarness.extractOutputStreamRecords(), + containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L))); + } + + @Test + public void testEmittingFromWindowFunctionOnEventTime() throws Exception { + testEmittingFromWindowFunction(new EventTimeAdaptor()); + } + + @Test + public void testEmittingFromWindowFunctionOnProcessingTime() throws Exception { + testEmittingFromWindowFunction(new ProcessingTimeAdaptor()); + } + + + private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Collections.singletonList(new TimeWindow(0, 2))); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Exception { + @SuppressWarnings("unchecked") + Collector<String> out = invocation.getArgumentAt(3, Collector.class); + out.collect("Hallo"); + out.collect("Ciao"); + return null; + } + }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockWindowFunction, never()).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector()); + assertTrue(testHarness.extractOutputStreamRecords().isEmpty()); + + timeAdaptor.shouldFireOnTime(mockTrigger); + + timeAdaptor.advanceTime(testHarness, 1L); + + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); + + assertThat(testHarness.extractOutputStreamRecords(), + containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L))); + } + + @Test + public void testOnElementContinue() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // CONTINUE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers + + // there should be no firing + assertEquals(0, testHarness.getOutput().size()); + } + + @Test + public void testOnElementFire() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.FIRE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers + } + + @Test + public void testOnElementFireAndPurge() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.FIRE_AND_PURGE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE_AND_PURGE should purge contents + assertEquals(2, testHarness.numKeyedStateEntries()); // trigger state will stick around until GC time + + // timers will stick around + assertEquals(4, testHarness.numEventTimeTimers()); + } + + @Test + public void testOnElementPurge() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(0L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.PURGE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // PURGE should purge contents + assertEquals(2, testHarness.numKeyedStateEntries()); // trigger state will stick around until GC time + + // timers will stick around + assertEquals(4, testHarness.numEventTimeTimers()); // trigger timer and GC timer + + // no output + assertEquals(0, testHarness.getOutput().size()); + } + + @Test + public void testOnEventTimeContinue() throws Exception { + testOnTimeContinue(new EventTimeAdaptor()); + } + + @Test + public void testOnProcessingTimeContinue() throws Exception { + testOnTimeContinue(new ProcessingTimeAdaptor()); + } + + private void testOnTimeContinue(final TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + // this should register two timers because we have two windows + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // we don't want to fire the cleanup timer + timeAdaptor.registerTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + timeAdaptor.shouldContinueOnTime(mockTrigger); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents plus trigger state for two windows + assertEquals(4, timeAdaptor.numTimers(testHarness)); // timers/gc timers for two windows + + timeAdaptor.advanceTime(testHarness, 0L); + + assertEquals(4, testHarness.numKeyedStateEntries()); + assertEquals(2, timeAdaptor.numTimers(testHarness)); // only gc timers left + + // there should be no firing + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + } + + @Test + public void testOnEventTimeFire() throws Exception { + testOnTimeFire(new EventTimeAdaptor()); + } + + @Test + public void testOnProcessingTimeFire() throws Exception { + testOnTimeFire(new ProcessingTimeAdaptor()); + } + + private void testOnTimeFire(final TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // don't interfere with cleanup timers + timeAdaptor.registerTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + timeAdaptor.shouldFireOnTime(mockTrigger); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents and trigger state for two windows + assertEquals(4, timeAdaptor.numTimers(testHarness)); // timers/gc timers for two windows + + timeAdaptor.advanceTime(testHarness, 0L); + + verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); + assertEquals(2, timeAdaptor.numTimers(testHarness)); // only gc timers left + } + + @Test + public void testOnEventTimeFireAndPurge() throws Exception { + testOnTimeFireAndPurge(new EventTimeAdaptor()); + } + + @Test + public void testOnProcessingTimeFireAndPurge() throws Exception { + testOnTimeFireAndPurge(new ProcessingTimeAdaptor()); + } + + private void testOnTimeFireAndPurge(final TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + timeAdaptor.registerTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + timeAdaptor.shouldFireAndPurgeOnTime(mockTrigger); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents and trigger state for two windows + assertEquals(4, timeAdaptor.numTimers(testHarness)); // timers/gc timers for two windows + + timeAdaptor.advanceTime(testHarness, 0L); + + verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE_AND_PURGE should purge contents + assertEquals(2, testHarness.numKeyedStateEntries()); // trigger state stays until GC time + assertEquals(2, timeAdaptor.numTimers(testHarness)); // gc timers are still there + } + + @Test + public void testOnEventTimePurge() throws Exception { + testOnTimePurge(new EventTimeAdaptor()); + } + + @Test + public void testOnProcessingTimePurge() throws Exception { + testOnTimePurge(new ProcessingTimeAdaptor()); + } + + private void testOnTimePurge(final TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(4, 6))); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // don't interfere with cleanup timers + timeAdaptor.registerTimer(context, 1L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + timeAdaptor.shouldPurgeOnTime(mockTrigger); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents and trigger state for two windows + assertEquals(4, timeAdaptor.numTimers(testHarness)); // timers/gc timers for two windows + + timeAdaptor.advanceTime(testHarness, 1L); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // PURGE should purge contents + assertEquals(2, testHarness.numKeyedStateEntries()); // trigger state will stick around + assertEquals(2, timeAdaptor.numTimers(testHarness)); // gc timers are still there + + // still no output + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + } + + @Test + public void testNoEventTimeFiringForPurgedWindow() throws Exception { + testNoTimerFiringForPurgedWindow(new EventTimeAdaptor()); + } + + @Test + public void testNoProcessingTimeFiringForPurgedWindow() throws Exception { + testNoTimerFiringForPurgedWindow(new ProcessingTimeAdaptor()); + } + + /** + * Verify that we neither invoke the trigger nor the window function if a timer + * for a non-existent window fires. + */ + private void testNoTimerFiringForPurgedWindow(final TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + + @SuppressWarnings("unchecked") + InternalWindowFunction<Iterable<Integer>, List<Integer>, Integer, TimeWindow> mockWindowFunction = + mock(InternalWindowFunction.class); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4))); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // don't interfere with cleanup timers + timeAdaptor.registerTimer(context, 0L); + return TriggerResult.PURGE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(0, testHarness.numKeyedStateEntries()); // not contents or state + assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and gc timer + + timeAdaptor.advanceTime(testHarness, 0L); + + // trigger is not called if there is no more window (timer is silently ignored) + timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null); + + verify(mockWindowFunction, never()) + .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); + + assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left + } + + @Test + public void testNoEventTimeFiringForPurgedMergingWindow() throws Exception { + testNoTimerFiringForPurgedMergingWindow(new EventTimeAdaptor()); + } + + @Test + public void testNoProcessingTimeFiringForPurgedMergingWindow() throws Exception { + testNoTimerFiringForPurgedMergingWindow(new ProcessingTimeAdaptor()); + } + + + /** + * Verify that we neither invoke the trigger nor the window function if a timer + * for an empty merging window. + */ + public void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception { + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + + @SuppressWarnings("unchecked") + InternalWindowFunction<Iterable<Integer>, List<Integer>, Integer, TimeWindow> mockWindowFunction = + mock(InternalWindowFunction.class); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4))); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // don't interfere with cleanup timers + timeAdaptor.registerTimer(context, 0L); + return TriggerResult.PURGE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(1, testHarness.numKeyedStateEntries()); // just the merging window set + assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and gc timer + + timeAdaptor.advanceTime(testHarness, 0L); + + // trigger is not called if there is no more window (timer is silently ignored) + timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null); + + verify(mockWindowFunction, never()) + .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); + + assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left + } + + @Test + public void testNoEventTimeFiringForGarbageCollectedMergingWindow() throws Exception { + testNoTimerFiringForGarbageCollectedMergingWindow(new EventTimeAdaptor()); + } + + @Test + public void testNoProcessingTimeFiringForGarbageCollectedMergingWindow() throws Exception { + testNoTimerFiringForGarbageCollectedMergingWindow(new ProcessingTimeAdaptor()); + } + + + /** + * Verify that we neither invoke the trigger nor the window function if a timer + * fires for a merging window that was already garbage collected. + */ + public void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception { + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + + @SuppressWarnings("unchecked") + InternalWindowFunction<Iterable<Integer>, List<Integer>, Integer, TimeWindow> mockWindowFunction = + mock(InternalWindowFunction.class); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4))); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // set a timer for after the GC time + timeAdaptor.registerTimer(context, 10L); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set + assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and gc timer + + timeAdaptor.shouldContinueOnTime(mockTrigger); + + // this should trigger GC + timeAdaptor.advanceTime(testHarness, 4L); + + verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext()); + + assertEquals(0, testHarness.numKeyedStateEntries()); + // we still have a dangling timer because our trigger doesn't do cleanup + assertEquals(1, timeAdaptor.numTimers(testHarness)); + + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); + + verify(mockWindowFunction, never()) + .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); + + // now we trigger the dangling timer + timeAdaptor.advanceTime(testHarness, 10L); + + // we don't fire again + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); + } + + @Test + public void testEventTimeTimerCreationAndDeletion() throws Exception { + testTimerCreationAndDeletion(new EventTimeAdaptor()); + } + + @Test + public void testProcessingTimeTimerCreationAndDeletion() throws Exception { + testTimerCreationAndDeletion(new ProcessingTimeAdaptor()); + } + + private void testTimerCreationAndDeletion(TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 2))); + + assertEquals(0, timeAdaptor.numTimers(testHarness)); + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 17); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(2, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 42); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(3, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window + + timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 42); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 17); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(1, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window + } + + @Test + public void testEventTimeTimerFiring() throws Exception { + testTimerFiring(new EventTimeAdaptor()); + } + + @Test + public void testProcessingTimeTimerFiring() throws Exception { + testTimerFiring(new ProcessingTimeAdaptor()); + } + + + private void testTimerFiring(TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 100))); + + assertEquals(0, timeAdaptor.numTimers(testHarness)); + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 17); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 42); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(4, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window + + timeAdaptor.advanceTime(testHarness, 1); + + timeAdaptor.verifyTriggerCallback(mockTrigger, atLeastOnce(), 1L, new TimeWindow(0, 100)); + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); + assertEquals(3, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window + + // doesn't do anything + timeAdaptor.advanceTime(testHarness, 15); + + // so still the same + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); + + timeAdaptor.advanceTime(testHarness, 42); + + timeAdaptor.verifyTriggerCallback(mockTrigger, atLeastOnce(), 17L, new TimeWindow(0, 100)); + timeAdaptor.verifyTriggerCallback(mockTrigger, atLeastOnce(), 42L, new TimeWindow(0, 100)); + timeAdaptor.verifyTriggerCallback(mockTrigger, times(3), null, null); + assertEquals(1, timeAdaptor.numTimers(testHarness)); // +1 because of the GC timer of the window + } + + @Test + public void testEventTimeDeletedTimerDoesNotFire() throws Exception { + testDeletedTimerDoesNotFire(new EventTimeAdaptor()); + } + + @Test + public void testProcessingTimeDeletedTimerDoesNotFire() throws Exception { + testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor()); + } + + public void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 100))); + + assertEquals(0, timeAdaptor.numTimers(testHarness)); + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(2, timeAdaptor.numTimers(testHarness)); // +1 for the GC timer + + timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 1); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(1, timeAdaptor.numTimers(testHarness)); // +1 for the GC timer + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 2); + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(2, timeAdaptor.numTimers(testHarness)); // +1 for the GC timer + + timeAdaptor.advanceTime(testHarness, 50L); + + timeAdaptor.verifyTriggerCallback(mockTrigger, times(0), 1L, null); + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), 2L, new TimeWindow(0, 100)); + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); + + assertEquals(1, timeAdaptor.numTimers(testHarness)); // +1 for the GC timer + } + + @Test + public void testMergeWindowsIsCalled() throws Exception { + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4))), anyMergeCallback()); + verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(0, 2))), anyMergeCallback()); + verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback()); + + + } + + @Test + public void testEventTimeWindowsAreMergedEagerly() throws Exception { + testWindowsAreMergedEagerly(new EventTimeAdaptor()); + } + + @Test + public void testProcessingTimeWindowsAreMergedEagerly() throws Exception { + testWindowsAreMergedEagerly(new ProcessingTimeAdaptor()); + } + + /** + * Verify that windows are merged eagerly, if possible. + */ + public void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception { + // in this test we only have one state window and windows are eagerly + // merged into the first window + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // don't intefere with cleanup timers + timeAdaptor.registerTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.OnMergeContext context = (Trigger.OnMergeContext) invocation.getArguments()[1]; + // don't intefere with cleanup timers + timeAdaptor.registerTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onMerge(anyTimeWindow(), anyOnMergeContext()); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1]; + timeAdaptor.deleteTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).clear(); + return null; + } + }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext()); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 2))); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(3, testHarness.numKeyedStateEntries()); // window state plus trigger state plus merging window set + assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and GC timer + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4))); + + shouldMergeWindows( + mockAssigner, + Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)), + Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)), + new TimeWindow(0, 4)); + + // don't register a timer or update state in onElement, this checks + // whether onMerge does correctly set those things + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + verify(mockTrigger).onMerge(eq(new TimeWindow(0, 4)), anyOnMergeContext()); + + assertEquals(3, testHarness.numKeyedStateEntries()); + assertEquals(2, timeAdaptor.numTimers(testHarness)); + } + + @Test + public void testMergingOfExistingEventTimeWindows() throws Exception { + testMergingOfExistingWindows(new EventTimeAdaptor()); + } + + @Test + public void testMergingOfExistingProcessingTimeWindows() throws Exception { + testMergingOfExistingWindows(new ProcessingTimeAdaptor()); + } + + /** + * Verify that we only keep one of the underlying state windows. This test also verifies that + * GC timers are correctly deleted when merging windows. + */ + public void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception { + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // don't interfere with cleanup timers + timeAdaptor.registerTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.OnMergeContext context = (Trigger.OnMergeContext) invocation.getArguments()[1]; + // don't interfere with cleanup timers + timeAdaptor.registerTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onMerge(anyTimeWindow(), anyOnMergeContext()); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1]; + // don't interfere with cleanup timers + timeAdaptor.deleteTimer(context, 0L); + context.getPartitionedState(valueStateDescriptor).clear(); + return null; + } + }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext()); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 2))); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(3, testHarness.numKeyedStateEntries()); // window state plus trigger state plus merging window set + assertEquals(2, timeAdaptor.numTimers(testHarness)); // trigger timer plus GC timer + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4))); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(5, testHarness.numKeyedStateEntries()); // window state plus trigger state plus merging window set + assertEquals(4, timeAdaptor.numTimers(testHarness)); // trigger timer plus GC timer + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(1, 3))); + + shouldMergeWindows( + mockAssigner, + Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)), + Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)), + new TimeWindow(0, 4)); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(3, testHarness.numKeyedStateEntries()); // window contents plus trigger state plus merging window set + assertEquals(2, timeAdaptor.numTimers(testHarness)); // trigger timer plus GC timer + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + } + + @Test + public void testOnElementPurgeDoesNotCleanupMergingSet() throws Exception { + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + return TriggerResult.PURGE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(1, testHarness.numKeyedStateEntries()); // the merging window set + + assertEquals(1, testHarness.numEventTimeTimers()); // one cleanup timer + + assertEquals(0, testHarness.getOutput().size()); + } + + @Test + public void testOnEventTimePurgeDoesNotCleanupMergingSet() throws Exception { + testOnTimePurgeDoesNotCleanupMergingSet(new EventTimeAdaptor()); + } + + @Test + public void testOnProcessingTimePurgeDoesNotCleanupMergingSet() throws Exception { + testOnTimePurgeDoesNotCleanupMergingSet(new ProcessingTimeAdaptor()); + } + + public void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception { + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 4))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1L); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + timeAdaptor.shouldPurgeOnTime(mockTrigger); + + assertEquals(2, testHarness.numKeyedStateEntries()); // the merging window set + window contents + assertEquals(2, timeAdaptor.numTimers(testHarness)); // one cleanup timer + timer + assertEquals(0, testHarness.getOutput().size()); + + timeAdaptor.advanceTime(testHarness, 1L); + + assertEquals(1, testHarness.numKeyedStateEntries()); // the merging window set + assertEquals(1, timeAdaptor.numTimers(testHarness)); // one cleanup timer + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + } + + @Test + public void testNoEventTimeGarbageCollectionTimerForGlobalWindow() throws Exception { + testNoGarbageCollectionTimerForGlobalWindow(new EventTimeAdaptor()); + } + + @Test + public void testNoProcessingTimeGarbageCollectionTimerForGlobalWindow() throws Exception { + testNoGarbageCollectionTimerForGlobalWindow(new ProcessingTimeAdaptor()); + } + + public void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception { + + WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, GlobalWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, GlobalWindow> mockWindowFunction = mockWindowFunction(); + + // this needs to be true for the test to succeed + assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp()); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // just the window contents + assertEquals(1, testHarness.numKeyedStateEntries()); + + // verify we have no timers for either time domain + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + } + + @Test + public void testNoEventTimeGarbageCollectionTimerForLongMax() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, Long.MAX_VALUE - 10))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // just the window contents + assertEquals(1, testHarness.numKeyedStateEntries()); + + // no GC timer + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + } + + @Test + public void testProcessingTimeGarbageCollectionTimerIsAlwaysWindowMaxTimestamp() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + when(mockAssigner.isEventTime()).thenReturn(false); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, Long.MAX_VALUE - 10))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // just the window contents + assertEquals(1, testHarness.numKeyedStateEntries()); + + // no GC timer + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers()); + + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + testHarness.setProcessingTime(Long.MAX_VALUE - 10); + + verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext()); + + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + } + + @Test + public void testEventTimeGarbageCollectionTimer() throws Exception { + testGarbageCollectionTimer(new EventTimeAdaptor()); + } + + @Test + public void testProcessingTimeGarbageCollectionTimer() throws Exception { + testGarbageCollectionTimer(new ProcessingTimeAdaptor()); + } + + public void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception { + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 20))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // just the window contents + assertEquals(1, testHarness.numKeyedStateEntries()); + + assertEquals(1, timeAdaptor.numTimers(testHarness)); + assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness)); + + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window + + verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext()); + + assertEquals(0, timeAdaptor.numTimers(testHarness)); + assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness)); + } + + @Test + public void testEventTimeTriggerTimerAndGarbageCollectionTimerCoincide() throws Exception { + testTriggerTimerAndGarbageCollectionTimerCoincide(new EventTimeAdaptor()); + } + + @Test + public void testProcessingTimeTriggerTimerAndGarbageCollectionTimerCoincide() throws Exception { + testTriggerTimerAndGarbageCollectionTimerCoincide(new ProcessingTimeAdaptor()); + } + + public void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception { + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 20))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // 19 is maxTime of window + timeAdaptor.registerTimer(context, 19L); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // just the window contents + assertEquals(1, testHarness.numKeyedStateEntries()); + + assertEquals(1, timeAdaptor.numTimers(testHarness)); + assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness)); + + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + timeAdaptor.advanceTime(testHarness, 19); // 19 is maxTime of the window + + verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext()); + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); + + assertEquals(0, timeAdaptor.numTimers(testHarness)); + assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness)); + } + + @Test + public void testStateAndTimerCleanupAtEventTimeGarbageCollection() throws Exception { + testStateAndTimerCleanupAtEventTimeGarbageCollection(new EventTimeAdaptor()); + } + + @Test + public void testStateAndTimerCleanupAtProcessingTimeGarbageCollection() throws Exception { + testStateAndTimerCleanupAtEventTimeGarbageCollection(new ProcessingTimeAdaptor()); + } + + public void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception { + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 20))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // very far in the future so our watermark does not trigger it + timeAdaptor.registerTimer(context, 1000L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1]; + timeAdaptor.deleteTimer(context, 1000L); + context.getPartitionedState(valueStateDescriptor).clear(); + return null; + } + }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + assertEquals(2, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(2, timeAdaptor.numTimers(testHarness)); // window timers/gc timers + assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness)); + + timeAdaptor.advanceTime(testHarness, 19 + 20); + + verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext()); + + assertEquals(0, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(0, timeAdaptor.numTimers(testHarness)); // window timers/gc timers + assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness)); + } + + @Test + public void testStateAndTimerCleanupAtEventTimeGarbageCollectionWithPurgingTrigger() throws Exception { + testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(new EventTimeAdaptor()); + } + + @Test + public void testStateAndTimerCleanupAtProcessingTimeGarbageCollectionWithPurgingTrigger() throws Exception { + testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(new ProcessingTimeAdaptor()); + } + + /** + * Verify that we correctly clean up even when a purging trigger has purged + * window state. + */ + public void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception { + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 20))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // very far in the future so our watermark does not trigger it + timeAdaptor.registerTimer(context, 1000L); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.PURGE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[1]; + timeAdaptor.deleteTimer(context, 1000L); + context.getPartitionedState(valueStateDescriptor).clear(); + return null; + } + }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + assertEquals(1, testHarness.numKeyedStateEntries()); // just the trigger state remains + assertEquals(2, timeAdaptor.numTimers(testHarness)); // window timers/gc timers + assertEquals(0, timeAdaptor.numTimersOtherDomain(testHarness)); + + timeAdaptor.advanceTime(testHarness, 19 + 20);
<TRUNCATED>
