[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fad201bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fad201bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fad201bf

Branch: refs/heads/table-retraction
Commit: fad201bfb0b1f2757f68f7b3ffaf97a486eb93e8
Parents: 5c37e55
Author: Seth Wiesman <[email protected]>
Authored: Sun Mar 5 23:07:18 2017 -0500
Committer: Aljoscha Krettek <[email protected]>
Committed: Sat Mar 25 16:59:17 2017 +0100

----------------------------------------------------------------------
 .../FoldApplyProcessAllWindowFunction.java      |  23 +-
 .../FoldApplyProcessWindowFunction.java         |  23 +-
 .../InternalProcessApplyAllWindowContext.java   |  57 +++++
 .../InternalProcessApplyWindowContext.java      |  58 +++++
 .../windowing/ProcessAllWindowFunction.java     |  22 ++
 .../windowing/ProcessWindowFunction.java        |  24 +-
 .../ReduceApplyProcessAllWindowFunction.java    |  23 +-
 .../ReduceApplyProcessWindowFunction.java       |  21 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |  75 ++++++-
 .../windowing/EvictingWindowOperator.java       |  62 +++---
 .../operators/windowing/WindowOperator.java     | 220 +++++++++++++++----
 ...ternalAggregateProcessAllWindowFunction.java |  28 ++-
 .../InternalAggregateProcessWindowFunction.java |  28 ++-
 .../InternalIterableAllWindowFunction.java      |   7 +-
 ...nternalIterableProcessAllWindowFunction.java |  31 ++-
 .../InternalIterableProcessWindowFunction.java  |  24 +-
 .../InternalIterableWindowFunction.java         |   7 +-
 .../InternalProcessAllWindowContext.java        |  57 +++++
 .../functions/InternalProcessWindowContext.java |  58 +++++
 .../InternalSingleValueAllWindowFunction.java   |   7 +-
 ...rnalSingleValueProcessAllWindowFunction.java |  29 ++-
 ...nternalSingleValueProcessWindowFunction.java |  24 +-
 .../InternalSingleValueWindowFunction.java      |   7 +-
 .../functions/InternalWindowFunction.java       |  26 ++-
 .../FoldApplyProcessWindowFunctionTest.java     |  82 ++++++-
 .../functions/InternalWindowFunctionTest.java   |  49 +++--
 .../RegularWindowOperatorContractTest.java      |  12 +-
 .../windowing/WindowOperatorContractTest.java   | 158 ++++++++++---
 .../function/ProcessAllWindowFunction.scala     |  20 ++
 .../scala/function/ProcessWindowFunction.scala  |  20 ++
 .../ScalaProcessWindowFunctionWrapper.scala     |  31 +++
 31 files changed, 1091 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 5ac6766..8e8e52c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessAllWindowFunction<W extends 
Window, T, ACC, R>
        private TypeSerializer<ACC> accSerializer;
        private final TypeInformation<ACC> accTypeInformation;
        private transient ACC initialValue;
+       private transient InternalProcessApplyAllWindowContext<ACC, R, W> ctx;
 
        public FoldApplyProcessAllWindowFunction(ACC initialValue, 
FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> 
windowFunction, TypeInformation<ACC> accTypeInformation) {
                this.windowFunction = windowFunction;
@@ -70,6 +71,9 @@ public class FoldApplyProcessAllWindowFunction<W extends 
Window, T, ACC, R>
                ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
                DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
                initialValue = accSerializer.deserialize(in);
+
+               ctx = new 
InternalProcessApplyAllWindowContext<>(windowFunction);
+
        }
 
        @Override
@@ -92,12 +96,19 @@ public class FoldApplyProcessAllWindowFunction<W extends 
Window, T, ACC, R>
                        result = foldFunction.fold(result, val);
                }
 
-               windowFunction.process(windowFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return context.window();
-                       }
-               }, Collections.singletonList(result), out);
+               this.ctx.window = context.window();
+               this.ctx.windowState = context.windowState();
+               this.ctx.globalState = context.globalState();
+
+               windowFunction.process(ctx, Collections.singletonList(result), 
out);
+       }
+
+       @Override
+       public void clear(final Context context) throws Exception {
+               this.ctx.window = context.window();
+               this.ctx.windowState = context.windowState();
+               this.ctx.globalState = context.globalState();
+               windowFunction.clear(ctx);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index e1bc759..073a2f3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessWindowFunction<K, W extends 
Window, T, ACC, R>
        private TypeSerializer<ACC> accSerializer;
        private final TypeInformation<ACC> accTypeInformation;
        private transient ACC initialValue;
+       private transient InternalProcessApplyWindowContext<ACC, R, K, W> ctx;
 
        public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, 
ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, 
TypeInformation<ACC> accTypeInformation) {
                this.windowFunction = windowFunction;
@@ -70,6 +71,8 @@ public class FoldApplyProcessWindowFunction<K, W extends 
Window, T, ACC, R>
                ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
                DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
                initialValue = accSerializer.deserialize(in);
+
+               ctx = new InternalProcessApplyWindowContext<>(windowFunction);
        }
 
        @Override
@@ -85,19 +88,25 @@ public class FoldApplyProcessWindowFunction<K, W extends 
Window, T, ACC, R>
        }
 
        @Override
-       public void process(K key, final Context context, Iterable<T> values, 
Collector<R> out) throws Exception {
+       public void process(K key, Context context, Iterable<T> values, 
Collector<R> out) throws Exception {
                ACC result = accSerializer.copy(initialValue);
 
                for (T val : values) {
                        result = foldFunction.fold(result, val);
                }
 
-               windowFunction.process(key, windowFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return context.window();
-                       }
-               }, Collections.singletonList(result), out);
+               this.ctx.window = context.window();
+               this.ctx.windowState = context.windowState();
+               this.ctx.globalState = context.globalState();
+               windowFunction.process(key, ctx, 
Collections.singletonList(result), out);
+       }
+
+       @Override
+       public void clear(final Context context) throws Exception{
+               this.ctx.window = context.window();
+               this.ctx.windowState = context.windowState();
+               this.ctx.globalState = context.globalState();
+               windowFunction.clear(ctx);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
new file mode 100644
index 0000000..e1a0a98
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window>
+       extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+       W window;
+       KeyedStateStore windowState;
+       KeyedStateStore globalState;
+
+       InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, 
W> function) {
+               function.super();
+       }
+
+       @Override
+       public W window() {
+               return window;
+       }
+
+       @Override
+       public KeyedStateStore windowState() {
+               return windowState;
+       }
+
+       @Override
+       public KeyedStateStore globalState() {
+               return globalState;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
new file mode 100644
index 0000000..f547adc
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the window key. 
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window>
+       extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+       W window;
+       KeyedStateStore windowState;
+       KeyedStateStore globalState;
+
+       InternalProcessApplyWindowContext(ProcessWindowFunction<IN, OUT, KEY, 
W> function) {
+               function.super();
+       }
+
+       @Override
+       public W window() {
+               return window;
+       }
+
+       @Override
+       public KeyedStateStore windowState() {
+               return windowState;
+       }
+
+       @Override
+       public KeyedStateStore globalState() {
+               return globalState;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 622e020..f49aa27 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -48,6 +49,14 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W 
extends Window> implem
        public abstract void process(Context context, Iterable<IN> elements, 
Collector<OUT> out) throws Exception;
 
        /**
+        * Deletes any state in the {@code Context} when the Window is purged.
+        *
+        * @param context The context in which the window is being evaluated
+        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
+        */
+       public void clear(Context context) throws Exception {}
+
+       /**
         * The context holding window metadata
         */
        public abstract class Context {
@@ -55,5 +64,18 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W 
extends Window> implem
                 * @return The window that is being evaluated.
                 */
                public abstract W window();
+
+               /**
+                * State accessor for per-key and per-window state.
+                *
+                * <p><b>NOTE:</b> If you use per-window state you have to 
ensure that you clean it up
+                * by implementing {@link 
ProcessAllWindowFunction#clear(Context)}.
+                */
+               public abstract KeyedStateStore windowState();
+
+               /**
+                * State accessor for per-key global state.
+                */
+               public abstract KeyedStateStore globalState();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 9c48e24..bcefaf7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -50,12 +51,33 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W 
extends Window> impl
        public abstract void process(KEY key, Context context, Iterable<IN> 
elements, Collector<OUT> out) throws Exception;
 
        /**
+        * Deletes any state in the {@code Context} when the Window is purged.
+        *
+        * @param context The context in which the window is being evaluated
+        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
+        */
+       public void clear(Context context) throws Exception {}
+
+       /**
         * The context holding window metadata
         */
-       public abstract class Context {
+       public abstract class Context implements java.io.Serializable {
                /**
                 * @return The window that is being evaluated.
                 */
                public abstract W window();
+
+               /**
+                * State accessor for per-key and per-window state.
+                *
+                * <p><b>NOTE:</b> If you use per-window state you have to 
ensure that you clean it up
+                * by implementing {@link ProcessWindowFunction#clear(Context)}.
+                */
+               public abstract KeyedStateStore windowState();
+
+               /**
+                * State accessor for per-key global state.
+                */
+               public abstract KeyedStateStore globalState();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index 142c71e..4c54c94 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends 
Window, T, R>
 
        private final ReduceFunction<T> reduceFunction;
        private final ProcessAllWindowFunction<T, R, W> windowFunction;
+       private transient InternalProcessApplyAllWindowContext<T, R, W> ctx;
 
        public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> 
reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) {
                this.windowFunction = windowFunction;
@@ -52,17 +53,27 @@ public class ReduceApplyProcessAllWindowFunction<W extends 
Window, T, R>
                                curr = reduceFunction.reduce(curr, val);
                        }
                }
-               windowFunction.process(windowFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return context.window();
-                       }
-               }, Collections.singletonList(curr), out);
+
+               this.ctx.window = context.window();
+               this.ctx.windowState = context.windowState();
+               this.ctx.globalState = context.globalState();
+
+               windowFunction.process(ctx, Collections.singletonList(curr), 
out);
+       }
+
+       @Override
+       public void clear(final Context context) throws Exception {
+               this.ctx.window = context.window();
+               this.ctx.windowState = context.windowState();
+               this.ctx.globalState = context.globalState();
+
+               windowFunction.clear(ctx);
        }
 
        @Override
        public void open(Configuration configuration) throws Exception {
                FunctionUtils.openFunction(this.windowFunction, configuration);
+               ctx = new 
InternalProcessApplyAllWindowContext<>(windowFunction);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 9ea1fdf..1af783a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessWindowFunction<K, W extends 
Window, T, R>
 
        private final ReduceFunction<T> reduceFunction;
        private final ProcessWindowFunction<T, R, K, W> windowFunction;
+       private transient InternalProcessApplyWindowContext<T, R, K, W> ctx;
 
        public ReduceApplyProcessWindowFunction(ReduceFunction<T> 
reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
                this.windowFunction = windowFunction;
@@ -52,17 +53,25 @@ public class ReduceApplyProcessWindowFunction<K, W extends 
Window, T, R>
                                curr = reduceFunction.reduce(curr, val);
                        }
                }
-               windowFunction.process(k, windowFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return context.window();
-                       }
-               }, Collections.singletonList(curr), out);
+
+               this.ctx.window = context.window();
+               this.ctx.windowState = context.windowState();
+               this.ctx.globalState = context.globalState();
+               windowFunction.process(k, ctx, Collections.singletonList(curr), 
out);
+       }
+
+       @Override
+       public void clear(final Context context) throws Exception {
+               this.ctx.window = context.window();
+               this.ctx.windowState = context.windowState();
+               this.ctx.globalState = context.globalState();
+               windowFunction.clear(ctx);
        }
 
        @Override
        public void open(Configuration configuration) throws Exception {
                FunctionUtils.openFunction(this.windowFunction, configuration);
+               ctx = new InternalProcessApplyWindowContext<>(windowFunction);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 87c5aca..d58b5cc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -19,6 +19,17 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.UnionIterator;
@@ -38,6 +49,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        private final InternalWindowFunction<Iterable<Type>, Result, Key, 
Window> function;
 
+       private final AccumulatingKeyedTimePanesContext context;
+
        /**
         * IMPORTANT: This value needs to start at one, so it is fresher than 
the value that new entries have (zero) */
        private long evaluationPass = 1L;   
@@ -47,6 +60,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
        public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
                this.keySelector = keySelector;
                this.function = function;
+               this.context = new AccumulatingKeyedTimePanesContext();
        }
 
        // 
------------------------------------------------------------------------
@@ -67,13 +81,15 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                        for (KeyMap.Entry<Key, ArrayList<Type>> entry : 
latestPane) {
                                Key key = entry.getKey();
                                operator.setCurrentKey(key);
-                               function.apply(entry.getKey(), window, 
entry.getValue(), out);
+                               context.globalState = 
operator.getKeyedStateStore();
+
+                               function.process(entry.getKey(), window, 
context, entry.getValue(), out);
                        }
                }
                else {
                        // general code path for multi-pane case
                        WindowFunctionTraversal<Key, Type, Result> evaluator = 
new WindowFunctionTraversal<>(
-                                       function, window, out, operator);
+                                       function, window, out, operator, 
context);
                        traverseAllPanes(evaluator, evaluationPass);
                }
                
@@ -95,17 +111,19 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                private final TimeWindow window;
                
                private final AbstractStreamOperator<Result> contextOperator;
-               
+
                private Key currentKey;
                
+               private AccumulatingKeyedTimePanesContext context;
 
                WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, 
Result, Key, Window> function, TimeWindow window,
-                                                               
Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
+                                                               
Collector<Result> out, AbstractStreamOperator<Result> contextOperator, 
AccumulatingKeyedTimePanesContext context) {
                        this.function = function;
                        this.out = out;
                        this.unionIterator = new UnionIterator<>();
                        this.window = window;
                        this.contextOperator = contextOperator;
+                       this.context = context;
                }
 
 
@@ -123,7 +141,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                @Override
                public void keyDone() throws Exception {
                        contextOperator.setCurrentKey(currentKey);
-                       function.apply(currentKey, window, unionIterator, out);
+                       context.globalState = 
contextOperator.getKeyedStateStore();
+                       function.process(currentKey, window, context, 
unionIterator, out);
                }
        }
        
@@ -136,6 +155,52 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
        }
 
+       private static class ThrowingKeyedStateStore implements KeyedStateStore 
{
+               @Override
+               public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
+               }
+
+               @Override
+               public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
+               }
+
+               @Override
+               public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
+               }
+
+               @Override
+               public <T, A> FoldingState<T, A> 
getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
+               }
+
+               @Override
+               public <UK, UV> MapState<UK, UV> 
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
+               }
+       }
+
+       private static class AccumulatingKeyedTimePanesContext implements 
InternalWindowFunction.InternalWindowContext {
+               KeyedStateStore globalState;
+               KeyedStateStore throwingStore;
+
+               public AccumulatingKeyedTimePanesContext() {
+                       this.throwingStore = new ThrowingKeyedStateStore();
+               }
+
+               @Override
+               public KeyedStateStore windowState() {
+                       return throwingStore;
+               }
+
+               @Override
+               public KeyedStateStore globalState() {
+                       return globalState;
+               }
+       }
+
        private static final KeyMap.LazyFactory<?> LIST_FACTORY = new 
KeyMap.LazyFactory<ArrayList<?>>() {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 24c8d32..85451a5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -134,14 +134,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                                                                
        " window: " + mergeResult);
                                                                }
 
-                                                               context.key = 
key;
-                                                               context.window 
= mergeResult;
+                                                               
triggerContext.key = key;
+                                                               
triggerContext.window = mergeResult;
 
-                                                               
context.onMerge(mergedWindows);
+                                                               
triggerContext.onMerge(mergedWindows);
 
                                                                for (W m : 
mergedWindows) {
-                                                                       
context.window = m;
-                                                                       
context.clear();
+                                                                       
triggerContext.window = m;
+                                                                       
triggerContext.clear();
                                                                        
deleteCleanupTimer(m);
                                                                }
 
@@ -165,12 +165,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                
evictingWindowState.setCurrentNamespace(stateWindow);
                                evictingWindowState.add(element);
 
-                               context.key = key;
-                               context.window = actualWindow;
+                               triggerContext.key = key;
+                               triggerContext.window = actualWindow;
                                evictorContext.key = key;
                                evictorContext.window = actualWindow;
 
-                               TriggerResult triggerResult = 
context.onElement(element);
+                               TriggerResult triggerResult = 
triggerContext.onElement(element);
 
                                if (triggerResult.isFire()) {
                                        Iterable<StreamRecord<IN>> contents = 
evictingWindowState.get();
@@ -201,12 +201,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                evictingWindowState.setCurrentNamespace(window);
                                evictingWindowState.add(element);
 
-                               context.key = key;
-                               context.window = window;
+                               triggerContext.key = key;
+                               triggerContext.window = window;
                                evictorContext.key = key;
                                evictorContext.window = window;
 
-                               TriggerResult triggerResult = 
context.onElement(element);
+                               TriggerResult triggerResult = 
triggerContext.onElement(element);
 
                                if (triggerResult.isFire()) {
                                        Iterable<StreamRecord<IN>> contents = 
evictingWindowState.get();
@@ -236,8 +236,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
        @Override
        public void onEventTime(InternalTimer<K, W> timer) throws Exception {
 
-               context.key = timer.getKey();
-               context.window = timer.getNamespace();
+               triggerContext.key = timer.getKey();
+               triggerContext.window = timer.getNamespace();
                evictorContext.key = timer.getKey();
                evictorContext.window = timer.getNamespace();
 
@@ -245,7 +245,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindows = getMergingWindowSet();
-                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                       W stateWindow = 
mergingWindows.getStateWindow(triggerContext.window);
                        if (stateWindow == null) {
                                // Timer firing for non-existent window, this 
can only happen if a
                                // trigger did not clean up timers. We have 
already cleared the merging
@@ -255,23 +255,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                
evictingWindowState.setCurrentNamespace(stateWindow);
                        }
                } else {
-                       evictingWindowState.setCurrentNamespace(context.window);
+                       
evictingWindowState.setCurrentNamespace(triggerContext.window);
                }
 
                Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 
                if (contents != null) {
-                       TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
+                       TriggerResult triggerResult = 
triggerContext.onEventTime(timer.getTimestamp());
                        if (triggerResult.isFire()) {
-                               emitWindowContents(context.window, contents, 
evictingWindowState);
+                               emitWindowContents(triggerContext.window, 
contents, evictingWindowState);
                        }
                        if (triggerResult.isPurge()) {
                                evictingWindowState.clear();
                        }
                }
 
-               if (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp())) {
-                       clearAllState(context.window, evictingWindowState, 
mergingWindows);
+               if (windowAssigner.isEventTime() && 
isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+                       clearAllState(triggerContext.window, 
evictingWindowState, mergingWindows);
                }
 
                if (mergingWindows != null) {
@@ -282,8 +282,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
 
        @Override
        public void onProcessingTime(InternalTimer<K, W> timer) throws 
Exception {
-               context.key = timer.getKey();
-               context.window = timer.getNamespace();
+               triggerContext.key = timer.getKey();
+               triggerContext.window = timer.getNamespace();
                evictorContext.key = timer.getKey();
                evictorContext.window = timer.getNamespace();
 
@@ -291,7 +291,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindows = getMergingWindowSet();
-                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                       W stateWindow = 
mergingWindows.getStateWindow(triggerContext.window);
                        if (stateWindow == null) {
                                // Timer firing for non-existent window, this 
can only happen if a
                                // trigger did not clean up timers. We have 
already cleared the merging
@@ -301,23 +301,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                
evictingWindowState.setCurrentNamespace(stateWindow);
                        }
                } else {
-                       evictingWindowState.setCurrentNamespace(context.window);
+                       
evictingWindowState.setCurrentNamespace(triggerContext.window);
                }
 
                Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 
                if (contents != null) {
-                       TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
+                       TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
                        if (triggerResult.isFire()) {
-                               emitWindowContents(context.window, contents, 
evictingWindowState);
+                               emitWindowContents(triggerContext.window, 
contents, evictingWindowState);
                        }
                        if (triggerResult.isPurge()) {
                                evictingWindowState.clear();
                        }
                }
 
-               if (!windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp())) {
-                       clearAllState(context.window, evictingWindowState, 
mergingWindows);
+               if (!windowAssigner.isEventTime() && 
isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+                       clearAllState(triggerContext.window, 
evictingWindowState, mergingWindows);
                }
 
                if (mergingWindows != null) {
@@ -348,7 +348,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                }
                        });
 
-               userFunction.apply(context.key, context.window, 
projectedContents, timestampedCollector);
+               processContext.window = triggerContext.window;
+               userFunction.process(triggerContext.key, triggerContext.window, 
processContext, projectedContents, timestampedCollector);
                evictorContext.evictAfter(recordsWithTimestamp, 
Iterables.size(recordsWithTimestamp));
 
 
@@ -364,9 +365,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                        W window,
                        ListState<StreamRecord<IN>> windowState,
                        MergingWindowSet<W> mergingWindows) throws Exception {
-
                windowState.clear();
-               context.clear();
+               triggerContext.clear();
+               processContext.window = window;
+               processContext.clear();
                if (mergingWindows != null) {
                        mergingWindows.retireWindow(window);
                        mergingWindows.persist();

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3745659..3d40716 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -23,8 +23,16 @@ import org.apache.commons.math3.util.ArithmeticUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -159,7 +167,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        /** This is given to the {@code InternalWindowFunction} for emitting 
elements with a given timestamp. */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
-       protected transient Context context = new Context(null, null);
+       protected transient Context triggerContext = new Context(null, null);
+
+       protected transient WindowContext processContext = new 
WindowContext(null);
 
        protected transient WindowAssigner.WindowAssignerContext 
windowAssignerContext;
 
@@ -264,7 +274,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                internalTimerService =
                                getInternalTimerService("window-timers", 
windowSerializer, this);
 
-               context = new Context(null, null);
+               triggerContext = new Context(null, null);
+               processContext = new WindowContext( null);
 
                windowAssignerContext = new 
WindowAssigner.WindowAssignerContext() {
                        @Override
@@ -317,7 +328,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        public void close() throws Exception {
                super.close();
                timestampedCollector = null;
-               context = null;
+               triggerContext = null;
+               processContext = null;
                windowAssignerContext = null;
        }
 
@@ -325,7 +337,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        public void dispose() throws Exception {
                super.dispose();
                timestampedCollector = null;
-               context = null;
+               triggerContext = null;
+               processContext = null;
                windowAssignerContext = null;
        }
 
@@ -365,14 +378,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                                                        " 
window: " + mergeResult);
                                                }
 
-                                               context.key = key;
-                                               context.window = mergeResult;
+                                               triggerContext.key = key;
+                                               triggerContext.window = 
mergeResult;
 
-                                               context.onMerge(mergedWindows);
+                                               
triggerContext.onMerge(mergedWindows);
 
                                                for (W m: mergedWindows) {
-                                                       context.window = m;
-                                                       context.clear();
+                                                       triggerContext.window = 
m;
+                                                       triggerContext.clear();
                                                        deleteCleanupTimer(m);
                                                }
 
@@ -396,10 +409,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                windowState.setCurrentNamespace(stateWindow);
                                windowState.add(element.getValue());
 
-                               context.key = key;
-                               context.window = actualWindow;
+                               triggerContext.key = key;
+                               triggerContext.window = actualWindow;
 
-                               TriggerResult triggerResult = 
context.onElement(element);
+                               TriggerResult triggerResult = 
triggerContext.onElement(element);
 
                                if (triggerResult.isFire()) {
                                        ACC contents = windowState.get();
@@ -429,10 +442,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                windowState.setCurrentNamespace(window);
                                windowState.add(element.getValue());
 
-                               context.key = key;
-                               context.window = window;
+                               triggerContext.key = key;
+                               triggerContext.window = window;
 
-                               TriggerResult triggerResult = 
context.onElement(element);
+                               TriggerResult triggerResult = 
triggerContext.onElement(element);
 
                                if (triggerResult.isFire()) {
                                        ACC contents = windowState.get();
@@ -460,14 +473,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
        @Override
        public void onEventTime(InternalTimer<K, W> timer) throws Exception {
-               context.key = timer.getKey();
-               context.window = timer.getNamespace();
+               triggerContext.key = timer.getKey();
+               triggerContext.window = timer.getNamespace();
 
                MergingWindowSet<W> mergingWindows;
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindows = getMergingWindowSet();
-                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                       W stateWindow = 
mergingWindows.getStateWindow(triggerContext.window);
                        if (stateWindow == null) {
                                // Timer firing for non-existent window, this 
can only happen if a
                                // trigger did not clean up timers. We have 
already cleared the merging
@@ -477,7 +490,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                windowState.setCurrentNamespace(stateWindow);
                        }
                } else {
-                       windowState.setCurrentNamespace(context.window);
+                       windowState.setCurrentNamespace(triggerContext.window);
                        mergingWindows = null;
                }
 
@@ -487,17 +500,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
 
                if (contents != null) {
-                       TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
+                       TriggerResult triggerResult = 
triggerContext.onEventTime(timer.getTimestamp());
                        if (triggerResult.isFire()) {
-                               emitWindowContents(context.window, contents);
+                               emitWindowContents(triggerContext.window, 
contents);
                        }
                        if (triggerResult.isPurge()) {
                                windowState.clear();
                        }
                }
 
-               if (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp())) {
-                       clearAllState(context.window, windowState, 
mergingWindows);
+               if (windowAssigner.isEventTime() && 
isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+                       clearAllState(triggerContext.window, windowState, 
mergingWindows);
                }
 
                if (mergingWindows != null) {
@@ -508,14 +521,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
        @Override
        public void onProcessingTime(InternalTimer<K, W> timer) throws 
Exception {
-               context.key = timer.getKey();
-               context.window = timer.getNamespace();
+               triggerContext.key = timer.getKey();
+               triggerContext.window = timer.getNamespace();
 
                MergingWindowSet<W> mergingWindows;
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindows = getMergingWindowSet();
-                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                       W stateWindow = 
mergingWindows.getStateWindow(triggerContext.window);
                        if (stateWindow == null) {
                                // Timer firing for non-existent window, this 
can only happen if a
                                // trigger did not clean up timers. We have 
already cleared the merging
@@ -525,7 +538,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                windowState.setCurrentNamespace(stateWindow);
                        }
                } else {
-                       windowState.setCurrentNamespace(context.window);
+                       windowState.setCurrentNamespace(triggerContext.window);
                        mergingWindows = null;
                }
 
@@ -535,17 +548,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
 
                if (contents != null) {
-                       TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
+                       TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
                        if (triggerResult.isFire()) {
-                               emitWindowContents(context.window, contents);
+                               emitWindowContents(triggerContext.window, 
contents);
                        }
                        if (triggerResult.isPurge()) {
                                windowState.clear();
                        }
                }
 
-               if (!windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp())) {
-                       clearAllState(context.window, windowState, 
mergingWindows);
+               if (!windowAssigner.isEventTime() && 
isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+                       clearAllState(triggerContext.window, windowState, 
mergingWindows);
                }
 
                if (mergingWindows != null) {
@@ -559,14 +572,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
         *
         * <p>The caller must ensure that the
-        * correct key is set in the state backend and the context object.
+        * correct key is set in the state backend and the triggerContext 
object.
         */
        private void clearAllState(
                        W window,
                        AppendingState<IN, ACC> windowState,
                        MergingWindowSet<W> mergingWindows) throws Exception {
                windowState.clear();
-               context.clear();
+               triggerContext.clear();
+               processContext.window = window;
+               processContext.clear();
                if (mergingWindows != null) {
                        mergingWindows.retireWindow(window);
                        mergingWindows.persist();
@@ -579,7 +594,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        @SuppressWarnings("unchecked")
        private void emitWindowContents(W window, ACC contents) throws 
Exception {
                
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-               userFunction.apply(context.key, context.window, contents, 
timestampedCollector);
+               processContext.window = window;
+               userFunction.process(triggerContext.key, window, 
processContext, contents, timestampedCollector);
        }
 
        /**
@@ -636,9 +652,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
 
                if (windowAssigner.isEventTime()) {
-                       context.registerEventTimeTimer(cleanupTime);
+                       triggerContext.registerEventTimeTimer(cleanupTime);
                } else {
-                       context.registerProcessingTimeTimer(cleanupTime);
+                       triggerContext.registerProcessingTimeTimer(cleanupTime);
                }
        }
 
@@ -654,9 +670,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        return;
                }
                if (windowAssigner.isEventTime()) {
-                       context.deleteEventTimeTimer(cleanupTime);
+                       triggerContext.deleteEventTimeTimer(cleanupTime);
                } else {
-                       context.deleteProcessingTimeTimer(cleanupTime);
+                       triggerContext.deleteProcessingTimeTimer(cleanupTime);
                }
        }
 
@@ -686,6 +702,134 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        }
 
        /**
+        * Base class for per-window {@link KeyedStateStore KeyedStateStores}. 
Used to allow per-window
+        * state access for {@link 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+        */
+       public abstract class AbstractPerWindowStateStore implements 
KeyedStateStore {
+
+               // we have this in the base class even though it's not used in 
MergingWindowStateStore so that
+               // we can always set it and ignore what actual implementation 
we have
+               protected W window;
+       }
+
+       /**
+        * Special {@link AbstractPerWindowStateStore} that doesn't allow 
access to per-window state.
+        */
+       public class MergingWindowStateStore extends 
AbstractPerWindowStateStore {
+               @Override
+               public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not allowed when using merging windows.");
+               }
+
+               @Override
+               public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not allowed when using merging windows.");
+               }
+
+               @Override
+               public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not allowed when using merging windows.");
+               }
+
+               @Override
+               public <T, A> FoldingState<T, A> 
getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not allowed when using merging windows.");
+               }
+
+               @Override
+               public <UK, UV> MapState<UK, UV> 
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+                       throw new UnsupportedOperationException("Per-window 
state is not allowed when using merging windows.");
+               }
+       }
+
+       /**
+        * Regular per-window state store for use with
+        * {@link 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+        */
+       public class PerWindowStateStore extends AbstractPerWindowStateStore {
+               @Override
+               public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
+                       try {
+                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Could not retrieve 
state", e);
+                       }
+               }
+
+               @Override
+               public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
+                       try {
+                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Could not retrieve 
state", e);
+                       }
+               }
+
+               @Override
+               public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
+                       try {
+                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Could not retrieve 
state", e);
+                       }
+               }
+
+               @Override
+               public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+                       try {
+                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Could not retrieve 
state", e);
+                       }
+               }
+
+               @Override
+               public <UK, UV> MapState<UK, UV> 
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+                       try {
+                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Could not retrieve 
state", e);
+                       }
+               }
+       }
+
+       /**
+        * A utility class for handling {@code ProcessWindowFunction} 
invocations. This can be reused
+        * by setting the {@code window} field. No internal 
state must be kept in the
+        * {@code WindowContext}.
+        */
+       public class WindowContext implements 
InternalWindowFunction.InternalWindowContext {
+               protected W window;
+
+               protected AbstractPerWindowStateStore windowState;
+
+               public WindowContext(W window) {
+                       this.window = window;
+                       this.windowState = windowAssigner instanceof 
MergingWindowAssigner ?  new MergingWindowStateStore() : new 
PerWindowStateStore();
+               }
+
+               @Override
+               public String toString() {
+                       return "WindowContext{Window = " + window.toString() + 
"}";
+               }
+
+               public void clear() throws Exception {
+                       userFunction.clear(window, this);
+               }
+
+               @Override
+               public KeyedStateStore windowState() {
+                       this.windowState.window = this.window;
+                       return this.windowState;
+               }
+
+               @Override
+               public KeyedStateStore globalState() {
+                       return WindowOperator.this.getKeyedStateStore();
+               }
+       }
+
+       /**
         * {@code Context} is a utility for handling {@code Trigger} 
invocations. It can be reused
         * by setting the {@code key} and {@code window} fields. No internal 
state must be kept in
         * the {@code Context}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
index 9533c95..83e896d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -45,6 +46,8 @@ public final class 
InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
 
        private final AggregateFunction<T, ACC, V> aggFunction;
 
+       private transient InternalProcessAllWindowContext<V, R, W> ctx;
+
        public InternalAggregateProcessAllWindowFunction(
                        AggregateFunction<T, ACC, V> aggFunction,
                        ProcessAllWindowFunction<V, R, W> windowFunction) {
@@ -53,22 +56,31 @@ public final class 
InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
        }
 
        @Override
-       public void apply(Byte key, final W window, Iterable<T> input, 
Collector<R> out) throws Exception {
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
                ProcessAllWindowFunction<V, R, W> wrappedFunction = 
this.wrappedFunction;
-               ProcessAllWindowFunction<V, R, W>.Context context = 
wrappedFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return window;
-                       }
-               };
+               this.ctx = new 
InternalProcessAllWindowContext<>(wrappedFunction);
+       }
 
+       @Override
+       public void process(Byte key, final W window, final 
InternalWindowContext context, Iterable<T> input, Collector<R> out) throws 
Exception {
                final ACC acc = aggFunction.createAccumulator();
 
                for (T val : input) {
                        aggFunction.add(val, acc);
                }
 
-               wrappedFunction.process(context, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+               ProcessAllWindowFunction<V, R, W> wrappedFunction = 
this.wrappedFunction;
+               wrappedFunction.process(ctx, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+       }
+
+       @Override
+       public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+               wrappedFunction.clear(ctx);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
index 433da9b..e14c9bd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -46,30 +46,36 @@ public final class 
InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext
 
        private final AggregateFunction<T, ACC, V> aggFunction;
 
+       private final InternalProcessWindowContext<V, R, K, W> ctx;
+
        public InternalAggregateProcessWindowFunction(
                        AggregateFunction<T, ACC, V> aggFunction,
                        ProcessWindowFunction<V, R, K, W> windowFunction) {
                super(windowFunction);
                this.aggFunction = aggFunction;
+               this.ctx = new InternalProcessWindowContext<>(windowFunction);
        }
-       
-       @Override
-       public void apply(K key, final W window, Iterable<T> input, 
Collector<R> out) throws Exception {
-               ProcessWindowFunction<V, R, K, W> wrappedFunction = 
this.wrappedFunction;
-               ProcessWindowFunction<V, R, K, W>.Context context = 
wrappedFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return window;
-                       }
-               };
 
+       @Override
+       public void process(K key, final W window, final InternalWindowContext 
context, Iterable<T> input, Collector<R> out) throws Exception {
                final ACC acc = aggFunction.createAccumulator();
 
                for (T val : input) {
                        aggFunction.add(val, acc);
                }
 
-               wrappedFunction.process(key, context, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+               ProcessWindowFunction<V, R, K, W> wrappedFunction = 
this.wrappedFunction;
+               wrappedFunction.process(key, ctx, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+       }
+
+       @Override
+       public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+               ProcessWindowFunction<V, R, K, W> wrappedFunction = 
this.wrappedFunction;
+               wrappedFunction.clear(ctx);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
index 672bdb6..f2507ed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableAllWindowFunction<IN, 
OUT, W extends Window>
        }
 
        @Override
-       public void apply(Byte key, W window, Iterable<IN> input, 
Collector<OUT> out) throws Exception {
+       public void process(Byte key, W window, InternalWindowContext context, 
Iterable<IN> input, Collector<OUT> out) throws Exception {
                wrappedFunction.apply(window, input, out);
        }
 
        @Override
+       public void clear(W window, InternalWindowContext context) throws 
Exception {
+
+       }
+
+       @Override
        public RuntimeContext getRuntimeContext() {
                throw new RuntimeException("This should never be called.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
index e33cc2a..47b7d55 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.runtime.operators.windowing.functions;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -34,21 +35,33 @@ public final class 
InternalIterableProcessAllWindowFunction<IN, OUT, W extends W
 
        private static final long serialVersionUID = 1L;
 
+       private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
        public 
InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> 
wrappedFunction) {
                super(wrappedFunction);
        }
 
        @Override
-       public void apply(Byte key, final W window, Iterable<IN> input, 
Collector<OUT> out) throws Exception {
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = 
this.wrappedFunction;
+               this.ctx = new 
InternalProcessAllWindowContext<>(wrappedFunction);
+       }
+
+       @Override
+       public void process(Byte key, final W window, final 
InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws 
Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+               ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = 
this.wrappedFunction;
+               wrappedFunction.process(ctx, input, out);
+       }
+
+       @Override
+       public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
                ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = 
this.wrappedFunction;
-               ProcessAllWindowFunction<IN, OUT, W>.Context context = 
wrappedFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return window;
-                       }
-               };
-
-               wrappedFunction.process(context, input, out);
+               wrappedFunction.clear(ctx);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
index de516a5..7eb015e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -34,21 +34,27 @@ public final class 
InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends
 
        private static final long serialVersionUID = 1L;
 
+       private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
        public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, 
OUT, KEY, W> wrappedFunction) {
                super(wrappedFunction);
+               this.ctx = new InternalProcessWindowContext<>(wrappedFunction);
+       }
+
+       @Override
+       public void process(KEY key, final W window, final 
InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws 
Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+               ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = 
this.wrappedFunction;
+               wrappedFunction.process(key, ctx, input, out);
        }
 
        @Override
-       public void apply(KEY key, final W window, Iterable<IN> input, 
Collector<OUT> out) throws Exception {
+       public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
                ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = 
this.wrappedFunction;
-               ProcessWindowFunction<IN, OUT, KEY, W>.Context context = 
wrappedFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return window;
-                       }
-               };
-               
-               wrappedFunction.process(key, context, input, out);
+               wrappedFunction.clear(ctx);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 895b31f..e2f1517 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableWindowFunction<IN, OUT, 
KEY, W extends Window
        }
 
        @Override
-       public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> 
out) throws Exception {
+       public void process(KEY key, W window, InternalWindowContext context, 
Iterable<IN> input, Collector<OUT> out) throws Exception {
                wrappedFunction.apply(key, window, input, out);
        }
 
        @Override
+       public void clear(W window, InternalWindowContext context) throws 
Exception {
+
+       }
+
+       @Override
        public RuntimeContext getRuntimeContext() {
                throw new RuntimeException("This should never be called.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
new file mode 100644
index 0000000..c70e161
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessAllWindowContext<IN, OUT, W extends Window>
+       extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+       W window;
+       InternalWindowFunction.InternalWindowContext internalContext;
+
+       InternalProcessAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+               function.super();
+       }
+
+       @Override
+       public W window() {
+               return window;
+       }
+
+       @Override
+       public KeyedStateStore windowState() {
+               return internalContext.windowState();
+       }
+
+       @Override
+       public KeyedStateStore globalState() {
+               return internalContext.globalState();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
new file mode 100644
index 0000000..0f1c0ee
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
+       extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+       W window;
+       InternalWindowFunction.InternalWindowContext internalContext;
+
+       InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+               function.super();
+       }
+
+       @Override
+       public W window() {
+               return window;
+       }
+
+       @Override
+       public KeyedStateStore windowState() {
+               return internalContext.windowState();
+       }
+
+       @Override
+       public KeyedStateStore globalState() {
+               return internalContext.globalState();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
index a34d3ec..e90bcf4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueAllWindowFunction<IN, 
OUT, W extends Windo
        }
 
        @Override
-       public void apply(Byte key, W window, IN input, Collector<OUT> out) 
throws Exception {
+       public void process(Byte key, W window, InternalWindowContext context, 
IN input, Collector<OUT> out) throws Exception {
                wrappedFunction.apply(window, Collections.singletonList(input), 
out);
        }
 
        @Override
+       public void clear(W window, InternalWindowContext context) throws 
Exception {
+
+       }
+
+       @Override
        public RuntimeContext getRuntimeContext() {
                throw new RuntimeException("This should never be called.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
index 0284ef7..f7c6a08 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.runtime.operators.windowing.functions;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -36,21 +37,33 @@ public final class 
InternalSingleValueProcessAllWindowFunction<IN, OUT, W extend
 
        private static final long serialVersionUID = 1L;
 
+       private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
        public 
InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, 
W> wrappedFunction) {
                super(wrappedFunction);
        }
 
        @Override
-       public void apply(Byte key, final W window, IN input, Collector<OUT> 
out) throws Exception {
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = 
this.wrappedFunction;
+               this.ctx = new 
InternalProcessAllWindowContext<>(wrappedFunction);
+       }
+
+       @Override
+       public void process(Byte key, final W window, final 
InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
                ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = 
this.wrappedFunction;
-               ProcessAllWindowFunction<IN, OUT, W>.Context context = 
wrappedFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return window;
-                       }
-               };
+               wrappedFunction.process(ctx, Collections.singletonList(input), 
out);
+       }
 
-               wrappedFunction.process(context, 
Collections.singletonList(input), out);
+       @Override
+       public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+               ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = 
this.wrappedFunction;
+               wrappedFunction.clear(ctx);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
index 7a4e8c6..21d1639 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -36,21 +36,29 @@ public final class 
InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W exte
 
        private static final long serialVersionUID = 1L;
 
+       private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
        public 
InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> 
wrappedFunction) {
                super(wrappedFunction);
+               ctx = new InternalProcessWindowContext<>(wrappedFunction);
        }
 
        @Override
-       public void apply(KEY key, final W window, IN input, Collector<OUT> 
out) throws Exception {
+       public void process(KEY key, final W window, final 
InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+
                ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = 
this.wrappedFunction;
-               ProcessWindowFunction<IN, OUT, KEY, W>.Context context = 
wrappedFunction.new Context() {
-                       @Override
-                       public W window() {
-                               return window;
-                       }
-               };
+               wrappedFunction.process(key, ctx, 
Collections.singletonList(input), out);
+       }
 
-               wrappedFunction.process(key, context, 
Collections.singletonList(input), out);
+       @Override
+       public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+               this.ctx.window = window;
+               this.ctx.internalContext = context;
+
+               ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = 
this.wrappedFunction;
+               wrappedFunction.clear(ctx);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
index 9a0a447..d5cc4a2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueWindowFunction<IN, 
OUT, KEY, W extends Win
        }
 
        @Override
-       public void apply(KEY key, W window, IN input, Collector<OUT> out) 
throws Exception {
+       public void process(KEY key, W window, InternalWindowContext context, 
IN input, Collector<OUT> out) throws Exception {
                wrappedFunction.apply(key, window, 
Collections.singletonList(input), out);
        }
 
        @Override
+       public void clear(W window, InternalWindowContext context) throws 
Exception {
+
+       }
+
+       @Override
        public RuntimeContext getRuntimeContext() {
                throw new RuntimeException("This should never be called.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 2eb4052..9834480 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -29,15 +30,28 @@ import org.apache.flink.util.Collector;
  * @param <KEY> The type of the key.
  */
 public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> 
extends Function {
-
        /**
         * Evaluates the window and outputs none or several elements.
         *
-        * @param key    The key for which this window is evaluated.
-        * @param window The window that is being evaluated.
-        * @param input  The elements in the window being evaluated.
-        * @param out    A collector for emitting elements.
+        * @param context The context in which the window is being evaluated.
+        * @param input The elements in the window being evaluated.
+        * @param out A collector for emitting elements.
+        *
         * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
         */
-       void apply(KEY key, W window, IN input, Collector<OUT> out) throws 
Exception;
+       void process(KEY key, W window, InternalWindowContext context, IN 
input, Collector<OUT> out) throws Exception;
+
+       /**
+        * Deletes any state in the {@code Context} when the Window is purged.
+        *
+        * @param context The context in which the window is being evaluated.
+        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
+        */
+       void clear(W window, InternalWindowContext context) throws Exception;
+
+       interface InternalWindowContext extends java.io.Serializable {
+               KeyedStateStore windowState();
+
+               KeyedStateStore globalState();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index 4b479f3..c4bed37 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -21,20 +21,28 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -45,8 +53,8 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.util.Collector;
-import org.junit.Test;
 import org.junit.Assert;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -139,12 +147,26 @@ public class FoldApplyProcessWindowFunctionTest {
 
                expected.add(initValue);
 
-               foldWindowFunction.process(0, foldWindowFunction.new Context() {
+               FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, 
Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
                        @Override
                        public TimeWindow window() {
                                return new TimeWindow(0, 1);
                        }
-               }, input, new ListCollector<>(result));
+
+                       @Override
+                       public KeyedStateStore windowState() {
+                               return new DummyKeyedStateStore();
+                       }
+
+                       @Override
+                       public KeyedStateStore globalState() {
+                               return new DummyKeyedStateStore();
+                       }
+               };
+
+               foldWindowFunction.open(new Configuration());
+
+               foldWindowFunction.process(0, ctx, input, new 
ListCollector<>(result));
 
                Assert.assertEquals(expected, result);
        }
@@ -234,16 +256,58 @@ public class FoldApplyProcessWindowFunctionTest {
 
                expected.add(initValue);
 
-               foldWindowFunction.process(foldWindowFunction.new Context() {
+               FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, 
Integer>.Context ctx = foldWindowFunction.new Context() {
                        @Override
                        public TimeWindow window() {
                                return new TimeWindow(0, 1);
                        }
-               }, input, new ListCollector<>(result));
+
+                       @Override
+                       public KeyedStateStore windowState() {
+                               return new DummyKeyedStateStore();
+                       }
+
+                       @Override
+                       public KeyedStateStore globalState() {
+                               return new DummyKeyedStateStore();
+                       }
+               };
+
+               foldWindowFunction.open(new Configuration());
+
+               foldWindowFunction.process(ctx, input, new 
ListCollector<>(result));
 
                Assert.assertEquals(expected, result);
        }
 
+       public static class DummyKeyedStateStore implements KeyedStateStore {
+
+               @Override
+               public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
+                       return null;
+               }
+
+               @Override
+               public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
+                       return null;
+               }
+
+               @Override
+               public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
+                       return null;
+               }
+
+               @Override
+               public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+                       return null;
+               }
+
+               @Override
+               public <UK, UV> MapState<UK, UV> 
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+                       return null;
+               }
+       }
+
        public static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
 
                public DummyStreamExecutionEnvironment() {

Reply via email to