Repository: flink Updated Branches: refs/heads/master 5c37e55c8 -> 662ed33d8
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java index d4fefa2..6f34607 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java @@ -21,9 +21,11 @@ package org.apache.flink.streaming.api.operators.windowing.functions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction; @@ -31,6 +33,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction; @@ -41,6 +44,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.util.Collector; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Test; @@ -56,7 +60,11 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.collection.IsMapContaining.hasEntry; import static org.hamcrest.core.AllOf.allOf; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.anyObject; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; public class InternalWindowFunctionTest { @@ -93,7 +101,9 @@ public class InternalWindowFunctionTest { Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); Collector<String> c = (Collector<String>) mock(Collector.class); - windowFunction.apply(((byte)0), w, i, c); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); + + windowFunction.process(((byte)0), w, ctx, i, c); verify(mock).apply(w, i, c); // check close @@ -134,7 +144,8 @@ public class InternalWindowFunctionTest { Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); Collector<String> c = (Collector<String>) mock(Collector.class); - windowFunction.apply(((byte)0), w, i, c); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); + windowFunction.process(((byte)0), w, ctx, i, c); verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c)); // check close @@ -175,7 +186,8 @@ public class InternalWindowFunctionTest { Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); Collector<String> c = (Collector<String>) mock(Collector.class); - windowFunction.apply(42L, w, i, c); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); + windowFunction.process(42L, w, ctx, i, c); verify(mock).apply(eq(42L), eq(w), eq(i), eq(c)); // check close @@ -215,8 +227,9 @@ public class InternalWindowFunctionTest { TimeWindow w = mock(TimeWindow.class); Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); Collector<String> c = (Collector<String>) mock(Collector.class); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.apply(42L, w, i, c); + windowFunction.process(42L, w, ctx, i, c); verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c)); // check close @@ -256,8 +269,9 @@ public class InternalWindowFunctionTest { // check apply TimeWindow w = mock(TimeWindow.class); Collector<String> c = (Collector<String>) mock(Collector.class); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.apply(42L, w, 23L, c); + windowFunction.process(42L, w, ctx, 23L, c); verify(mock).apply(eq(42L), eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); // check close @@ -297,8 +311,9 @@ public class InternalWindowFunctionTest { // check apply TimeWindow w = mock(TimeWindow.class); Collector<String> c = (Collector<String>) mock(Collector.class); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.apply(((byte)0), w, 23L, c); + windowFunction.process(((byte)0), w, ctx, 23L, c); verify(mock).apply(eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); // check close @@ -338,8 +353,9 @@ public class InternalWindowFunctionTest { // check apply TimeWindow w = mock(TimeWindow.class); Collector<String> c = (Collector<String>) mock(Collector.class); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.apply(((byte)0), w, 23L, c); + windowFunction.process(((byte)0), w, ctx, 23L, c); verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); // check close @@ -378,8 +394,9 @@ public class InternalWindowFunctionTest { // check apply TimeWindow w = mock(TimeWindow.class); Collector<String> c = (Collector<String>) mock(Collector.class); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.apply(42L, w, 23L, c); + windowFunction.process(42L, w, ctx,23L, c); verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); // check close @@ -450,8 +467,9 @@ public class InternalWindowFunctionTest { List<Long> args = new LinkedList<>(); args.add(23L); args.add(24L); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.apply(42L, w, args, c); + windowFunction.process(42L, w, ctx, args, c); verify(mock).process( eq(42L), (AggregateProcessWindowFunctionMock.Context) anyObject(), @@ -528,8 +546,9 @@ public class InternalWindowFunctionTest { List<Long> args = new LinkedList<>(); args.add(23L); args.add(24L); + InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.apply(((byte)0), w, args, c); + windowFunction.process(((byte)0), w, ctx, args, c); verify(mock).process( (AggregateProcessAllWindowFunctionMock.Context) anyObject(), (Iterable) argThat(containsInAnyOrder(allOf( @@ -552,7 +571,9 @@ public class InternalWindowFunctionTest { public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { } @Override - public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { } + public void process(Long aLong, ProcessWindowFunction<Long, String, Long, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception { + + } } public static class AggregateProcessWindowFunctionMock @@ -565,7 +586,9 @@ public class InternalWindowFunctionTest { public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { } @Override - public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { } + public void process(Long aLong, ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>.Context context, Iterable<Map<Long, Long>> elements, Collector<String> out) throws Exception { + + } } public static class AggregateProcessAllWindowFunctionMock http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java index 11508c5..ff1cbdf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java @@ -118,9 +118,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes testHarness.processElement(new StreamRecord<>(1, 0L)); - verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector()); // clear is only called at cleanup time/GC time verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); @@ -182,9 +182,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes testHarness.processElement(new StreamRecord<>(1, 0L)); - verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector()); // clear is only called at cleanup time/GC time verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index 8aae46a..faab505 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -163,6 +163,10 @@ public abstract class WindowOperatorContractTest extends TestLogger { return Mockito.any(); } + static InternalWindowFunction.InternalWindowContext anyInternalWindowContext() { + return Mockito.any(); + } + static Trigger.OnMergeContext anyOnMergeContext() { return Mockito.any(); } @@ -408,9 +412,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { testHarness.processElement(new StreamRecord<>(0, 0L)); - verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq((new TimeWindow(0, 2))), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); } @Test @@ -455,9 +459,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { testHarness.processElement(new StreamRecord<>(0, 0L)); - verify(mockWindowFunction, times(2)).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 1)), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 1)), anyInternalWindowContext(), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector()); } @Test @@ -509,16 +513,16 @@ public abstract class WindowOperatorContractTest extends TestLogger { @Override public Void answer(InvocationOnMock invocation) throws Exception { @SuppressWarnings("unchecked") - Collector<String> out = invocation.getArgumentAt(3, Collector.class); + Collector<String> out = invocation.getArgumentAt(4, Collector.class); out.collect("Hallo"); out.collect("Ciao"); return null; } - }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); + }).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); testHarness.processElement(new StreamRecord<>(0, 0L)); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); assertThat(testHarness.extractOutputStreamRecords(), contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L))); @@ -553,25 +557,25 @@ public abstract class WindowOperatorContractTest extends TestLogger { @Override public Void answer(InvocationOnMock invocation) throws Exception { @SuppressWarnings("unchecked") - Collector<String> out = invocation.getArgumentAt(3, Collector.class); + Collector<String> out = invocation.getArgumentAt(4, Collector.class); out.collect("Hallo"); out.collect("Ciao"); return null; } - }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); + }).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1); testHarness.processElement(new StreamRecord<>(0, 0L)); - verify(mockWindowFunction, never()).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector()); + verify(mockWindowFunction, never()).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector()); assertTrue(testHarness.extractOutputStreamRecords().isEmpty()); timeAdaptor.shouldFireOnTime(mockTrigger); timeAdaptor.advanceTime(testHarness, 1L); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); assertThat(testHarness.extractOutputStreamRecords(), contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L))); @@ -650,9 +654,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { testHarness.processElement(new StreamRecord<>(0, 0L)); - verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); // clear is only called at cleanup time/GC time verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); @@ -693,9 +697,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { testHarness.processElement(new StreamRecord<>(0, 0L)); - verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); // clear is only called at cleanup time/GC time verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); @@ -858,9 +862,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { timeAdaptor.advanceTime(testHarness, 0L); - verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); // clear is only called at cleanup time/GC time verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); @@ -919,9 +923,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { timeAdaptor.advanceTime(testHarness, 0L); - verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); // clear is only called at cleanup time/GC time verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); @@ -1050,7 +1054,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null); verify(mockWindowFunction, never()) - .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); + .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left } @@ -1114,7 +1118,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null); verify(mockWindowFunction, never()) - .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); + .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left } @@ -1186,7 +1190,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); verify(mockWindowFunction, never()) - .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); + .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); // now we trigger the dangling timer timeAdaptor.advanceTime(testHarness, 10L); @@ -2208,7 +2212,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { testHarness.processElement(new StreamRecord<>(0, 0L)); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); // clear is only called at cleanup time/GC time verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); @@ -2326,9 +2330,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { verify(mockTrigger, times(2)).clear(anyTimeWindow(), anyTriggerContext()); - verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); // it's also called for the cleanup timers verify(mockTrigger, times(4)).onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext()); @@ -2339,6 +2343,96 @@ public abstract class WindowOperatorContractTest extends TestLogger { assertEquals(0, testHarness.numEventTimeTimers()); } + @Test + public void testPerWindowStateSetAndClearedOnEventTimePurge() throws Exception { + testPerWindowStateSetAndClearedOnPurge(new EventTimeAdaptor()); + } + + @Test + public void testPerWindowStateSetAndClearedOnProcessingTimePurge() throws Exception { + testPerWindowStateSetAndClearedOnPurge(new ProcessingTimeAdaptor()); + } + + public void testPerWindowStateSetAndClearedOnPurge(TimeDomainAdaptor timeAdaptor) throws Exception { + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); + + testHarness.open(); + + when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext())) + .thenReturn(TriggerResult.FIRE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 20))); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2]; + context.windowState().getState(valueStateDescriptor).update("hello"); + return null; + } + }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1]; + context.windowState().getState(valueStateDescriptor).clear(); + return null; + } + }).when(mockWindowFunction).clear(anyTimeWindow(), anyInternalWindowContext()); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(2, testHarness.numKeyedStateEntries()); // window contents plus value state + assertEquals(1, timeAdaptor.numTimers(testHarness)); // gc timers + + timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window + + assertEquals(0, testHarness.numKeyedStateEntries()); + assertEquals(0, timeAdaptor.numTimers(testHarness)); + } + + @Test + public void testWindowStateNotAvailableToMergingWindows() throws Exception { + WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); + + testHarness.open(); + + when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext())) + .thenReturn(TriggerResult.FIRE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(0, 20))); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2]; + context.windowState().getState(valueStateDescriptor).update("hello"); + return null; + } + }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); + + expectedException.expect(UnsupportedOperationException.class); + expectedException.expectMessage("Per-window state is not allowed when using merging windows."); + testHarness.processElement(new StreamRecord<>(0, 0L)); + } + protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( WindowAssigner<Integer, W> assigner, Trigger<Integer, W> trigger, http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala index 163117b..2f0e48e 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala @@ -22,6 +22,7 @@ import java.io.Serializable import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.functions.Function +import org.apache.flink.api.common.state.KeyedStateStore import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector @@ -47,6 +48,15 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w def process(context: Context, elements: Iterable[IN], out: Collector[OUT]) /** + * Deletes any state in the [[Context]] when the Window is purged. + * + * @param context The context to which the window is being evaluated + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + @throws[Exception] + def clear(context: Context) {} + + /** * The context holding window metadata */ abstract class Context { @@ -54,6 +64,16 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w * @return The window that is being evaluated. */ def window: W + + /** + * State accessor for per-key and per-window state. + */ + def windowState: KeyedStateStore + + /** + * State accessor for per-key global state. + */ + def globalState: KeyedStateStore } } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala index 79f3918..bdf6ae6 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala @@ -22,6 +22,7 @@ import java.io.Serializable import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.functions.Function +import org.apache.flink.api.common.state.KeyedStateStore import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector @@ -49,6 +50,15 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]) /** + * Deletes any state in the [[Context]] when the Window is purged. + * + * @param context The context to which the window is being evaluated + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + @throws[Exception] + def clear(context: Context) {} + + /** * The context holding window metadata */ abstract class Context { @@ -56,6 +66,16 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function * @return The window that is being evaluated. */ def window: W + + /** + * State accessor for per-key and per-window state. + */ + def windowState: KeyedStateStore + + /** + * State accessor for per-key global state. + */ + def globalState: KeyedStateStore } } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala index a4fec64..fac5958 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala @@ -52,10 +52,25 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( out: Collector[OUT]): Unit = { val ctx = new func.Context { override def window = context.window + + override def windowState = context.windowState() + + override def globalState = context.globalState() } func.process(key, ctx, elements.asScala, out) } + override def clear(context: JProcessWindowFunction[IN, OUT, KEY, W]#Context): Unit = { + val ctx = new func.Context { + override def window = context.window + + override def windowState = context.windowState() + + override def globalState = context.globalState() + } + func.clear(ctx) + } + override def setRuntimeContext(t: RuntimeContext): Unit = { super.setRuntimeContext(t) func match { @@ -99,10 +114,26 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window]( out: Collector[OUT]): Unit = { val ctx = new func.Context { override def window = context.window + + override def windowState = context.windowState() + + override def globalState = context.globalState() } func.process(ctx, elements.asScala, out) } + override def clear(context: JProcessAllWindowFunction[IN, OUT, W]#Context): Unit = { + val ctx = new func.Context { + override def window = context.window + + override def windowState = context.windowState() + + override def globalState = context.globalState() + } + func.clear(ctx) + } + + override def setRuntimeContext(t: RuntimeContext): Unit = { super.setRuntimeContext(t) func match {
