Repository: flink
Updated Branches:
  refs/heads/master 5c37e55c8 -> 662ed33d8


http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index d4fefa2..6f34607 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -21,9 +21,11 @@ package 
org.apache.flink.streaming.api.operators.windowing.functions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
@@ -31,6 +33,7 @@ import 
org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
@@ -41,6 +44,7 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.Collector;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Test;
@@ -56,7 +60,11 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.collection.IsMapContaining.hasEntry;
 import static org.hamcrest.core.AllOf.allOf;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class InternalWindowFunctionTest {
 
@@ -93,7 +101,9 @@ public class InternalWindowFunctionTest {
                Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
                Collector<String> c = (Collector<String>) mock(Collector.class);
 
-               windowFunction.apply(((byte)0), w, i, c);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
+
+               windowFunction.process(((byte)0), w, ctx, i, c);
                verify(mock).apply(w, i, c);
 
                // check close
@@ -134,7 +144,8 @@ public class InternalWindowFunctionTest {
                Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
                Collector<String> c = (Collector<String>) mock(Collector.class);
 
-               windowFunction.apply(((byte)0), w, i, c);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
+               windowFunction.process(((byte)0), w, ctx, i, c);
                verify(mock).process((ProcessAllWindowFunctionMock.Context) 
anyObject(), eq(i), eq(c));
 
                // check close
@@ -175,7 +186,8 @@ public class InternalWindowFunctionTest {
                Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
                Collector<String> c = (Collector<String>) mock(Collector.class);
 
-               windowFunction.apply(42L, w, i, c);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
+               windowFunction.process(42L, w, ctx, i, c);
                verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
 
                // check close
@@ -215,8 +227,9 @@ public class InternalWindowFunctionTest {
                TimeWindow w = mock(TimeWindow.class);
                Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
                Collector<String> c = (Collector<String>) mock(Collector.class);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
 
-               windowFunction.apply(42L, w, i, c);
+               windowFunction.process(42L, w, ctx, i, c);
                verify(mock).process(eq(42L), 
(ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
                // check close
@@ -256,8 +269,9 @@ public class InternalWindowFunctionTest {
                // check apply
                TimeWindow w = mock(TimeWindow.class);
                Collector<String> c = (Collector<String>) mock(Collector.class);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
 
-               windowFunction.apply(42L, w, 23L, c);
+               windowFunction.process(42L, w, ctx, 23L, c);
                verify(mock).apply(eq(42L), eq(w), 
(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
                // check close
@@ -297,8 +311,9 @@ public class InternalWindowFunctionTest {
                // check apply
                TimeWindow w = mock(TimeWindow.class);
                Collector<String> c = (Collector<String>) mock(Collector.class);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
 
-               windowFunction.apply(((byte)0), w, 23L, c);
+               windowFunction.process(((byte)0), w, ctx, 23L, c);
                verify(mock).apply(eq(w), 
(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
                // check close
@@ -338,8 +353,9 @@ public class InternalWindowFunctionTest {
                // check apply
                TimeWindow w = mock(TimeWindow.class);
                Collector<String> c = (Collector<String>) mock(Collector.class);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
 
-               windowFunction.apply(((byte)0), w, 23L, c);
+               windowFunction.process(((byte)0), w, ctx, 23L, c);
                verify(mock).process((ProcessAllWindowFunctionMock.Context) 
anyObject(), 
(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
                // check close
@@ -378,8 +394,9 @@ public class InternalWindowFunctionTest {
                // check apply
                TimeWindow w = mock(TimeWindow.class);
                Collector<String> c = (Collector<String>) mock(Collector.class);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
 
-               windowFunction.apply(42L, w, 23L, c);
+               windowFunction.process(42L, w, ctx,23L, c);
                verify(mock).process(eq(42L), 
(ProcessWindowFunctionMock.Context) anyObject(), 
(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
                // check close
@@ -450,8 +467,9 @@ public class InternalWindowFunctionTest {
                List<Long> args = new LinkedList<>();
                args.add(23L);
                args.add(24L);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
 
-               windowFunction.apply(42L, w, args, c);
+               windowFunction.process(42L, w, ctx, args, c);
                verify(mock).process(
                                eq(42L),
                                (AggregateProcessWindowFunctionMock.Context) 
anyObject(),
@@ -528,8 +546,9 @@ public class InternalWindowFunctionTest {
                List<Long> args = new LinkedList<>();
                args.add(23L);
                args.add(24L);
+               InternalWindowFunction.InternalWindowContext ctx = 
mock(InternalWindowFunction.InternalWindowContext.class);
 
-               windowFunction.apply(((byte)0), w, args, c);
+               windowFunction.process(((byte)0), w, ctx, args, c);
                verify(mock).process(
                                (AggregateProcessAllWindowFunctionMock.Context) 
anyObject(),
                                (Iterable) argThat(containsInAnyOrder(allOf(
@@ -552,7 +571,9 @@ public class InternalWindowFunctionTest {
                public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
 
                @Override
-               public void process(Long aLong, Context context, Iterable<Long> 
input, Collector<String> out) throws Exception { }
+               public void process(Long aLong, ProcessWindowFunction<Long, 
String, Long, TimeWindow>.Context context, Iterable<Long> elements, 
Collector<String> out) throws Exception {
+
+               }
        }
 
        public static class AggregateProcessWindowFunctionMock
@@ -565,7 +586,9 @@ public class InternalWindowFunctionTest {
                public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
 
                @Override
-               public void process(Long aLong, Context context, 
Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+               public void process(Long aLong, ProcessWindowFunction<Map<Long, 
Long>, String, Long, TimeWindow>.Context context, Iterable<Map<Long, Long>> 
elements, Collector<String> out) throws Exception {
+
+               }
        }
 
        public static class AggregateProcessAllWindowFunctionMock

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
index 11508c5..ff1cbdf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -118,9 +118,9 @@ public class RegularWindowOperatorContractTest extends 
WindowOperatorContractTes
 
                testHarness.processElement(new StreamRecord<>(1, 0L));
 
-               verify(mockWindowFunction, times(2)).apply(eq(1), 
anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(eq(1), 
anyTimeWindow(), anyInternalWindowContext(), anyInt(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(1), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(1), eq(new 
TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), 
WindowOperatorContractTest.<Void>anyCollector());
 
                // clear is only called at cleanup time/GC time
                verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
@@ -182,9 +182,9 @@ public class RegularWindowOperatorContractTest extends 
WindowOperatorContractTes
 
                testHarness.processElement(new StreamRecord<>(1, 0L));
 
-               verify(mockWindowFunction, times(2)).apply(eq(1), 
anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(eq(1), 
anyTimeWindow(), anyInternalWindowContext(), anyInt(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(1), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(1), eq(new 
TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), 
WindowOperatorContractTest.<Void>anyCollector());
 
                // clear is only called at cleanup time/GC time
                verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 8aae46a..faab505 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -163,6 +163,10 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                return Mockito.any();
        }
 
+       static InternalWindowFunction.InternalWindowContext 
anyInternalWindowContext() {
+               return Mockito.any();
+       }
+
        static Trigger.OnMergeContext anyOnMergeContext() {
                return Mockito.any();
        }
@@ -408,9 +412,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
-               verify(mockWindowFunction, times(2)).apply(eq(0), 
anyTimeWindow(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(2, 4)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(eq(0), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq((new 
TimeWindow(0, 2))), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
        }
 
        @Test
@@ -455,9 +459,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
-               verify(mockWindowFunction, times(2)).apply(anyInt(), 
anyTimeWindow(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0, 0), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(0, 1)), intIterable(1, 1), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(anyInt(), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0, 0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(1), eq(new 
TimeWindow(0, 1)), anyInternalWindowContext(), intIterable(1, 1), 
WindowOperatorContractTest.<Void>anyCollector());
        }
 
        @Test
@@ -509,16 +513,16 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                        @Override
                        public Void answer(InvocationOnMock invocation) throws 
Exception {
                                @SuppressWarnings("unchecked")
-                               Collector<String> out = 
invocation.getArgumentAt(3, Collector.class);
+                               Collector<String> out = 
invocation.getArgumentAt(4, Collector.class);
                                out.collect("Hallo");
                                out.collect("Ciao");
                                return null;
                        }
-               }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 
2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+               }).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 
2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<String>anyCollector());
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<String>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<String>anyCollector());
 
                assertThat(testHarness.extractOutputStreamRecords(),
                                contains(isStreamRecord("Hallo", 1L), 
isStreamRecord("Ciao", 1L)));
@@ -553,25 +557,25 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                        @Override
                        public Void answer(InvocationOnMock invocation) throws 
Exception {
                                @SuppressWarnings("unchecked")
-                               Collector<String> out = 
invocation.getArgumentAt(3, Collector.class);
+                               Collector<String> out = 
invocation.getArgumentAt(4, Collector.class);
                                out.collect("Hallo");
                                out.collect("Ciao");
                                return null;
                        }
-               }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 
2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+               }).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 
2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<String>anyCollector());
 
                timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1);
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
-               verify(mockWindowFunction, never()).apply(anyInt(), 
anyTimeWindow(), anyIntIterable(), 
WindowOperatorContractTest.<String>anyCollector());
+               verify(mockWindowFunction, never()).process(anyInt(), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<String>anyCollector());
                assertTrue(testHarness.extractOutputStreamRecords().isEmpty());
 
                timeAdaptor.shouldFireOnTime(mockTrigger);
 
                timeAdaptor.advanceTime(testHarness, 1L);
 
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<String>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<String>anyCollector());
 
                assertThat(testHarness.extractOutputStreamRecords(),
                                contains(isStreamRecord("Hallo", 1L), 
isStreamRecord("Ciao", 1L)));
@@ -650,9 +654,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
-               verify(mockWindowFunction, times(2)).apply(eq(0), 
anyTimeWindow(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(2, 4)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(eq(0), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
 
                // clear is only called at cleanup time/GC time
                verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
@@ -693,9 +697,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
-               verify(mockWindowFunction, times(2)).apply(eq(0), 
anyTimeWindow(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(2, 4)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(eq(0), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
 
                // clear is only called at cleanup time/GC time
                verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
@@ -858,9 +862,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                timeAdaptor.advanceTime(testHarness, 0L);
 
-               verify(mockWindowFunction, times(2)).apply(eq(0), 
anyTimeWindow(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(2, 4)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(eq(0), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
 
                // clear is only called at cleanup time/GC time
                verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
@@ -919,9 +923,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                timeAdaptor.advanceTime(testHarness, 0L);
 
-               verify(mockWindowFunction, times(2)).apply(eq(0), 
anyTimeWindow(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(2, 4)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(eq(0), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
 
                // clear is only called at cleanup time/GC time
                verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
@@ -1050,7 +1054,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, 
null);
 
                verify(mockWindowFunction, never())
-                               .apply(anyInt(), anyTimeWindow(), 
anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+                               .process(anyInt(), anyTimeWindow(), 
anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<List<Integer>>anyCollector());
 
                assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc 
timers left
        }
@@ -1114,7 +1118,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, 
null);
 
                verify(mockWindowFunction, never())
-                               .apply(anyInt(), anyTimeWindow(), 
anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+                               .process(anyInt(), anyTimeWindow(), 
anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<List<Integer>>anyCollector());
 
                assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc 
timers left
        }
@@ -1186,7 +1190,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, 
null);
 
                verify(mockWindowFunction, never())
-                               .apply(anyInt(), anyTimeWindow(), 
anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+                               .process(anyInt(), anyTimeWindow(), 
anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<List<Integer>>anyCollector());
 
                // now we trigger the dangling timer
                timeAdaptor.advanceTime(testHarness, 10L);
@@ -2208,7 +2212,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
 
                // clear is only called at cleanup time/GC time
                verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
@@ -2326,9 +2330,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                verify(mockTrigger, times(2)).clear(anyTimeWindow(), 
anyTriggerContext());
 
-               verify(mockWindowFunction, times(2)).apply(eq(0), 
anyTimeWindow(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(2, 4)), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(2)).process(eq(0), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
 
                // it's also called for the cleanup timers
                verify(mockTrigger, times(4)).onEventTime(anyLong(), 
anyTimeWindow(), anyTriggerContext());
@@ -2339,6 +2343,96 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                assertEquals(0, testHarness.numEventTimeTimers());
        }
 
+       @Test
+       public void testPerWindowStateSetAndClearedOnEventTimePurge() throws 
Exception {
+               testPerWindowStateSetAndClearedOnPurge(new EventTimeAdaptor());
+       }
+
+       @Test
+       public void testPerWindowStateSetAndClearedOnProcessingTimePurge() 
throws Exception {
+               testPerWindowStateSetAndClearedOnPurge(new 
ProcessingTimeAdaptor());
+       }
+
+       public void testPerWindowStateSetAndClearedOnPurge(TimeDomainAdaptor 
timeAdaptor) throws Exception {
+               WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
+               timeAdaptor.setIsEventTime(mockAssigner);
+               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+               InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
+                       createWindowOperator(mockAssigner, mockTrigger, 20L, 
mockWindowFunction);
+
+               testHarness.open();
+
+               when(mockTrigger.onElement(anyInt(), anyLong(), 
anyTimeWindow(), anyTriggerContext()))
+                       .thenReturn(TriggerResult.FIRE);
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                       .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+               doAnswer(new Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+                               
context.windowState().getState(valueStateDescriptor).update("hello");
+                               return null;
+                       }
+               }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), 
anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+
+               doAnswer(new Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+                               
context.windowState().getState(valueStateDescriptor).clear();
+                               return null;
+                       }
+               }).when(mockWindowFunction).clear(anyTimeWindow(), 
anyInternalWindowContext());
+
+               assertEquals(0, testHarness.getOutput().size());
+               assertEquals(0, testHarness.numKeyedStateEntries());
+
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+
+               assertEquals(2, testHarness.numKeyedStateEntries()); // window 
contents plus value state
+               assertEquals(1, timeAdaptor.numTimers(testHarness)); // gc 
timers
+
+               timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime 
of the window
+
+               assertEquals(0, testHarness.numKeyedStateEntries());
+               assertEquals(0, timeAdaptor.numTimers(testHarness));
+       }
+
+       @Test
+       public void testWindowStateNotAvailableToMergingWindows() throws 
Exception {
+               WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
+               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+               InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
+                       createWindowOperator(mockAssigner, mockTrigger, 20L, 
mockWindowFunction);
+
+               testHarness.open();
+
+               when(mockTrigger.onElement(anyInt(), anyLong(), 
anyTimeWindow(), anyTriggerContext()))
+                       .thenReturn(TriggerResult.FIRE);
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                       .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+               doAnswer(new Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+                               
context.windowState().getState(valueStateDescriptor).update("hello");
+                               return null;
+                       }
+               }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), 
anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
+
+               expectedException.expect(UnsupportedOperationException.class);
+               expectedException.expectMessage("Per-window state is not 
allowed when using merging windows.");
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+       }
+
        protected abstract <W extends Window, OUT> 
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> 
createWindowOperator(
                        WindowAssigner<Integer, W> assigner,
                        Trigger<Integer, W> trigger,

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 163117b..2f0e48e 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
 
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -47,6 +48,15 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: 
Window] extends Function w
   def process(context: Context, elements: Iterable[IN], out: Collector[OUT])
 
   /**
+    * Deletes any state in the [[Context]] when the Window is purged.
+    *
+    * @param context The context to which the window is being evaluated
+    * @throws Exception The function may throw exceptions to fail the program 
and trigger recovery.
+    */
+  @throws[Exception]
+  def clear(context: Context) {}
+
+  /**
     * The context holding window metadata
     */
   abstract class Context {
@@ -54,6 +64,16 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: 
Window] extends Function w
       * @return The window that is being evaluated.
       */
     def window: W
+
+    /**
+      * State accessor for per-key and per-window state.
+      */
+    def windowState: KeyedStateStore
+
+    /**
+      * State accessor for per-key global state.
+      */
+    def globalState: KeyedStateStore
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index 79f3918..bdf6ae6 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
 
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -49,6 +50,15 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: 
Window] extends Function
   def process(key: KEY, context: Context, elements: Iterable[IN], out: 
Collector[OUT])
 
   /**
+    * Deletes any state in the [[Context]] when the Window is purged.
+    *
+    * @param context The context to which the window is being evaluated
+    * @throws Exception The function may throw exceptions to fail the program 
and trigger recovery.
+    */
+  @throws[Exception]
+  def clear(context: Context) {}
+
+  /**
     * The context holding window metadata
     */
   abstract class Context {
@@ -56,6 +66,16 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: 
Window] extends Function
       * @return The window that is being evaluated.
       */
     def window: W
+
+    /**
+      * State accessor for per-key and per-window state.
+      */
+    def windowState: KeyedStateStore
+
+    /**
+      * State accessor for per-key global state.
+      */
+    def globalState: KeyedStateStore
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index a4fec64..fac5958 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -52,10 +52,25 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, 
W <: Window](
       out: Collector[OUT]): Unit = {
     val ctx = new func.Context {
       override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
     }
     func.process(key, ctx, elements.asScala, out)
   }
 
+  override def clear(context: JProcessWindowFunction[IN, OUT, KEY, 
W]#Context): Unit = {
+    val ctx = new func.Context {
+      override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
+    }
+    func.clear(ctx)
+  }
+
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {
@@ -99,10 +114,26 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, 
W <: Window](
       out: Collector[OUT]): Unit = {
     val ctx = new func.Context {
       override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
     }
     func.process(ctx, elements.asScala, out)
   }
 
+  override def clear(context: JProcessAllWindowFunction[IN, OUT, W]#Context): 
Unit = {
+    val ctx = new func.Context {
+      override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
+    }
+    func.clear(ctx)
+  }
+
+
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {

Reply via email to