[FLINK-3200] Use Partitioned State in WindowOperator

This changes window operator to use the new partitioned state
abstraction for keeping window contents instead of custom internal
state and the checkpointed interface.

For now, timers are still kept as custom checkpointed state, however.

WindowOperator now expects a StateIdentifier for MergingState, this can
either be for ReducingState or ListState but WindowOperator is agnostic
to the type of State. Also the signature of WindowFunction is changed to
include the type of intermediate input. For example, if a ReducingState
is used the input of the WindowFunction is T (where T is the input
type). If using a ListState the input of the WindowFunction would be of
type Iterable[T].


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67ca4a43
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67ca4a43
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67ca4a43

Branch: refs/heads/master
Commit: 67ca4a436daf8de1a6e0329b4b30342e77d26087
Parents: caf4672
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Jan 25 12:34:05 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 3 20:27:51 2016 +0100

----------------------------------------------------------------------
 .../ml/IncrementalLearningSkeleton.java         |   2 +-
 .../GroupedProcessingTimeWindowExample.java     |   2 +-
 .../examples/windowing/SessionWindowing.java    |  11 +-
 .../examples/windowing/TopSpeedWindowing.java   |   2 +-
 .../examples/windowing/TopSpeedWindowing.scala  |   2 +-
 .../api/datastream/AllWindowedStream.java       |  37 +-
 .../api/datastream/CoGroupedStreams.java        |   2 +-
 .../api/datastream/WindowedStream.java          | 118 ++--
 .../aggregation/AggregationFunction.java        |   4 +-
 .../functions/windowing/AllWindowFunction.java  |   4 +-
 .../windowing/FoldAllWindowFunction.java        |   2 +-
 .../functions/windowing/FoldWindowFunction.java |   2 +-
 .../windowing/ReduceAllWindowFunction.java      |  44 +-
 .../windowing/ReduceApplyAllWindowFunction.java |  54 ++
 .../windowing/ReduceApplyWindowFunction.java    |  54 ++
 .../ReduceIterableAllWindowFunction.java        |  46 ++
 .../windowing/ReduceIterableWindowFunction.java |  46 ++
 .../windowing/ReduceWindowFunction.java         |  26 +-
 .../ReduceWindowFunctionWithWindow.java         |  44 +-
 .../api/functions/windowing/WindowFunction.java |   4 +-
 .../triggers/ContinuousEventTimeTrigger.java    |   8 +-
 .../ContinuousProcessingTimeTrigger.java        |  11 +-
 .../api/windowing/triggers/CountTrigger.java    |   9 +-
 .../api/windowing/triggers/DeltaTrigger.java    |  18 +-
 .../api/windowing/triggers/Trigger.java         |  57 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |   8 +-
 ...ccumulatingProcessingTimeWindowOperator.java |   6 +-
 .../EvictingNonKeyedWindowOperator.java         |   2 +-
 .../windowing/EvictingWindowOperator.java       | 119 +++-
 .../windowing/NonKeyedWindowOperator.java       |  73 ++-
 .../operators/windowing/WindowOperator.java     | 570 ++++++++-----------
 .../flink/streaming/api/DataStreamTest.java     |   2 +-
 .../api/complex/ComplexIntegrationTest.java     |  59 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  10 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  22 +-
 .../windowing/AllWindowTranslationTest.java     |  10 +-
 .../EvictingNonKeyedWindowOperatorTest.java     |  32 +-
 .../windowing/EvictingWindowOperatorTest.java   | 131 ++++-
 .../windowing/NonKeyedWindowOperatorTest.java   |  46 +-
 .../windowing/TimeWindowTranslationTest.java    |   8 +-
 .../operators/windowing/WindowOperatorTest.java | 275 ++++++---
 .../windowing/WindowTranslationTest.java        |  50 +-
 .../streaming/api/scala/AllWindowedStream.scala |  19 +-
 .../streaming/api/scala/WindowedStream.scala    |  19 +-
 .../api/scala/AllWindowTranslationTest.scala    |  41 +-
 .../api/scala/WindowTranslationTest.scala       |  46 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |  54 +-
 .../EventTimeWindowCheckpointingITCase.java     | 116 ++--
 .../WindowCheckpointingITCase.java              |  35 +-
 49 files changed, 1399 insertions(+), 963 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 32cf430..8f502dd 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -172,7 +172,7 @@ public class IncrementalLearningSkeleton {
        /**
         * Builds up-to-date partial models on new training data.
         */
-       public static class PartialModelBuilder implements 
AllWindowFunction<Integer, Double[], TimeWindow> {
+       public static class PartialModelBuilder implements 
AllWindowFunction<Iterable<Integer>, Double[], TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                protected Double[] buildPartialModel(Iterable<Integer> values) {

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index f08069b..196b73e 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -104,7 +104,7 @@ public class GroupedProcessingTimeWindowExample {
                }
        }
 
-       public static class SummingWindowFunction implements 
WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
+       public static class SummingWindowFunction implements 
WindowFunction<Iterable<Tuple2<Long, Long>>, Tuple2<Long, Long>, Long, Window> {
 
                @Override
                public void apply(Long key, Window window, 
Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index b86830d..69f61bc 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,7 +17,10 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -100,6 +103,10 @@ public class SessionWindowing {
 
                private final Long sessionTimeout;
 
+               private final ValueStateDescriptor<Long> stateDesc = new 
ValueStateDescriptor<>("last-seen", 1L,
+                       BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
+
+
                public SessionTrigger(Long sessionTimeout) {
                        this.sessionTimeout = sessionTimeout;
 
@@ -108,7 +115,7 @@ public class SessionWindowing {
                @Override
                public TriggerResult onElement(Tuple3<String, Long, Integer> 
element, long timestamp, GlobalWindow window, TriggerContext ctx) throws 
Exception {
 
-                       ValueState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
+                       ValueState<Long> lastSeenState = 
ctx.getPartitionedState(stateDesc);
                        Long lastSeen = lastSeenState.value();
 
                        Long timeSinceLastEvent = timestamp - lastSeen;
@@ -127,7 +134,7 @@ public class SessionWindowing {
 
                @Override
                public TriggerResult onEventTime(long time, GlobalWindow 
window, TriggerContext ctx) throws Exception {
-                       ValueState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
+                       ValueState<Long> lastSeenState = 
ctx.getPartitionedState(stateDesc);
                        Long lastSeen = lastSeenState.value();
 
                        if (time - lastSeen >= sessionTimeout) {

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 30eda67..5a56a40 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -85,7 +85,7 @@ public class TopSpeedWindowing {
                                                                        
Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                                                                return 
newDataPoint.f2 - oldDataPoint.f2;
                                                        }
-                                               }))
+                                               }, 
carData.getType().createSerializer(env.getConfig())))
                                .maxBy(1);
 
                if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index f26f32c..c30e654 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -72,7 +72,7 @@ object TopSpeedWindowing {
       .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, 
TimeUnit.MILLISECONDS)))
       .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
         def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = 
newSp.distance - oldSp.distance
-      }))
+      }, cars.getType().createSerializer(env.getConfig)))
 //      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
 //      .every(Delta.of[CarEvent](triggerMeters,
 //          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 989e762..8cef5ea 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -21,8 +21,10 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -30,8 +32,9 @@ import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import 
org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
@@ -126,6 +129,11 @@ public class AllWindowedStream<T, W extends Window> {
         * @return The data stream that is the result of applying the reduce 
function to the window. 
         */
        public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> 
function) {
+               if (function instanceof RichFunction) {
+                       throw new UnsupportedOperationException("ReduceFunction 
of reduce can not be a RichFunction. " +
+                               "Please use apply(ReduceFunction, 
WindowFunction) instead.");
+               }
+
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
 
@@ -147,7 +155,7 @@ public class AllWindowedStream<T, W extends Window> {
                        operator = new 
EvictingNonKeyedWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        new HeapWindowBuffer.Factory<T>(),
-                                       new ReduceAllWindowFunction<W, 
T>(function),
+                                       new ReduceIterableAllWindowFunction<W, 
T>(function),
                                        trigger,
                                        
evictor).enableSetProcessingTime(setProcessingTime);
 
@@ -155,7 +163,7 @@ public class AllWindowedStream<T, W extends Window> {
                        operator = new NonKeyedWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        new 
PreAggregatingHeapWindowBuffer.Factory<>(function),
-                                       new ReduceAllWindowFunction<W, 
T>(function),
+                                       new ReduceIterableAllWindowFunction<W, 
T>(function),
                                        
trigger).enableSetProcessingTime(setProcessingTime);
                }
 
@@ -205,10 +213,11 @@ public class AllWindowedStream<T, W extends Window> {
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, 
R, W> function) {
-               TypeInformation<T> inType = input.getType();
+       public <R> SingleOutputStreamOperator<R, ?> 
apply(AllWindowFunction<Iterable<T>, R, W> function) {
+               @SuppressWarnings("unchecked, rawtypes")
+               TypeInformation<Iterable<T>> iterTypeInfo = new 
GenericTypeInfo<>((Class) Iterable.class);
                TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, AllWindowFunction.class, true, true, 
inType, null, false);
+                               function, AllWindowFunction.class, true, true, 
iterTypeInfo, null, false);
 
                return apply(function, resultType);
        }
@@ -224,7 +233,7 @@ public class AllWindowedStream<T, W extends Window> {
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, 
R, W> function, TypeInformation<R> resultType) {
+       public <R> SingleOutputStreamOperator<R, ?> 
apply(AllWindowFunction<Iterable<T>, R, W> function, TypeInformation<R> 
resultType) {
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
 
@@ -297,6 +306,10 @@ public class AllWindowedStream<T, W extends Window> {
         * @return The data stream that is the result of applying the window 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> 
preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> 
resultType) {
+               if (preAggregator instanceof RichFunction) {
+                       throw new UnsupportedOperationException("Pre-aggregator 
of apply can not be a RichFunction.");
+               }
+
                //clean the closures
                function = input.getExecutionEnvironment().clean(function);
                preAggregator = 
input.getExecutionEnvironment().clean(preAggregator);
@@ -314,16 +327,16 @@ public class AllWindowedStream<T, W extends Window> {
                        operator = new 
EvictingNonKeyedWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        new HeapWindowBuffer.Factory<T>(),
-                                       function,
+                                       new 
ReduceApplyAllWindowFunction<>(preAggregator, function),
                                        trigger,
                                        
evictor).enableSetProcessingTime(setProcessingTime);
 
                } else {
                        operator = new NonKeyedWindowOperator<>(windowAssigner,
-                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       new 
PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
-                                       function,
-                                       
trigger).enableSetProcessingTime(setProcessingTime);
+                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               new 
PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
+                               new 
ReduceApplyAllWindowFunction<>(preAggregator, function),
+                               
trigger).enableSetProcessingTime(setProcessingTime);
                }
 
                return input.transform(opName, resultType, 
operator).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index d1da783..3903015 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -545,7 +545,7 @@ public class CoGroupedStreams<T1, T2> {
 
        private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends 
Window>
                        extends WrappingFunction<CoGroupFunction<T1, T2, T>>
-                       implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, 
W> {
+                       implements WindowFunction<Iterable<TaggedUnion<T1, 
T2>>, T, KEY, W> {
                
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 9dbee30..d64248f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -21,9 +21,13 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,8 +35,10 @@ import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
@@ -46,8 +52,8 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces
 import 
org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 /**
  * A {@code WindowedStream} represents a data stream where elements are 
grouped by
@@ -136,7 +142,13 @@ public class WindowedStream<T, K, W extends Window> {
         * @param function The reduce function.
         * @return The data stream that is the result of applying the reduce 
function to the window. 
         */
+       @SuppressWarnings("unchecked")
        public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> 
function) {
+               if (function instanceof RichFunction) {
+                       throw new UnsupportedOperationException("ReduceFunction 
of reduce can not be a RichFunction. " +
+                               "Please use apply(ReduceFunction, 
WindowFunction) instead.");
+               }
+
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
 
@@ -156,23 +168,30 @@ public class WindowedStream<T, K, W extends Window> {
                boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
 
                if (evictor != null) {
+                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                               new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+
                        operator = new EvictingWindowOperator<>(windowAssigner,
-                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       keySel,
-                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       new HeapWindowBuffer.Factory<T>(),
-                                       new ReduceWindowFunction<K, W, 
T>(function),
-                                       trigger,
-                                       
evictor).enableSetProcessingTime(setProcessingTime);
+                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               keySel,
+                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                               stateDesc,
+                               new ReduceIterableWindowFunction<K, W, 
T>(function),
+                               trigger,
+                               
evictor).enableSetProcessingTime(setProcessingTime);
 
                } else {
+                       ReducingStateDescriptor<T> stateDesc = new 
ReducingStateDescriptor<>("window-contents",
+                               function,
+                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
                        operator = new WindowOperator<>(windowAssigner,
-                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       keySel,
-                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       new 
PreAggregatingHeapWindowBuffer.Factory<>(function),
-                                       new ReduceWindowFunction<K, W, 
T>(function),
-                                       
trigger).enableSetProcessingTime(setProcessingTime);
+                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               keySel,
+                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                               stateDesc,
+                               new ReduceWindowFunction<K, W, T>(),
+                               
trigger).enableSetProcessingTime(setProcessingTime);
                }
 
                return input.transform(opName, input.getType(), operator);
@@ -222,10 +241,11 @@ public class WindowedStream<T, K, W extends Window> {
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, 
K, W> function) {
-               TypeInformation<T> inType = input.getType();
+       public <R> SingleOutputStreamOperator<R, ?> 
apply(WindowFunction<Iterable<T>, R, K, W> function) {
+               @SuppressWarnings("unchecked, rawtypes")
+               TypeInformation<Iterable<T>> iterTypeInfo = new 
GenericTypeInfo<>((Class) Iterable.class);
                TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, WindowFunction.class, true, true, 
inType, null, false);
+                               function, WindowFunction.class, true, true, 
iterTypeInfo, null, false);
 
                return apply(function, resultType);
        }
@@ -243,7 +263,8 @@ public class WindowedStream<T, K, W extends Window> {
         * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, 
K, W> function, TypeInformation<R> resultType) {
+       public <R> SingleOutputStreamOperator<R, ?> 
apply(WindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> 
resultType) {
+
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
 
@@ -259,26 +280,33 @@ public class WindowedStream<T, K, W extends Window> {
                String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
                KeySelector<T, K> keySel = input.getKeySelector();
 
-               WindowOperator<K, T, R, W> operator;
+               WindowOperator<K, T, Iterable<T>, R, W> operator;
 
                boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
 
+
                if (evictor != null) {
+                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                               new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+
                        operator = new EvictingWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       new HeapWindowBuffer.Factory<T>(),
+                                       stateDesc,
                                        function,
                                        trigger,
                                        
evictor).enableSetProcessingTime(setProcessingTime);
 
                } else {
+                       ListStateDescriptor<T> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
                        operator = new WindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       new HeapWindowBuffer.Factory<T>(),
+                                       stateDesc,
                                        function,
                                        
trigger).enableSetProcessingTime(setProcessingTime);
                }
@@ -294,17 +322,17 @@ public class WindowedStream<T, K, W extends Window> {
         * <p>
         * Arriving data is pre-aggregated using the given pre-aggregation 
reducer.
         *
-        * @param preAggregator The reduce function that is used for 
pre-aggregation
+        * @param reduceFunction The reduce function that is used for 
pre-aggregation
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
 
-       public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> 
preAggregator, WindowFunction<T, R, K, W> function) {
+       public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function) {
                TypeInformation<T> inType = input.getType();
                TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
                                function, WindowFunction.class, true, true, 
inType, null, false);
 
-               return apply(preAggregator, function, resultType);
+               return apply(reduceFunction, function, resultType);
        }
 
        /**
@@ -315,15 +343,19 @@ public class WindowedStream<T, K, W extends Window> {
         * <p>
         * Arriving data is pre-aggregated using the given pre-aggregation 
reducer.
         *
-        * @param preAggregator The reduce function that is used for 
pre-aggregation
+        * @param reduceFunction The reduce function that is used for 
pre-aggregation
         * @param function The window function.
         * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> 
preAggregator, WindowFunction<T, R, K, W> function, TypeInformation<R> 
resultType) {
+       public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> 
resultType) {
+               if (reduceFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("Pre-aggregator 
of apply can not be a RichFunction.");
+               }
+
                //clean the closures
                function = input.getExecutionEnvironment().clean(function);
-               preAggregator = 
input.getExecutionEnvironment().clean(preAggregator);
+               reduceFunction = 
input.getExecutionEnvironment().clean(reduceFunction);
 
                String callLocation = Utils.getCallLocationName();
                String udfName = "WindowApply at " + callLocation;
@@ -336,21 +368,29 @@ public class WindowedStream<T, K, W extends Window> {
                boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
 
                if (evictor != null) {
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                               new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+
                        operator = new EvictingWindowOperator<>(windowAssigner,
-                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       keySel,
-                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       new HeapWindowBuffer.Factory<T>(),
-                                       function,
-                                       trigger,
-                                       
evictor).enableSetProcessingTime(setProcessingTime);
+                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               keySel,
+                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                               stateDesc,
+                               new ReduceApplyWindowFunction<>(reduceFunction, 
function),
+                               trigger,
+                               
evictor).enableSetProcessingTime(setProcessingTime);
 
                } else {
+                       ReducingStateDescriptor<T> stateDesc = new 
ReducingStateDescriptor<>("window-contents",
+                               reduceFunction,
+                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
                        operator = new WindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       new 
PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
+                                       stateDesc,
                                        function,
                                        
trigger).enableSetProcessingTime(setProcessingTime);
                }
@@ -587,7 +627,7 @@ public class WindowedStream<T, K, W extends Window> {
                        }
                        else if (function instanceof WindowFunction) {
                                @SuppressWarnings("unchecked")
-                               WindowFunction<T, R, K, TimeWindow> wf = 
(WindowFunction<T, R, K, TimeWindow>) function;
+                               WindowFunction<Iterable<T>, R, K, TimeWindow> 
wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function;
 
                                OneInputStreamOperator<T, R> op = new 
AccumulatingProcessingTimeWindowOperator<>(
                                                wf, input.getKeySelector(),
@@ -619,7 +659,7 @@ public class WindowedStream<T, K, W extends Window> {
                        }
                        else if (function instanceof WindowFunction) {
                                @SuppressWarnings("unchecked")
-                               WindowFunction<T, R, K, TimeWindow> wf = 
(WindowFunction<T, R, K, TimeWindow>) function;
+                               WindowFunction<Iterable<T>, R, K, TimeWindow> 
wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function;
 
                                OneInputStreamOperator<T, R> op = new 
AccumulatingProcessingTimeWindowOperator<>(
                                                wf, input.getKeySelector(),

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
index ed39103..fe711a5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.api.functions.aggregation;
 
-import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 
-public abstract class AggregationFunction<T> extends RichReduceFunction<T> {
+public abstract class AggregationFunction<T> implements ReduceFunction<T> {
        private static final long serialVersionUID = 1L;
 
        public enum AggregationType {

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
index 1d54436..b66bac6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
@@ -30,7 +30,7 @@ import java.io.Serializable;
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
  */
-public interface AllWindowFunction<IN, OUT,  W extends Window> extends 
Function, Serializable {
+public interface AllWindowFunction<IN, OUT, W extends Window> extends 
Function, Serializable {
 
        /**
         * Evaluates the window and outputs none or several elements.
@@ -41,5 +41,5 @@ public interface AllWindowFunction<IN, OUT,  W extends 
Window> extends Function,
         *
         * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
         */
-       void apply(W window, Iterable<IN> values, Collector<OUT> out) throws 
Exception;
+       void apply(W window, IN values, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
index af32f9b..46f9b3c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
@@ -36,7 +36,7 @@ import java.io.IOException;
 
 public class FoldAllWindowFunction<W extends Window, T, R>
                extends WrappingFunction<FoldFunction<T, R>>
-               implements AllWindowFunction<T, R, W>, 
OutputTypeConfigurable<R> {
+               implements AllWindowFunction<Iterable<T>, R, W>, 
OutputTypeConfigurable<R> {
        private static final long serialVersionUID = 1L;
 
        private byte[] serializedInitialValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
index b1eb3cd..db6d1bb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
@@ -36,7 +36,7 @@ import java.io.ObjectInputStream;
 
 public class FoldWindowFunction<K, W extends Window, T, R>
                extends WrappingFunction<FoldFunction<T, R>>
-               implements WindowFunction<T, R, K, W>, 
OutputTypeConfigurable<R> {
+               implements WindowFunction<Iterable<T>, R, K, W>, 
OutputTypeConfigurable<R> {
        private static final long serialVersionUID = 1L;
 
        private byte[] serializedInitialValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
index 24855a5..76b095b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
@@ -17,54 +17,14 @@
  */
 package org.apache.flink.streaming.api.functions.windowing;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 public class ReduceAllWindowFunction<W extends Window, T> extends 
RichAllWindowFunction<T, T, W> {
        private static final long serialVersionUID = 1L;
 
-       private final ReduceFunction<T> reduceFunction;
-
-       public ReduceAllWindowFunction(ReduceFunction<T> reduceFunction) {
-               this.reduceFunction = reduceFunction;
-       }
-
-       @Override
-       public void setRuntimeContext(RuntimeContext ctx) {
-               super.setRuntimeContext(ctx);
-               FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-       }
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               FunctionUtils.openFunction(reduceFunction, parameters);
-       }
-
        @Override
-       public void close() throws Exception {
-               super.close();
-               FunctionUtils.closeFunction(reduceFunction);
-       }
-
-       @Override
-       public void apply(W window, Iterable<T> values, Collector<T> out) 
throws Exception {
-               T result = null;
-
-               for (T v: values) {
-                       if (result == null) {
-                               result = v;
-                       } else {
-                               result = reduceFunction.reduce(result, v);
-                       }
-               }
-
-               if (result != null) {
-                       out.collect(result);
-               }
+       public void apply(W window, T input, Collector<T> out) throws Exception 
{
+               out.collect(input);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
new file mode 100644
index 0000000..f9fb771
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceApplyAllWindowFunction<W extends Window, T, R>
+       extends WrappingFunction<AllWindowFunction<T, R, W>>
+       implements AllWindowFunction<Iterable<T>, R, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+       private final AllWindowFunction<T, R, W> windowFunction;
+
+       public ReduceApplyAllWindowFunction(ReduceFunction<T> reduceFunction,
+               AllWindowFunction<T, R, W> windowFunction) {
+               super(windowFunction);
+               this.reduceFunction = reduceFunction;
+               this.windowFunction = windowFunction;
+       }
+
+       @Override
+       public void apply(W window, Iterable<T> input, Collector<R> out) throws 
Exception {
+
+               T curr = null;
+               for (T val: input) {
+                       if (curr == null) {
+                               curr = val;
+                       } else {
+                               curr = reduceFunction.reduce(curr, val);
+                       }
+               }
+               windowFunction.apply(window, curr, out);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
new file mode 100644
index 0000000..bf52e9b
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceApplyWindowFunction<K, W extends Window, T, R>
+       extends WrappingFunction<WindowFunction<T, R, K, W>>
+       implements WindowFunction<Iterable<T>, R, K, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+       private final WindowFunction<T, R, K, W> windowFunction;
+
+       public ReduceApplyWindowFunction(ReduceFunction<T> reduceFunction,
+               WindowFunction<T, R, K, W> windowFunction) {
+               super(windowFunction);
+               this.reduceFunction = reduceFunction;
+               this.windowFunction = windowFunction;
+       }
+
+       @Override
+       public void apply(K k, W window, Iterable<T> input, Collector<R> out) 
throws Exception {
+
+               T curr = null;
+               for (T val: input) {
+                       if (curr == null) {
+                               curr = val;
+                       } else {
+                               curr = reduceFunction.reduce(curr, val);
+                       }
+               }
+               windowFunction.apply(k, window, curr, out);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
new file mode 100644
index 0000000..2283fe7
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceIterableAllWindowFunction<W extends Window, T> implements 
AllWindowFunction<Iterable<T>, T, W> {
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+
+       public ReduceIterableAllWindowFunction(ReduceFunction<T> 
reduceFunction) {
+               this.reduceFunction = reduceFunction;
+       }
+
+       @Override
+       public void apply(W window, Iterable<T> input, Collector<T> out) throws 
Exception {
+
+               T curr = null;
+               for (T val: input) {
+                       if (curr == null) {
+                               curr = val;
+                       } else {
+                               curr = reduceFunction.reduce(curr, val);
+                       }
+               }
+               out.collect(curr);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
new file mode 100644
index 0000000..063cee4
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceIterableWindowFunction<K, W extends Window, T> implements 
WindowFunction<Iterable<T>, T, K, W> {
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+
+       public ReduceIterableWindowFunction(ReduceFunction<T> reduceFunction) {
+               this.reduceFunction = reduceFunction;
+       }
+
+       @Override
+       public void apply(K k, W window, Iterable<T> input, Collector<T> out) 
throws Exception {
+
+               T curr = null;
+               for (T val: input) {
+                       if (curr == null) {
+                               curr = val;
+                       } else {
+                               curr = reduceFunction.reduce(curr, val);
+                       }
+               }
+               out.collect(curr);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
index edd8a34..8be4553b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -17,34 +17,14 @@
  */
 package org.apache.flink.streaming.api.functions.windowing;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
-public class ReduceWindowFunction<K, W extends Window, T>
-               extends WrappingFunction<ReduceFunction<T>>
-               implements WindowFunction<T, T, K, W> {
+public class ReduceWindowFunction<K, W extends Window, T> implements 
WindowFunction<T, T, K, W> {
        private static final long serialVersionUID = 1L;
 
-       public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
-               super(reduceFunction);
-       }
-
        @Override
-       public void apply(K k, W window, Iterable<T> values, Collector<T> out) 
throws Exception {
-               T result = null;
-
-               for (T v: values) {
-                       if (result == null) {
-                               result = v;
-                       } else {
-                               result = wrappedFunction.reduce(result, v);
-                       }
-               }
-
-               if (result != null) {
-                       out.collect(result);
-               }
+       public void apply(K k, W window, T input, Collector<T> out) throws 
Exception {
+               out.collect(input);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
index 6a472b1..fe42cd3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
@@ -17,55 +17,15 @@
  */
 package org.apache.flink.streaming.api.functions.windowing;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends 
RichWindowFunction<T, Tuple2<W, T>, K, W> {
        private static final long serialVersionUID = 1L;
 
-       private final ReduceFunction<T> reduceFunction;
-
-       public ReduceWindowFunctionWithWindow(ReduceFunction<T> reduceFunction) 
{
-               this.reduceFunction = reduceFunction;
-       }
-
-       @Override
-       public void setRuntimeContext(RuntimeContext ctx) {
-               super.setRuntimeContext(ctx);
-               FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-       }
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               FunctionUtils.openFunction(reduceFunction, parameters);
-       }
-
        @Override
-       public void close() throws Exception {
-               super.close();
-               FunctionUtils.closeFunction(reduceFunction);
-       }
-
-       @Override
-       public void apply(K k, W window, Iterable<T> values, 
Collector<Tuple2<W, T>> out) throws Exception {
-               T result = null;
-
-               for (T v: values) {
-                       if (result == null) {
-                               result = v;
-                       } else {
-                               result = reduceFunction.reduce(result, v);
-                       }
-               }
-
-               if (result != null) {
-                       out.collect(Tuple2.of(window, result));
-               }
+       public void apply(K k, W window, T input, Collector<Tuple2<W, T>> out) 
throws Exception {
+               out.collect(Tuple2.of(window, input));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
index eda12c0..204d6a5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
@@ -38,10 +38,10 @@ public interface WindowFunction<IN, OUT, KEY, W extends 
Window> extends Function
         *
         * @param key The key for which this window is evaluated.
         * @param window The window that is being evaluated.
-        * @param values The elements in the window being evaluated.
+        * @param input The elements in the window being evaluated.
         * @param out A collector for emitting elements.
         * 
         * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery. 
         */
-       void apply(KEY key, W window, Iterable<IN> values, Collector<OUT> out) 
throws Exception;
+       void apply(KEY key, W window, IN input, Collector<OUT> out) throws 
Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index b653be3..17818af 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -18,7 +18,10 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -35,6 +38,9 @@ public class ContinuousEventTimeTrigger<W extends Window> 
implements Trigger<Obj
 
        private final long interval;
 
+       private final ValueStateDescriptor<Boolean> stateDesc = new 
ValueStateDescriptor<>("first", true,
+               BasicTypeInfo.BOOLEAN_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
+
        private ContinuousEventTimeTrigger(long interval) {
                this.interval = interval;
        }
@@ -42,7 +48,7 @@ public class ContinuousEventTimeTrigger<W extends Window> 
implements Trigger<Obj
        @Override
        public TriggerResult onElement(Object element, long timestamp, W 
window, TriggerContext ctx) throws Exception {
 
-               ValueState<Boolean> first = ctx.getKeyValueState("first", true);
+               ValueState<Boolean> first = ctx.getPartitionedState(stateDesc);
 
                if (first.value()) {
                        long start = timestamp - (timestamp % interval);

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 7f3e7ec..20a2274 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -18,7 +18,10 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -33,6 +36,10 @@ public class ContinuousProcessingTimeTrigger<W extends 
Window> implements Trigge
 
        private final long interval;
 
+       private final ValueStateDescriptor<Long> stateDesc = new 
ValueStateDescriptor<>("fire-timestamp", 0L,
+               BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
+
+
        private ContinuousProcessingTimeTrigger(long interval) {
                this.interval = interval;
        }
@@ -41,7 +48,7 @@ public class ContinuousProcessingTimeTrigger<W extends 
Window> implements Trigge
        public TriggerResult onElement(Object element, long timestamp, W 
window, TriggerContext ctx) throws Exception {
                long currentTime = System.currentTimeMillis();
 
-               ValueState<Long> fireState = 
ctx.getKeyValueState("fire-timestamp", 0L);
+               ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
                long nextFireTimestamp = fireState.value();
 
                if (nextFireTimestamp == 0) {
@@ -70,7 +77,7 @@ public class ContinuousProcessingTimeTrigger<W extends 
Window> implements Trigge
        @Override
        public TriggerResult onProcessingTime(long time, W window, 
TriggerContext ctx) throws Exception {
 
-               ValueState<Long> fireState = 
ctx.getKeyValueState("fire-timestamp", 0L);
+               ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
                long nextFireTimestamp = fireState.value();
 
                // only fire if an element didn't already fire

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index d101fe1..e8742d5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 import java.io.IOException;
@@ -32,13 +35,17 @@ public class CountTrigger<W extends Window> implements 
Trigger<Object, W> {
 
        private final long maxCount;
 
+       private final ValueStateDescriptor<Long> stateDesc = new 
ValueStateDescriptor<>("count", 0L,
+               BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
+
+
        private CountTrigger(long maxCount) {
                this.maxCount = maxCount;
        }
 
        @Override
        public TriggerResult onElement(Object element, long timestamp, W 
window, TriggerContext ctx) throws IOException {
-               ValueState<Long> count = ctx.getKeyValueState("count", 0L);
+               ValueState<Long> count = ctx.getPartitionedState(stateDesc);
                long currentCount = count.value() + 1;
                count.update(currentCount);
                if (currentCount >= maxCount) {

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 37c8a45..60ada88 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -18,11 +18,11 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
-import java.io.Serializable;
-
 /**
  * A {@link Trigger} that fires based on a {@link DeltaFunction} and a 
threshold.
  *
@@ -33,20 +33,23 @@ import java.io.Serializable;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
  */
-public class DeltaTrigger<T extends Serializable, W extends Window> implements 
Trigger<T, W> {
+public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
        private static final long serialVersionUID = 1L;
 
        private final DeltaFunction<T> deltaFunction;
        private final double threshold;
+       private final ValueStateDescriptor<T> stateDesc;
 
-       private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
+       private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, 
TypeSerializer<T> stateSerializer) {
                this.deltaFunction = deltaFunction;
                this.threshold = threshold;
+               stateDesc = new ValueStateDescriptor<>("last-element", null, 
stateSerializer);
+
        }
 
        @Override
        public TriggerResult onElement(T element, long timestamp, W window, 
TriggerContext ctx) throws Exception {
-               ValueState<T> lastElementState = 
ctx.getKeyValueState("last-element", null);
+               ValueState<T> lastElementState = 
ctx.getPartitionedState(stateDesc);
                if (lastElementState.value() == null) {
                        lastElementState.update(element);
                        return TriggerResult.CONTINUE;
@@ -78,11 +81,12 @@ public class DeltaTrigger<T extends Serializable, W extends 
Window> implements T
         *
         * @param threshold The threshold at which to trigger.
         * @param deltaFunction The delta function to use
+        * @param stateSerializer TypeSerializer for the data elements.
         *
         * @param <T> The type of elements on which this trigger can operate.
         * @param <W> The type of {@link Window Windows} on which this trigger 
can operate.
         */
-       public static <T extends Serializable, W extends Window> 
DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
-               return new DeltaTrigger<>(threshold, deltaFunction);
+       public static <T, W extends Window> DeltaTrigger<T, W> of(double 
threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
+               return new DeltaTrigger<>(threshold, deltaFunction, 
stateSerializer);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index aed393b..8ea50b3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 import java.io.Serializable;
@@ -142,20 +145,62 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
                 * Register an event-time callback. When the current watermark 
passes the specified
                 * time {@link #onEventTime(long, Window, TriggerContext)} is 
called with the time specified here.
                 *
-                * @see org.apache.flink.streaming.api.watermark.Watermark
-                *
                 * @param time The watermark at which to invoke {@link 
#onEventTime(long, Window, TriggerContext)}
+                * @see org.apache.flink.streaming.api.watermark.Watermark
                 */
                void registerEventTimeTimer(long time);
 
                /**
-                * Retrieves an {@link ValueState} object that can be used to 
interact with
+                * Retrieves an {@link State} object that can be used to 
interact with
+                * fault-tolerant state that is scoped to the window and key of 
the current
+                * trigger invocation.
+                *
+                * @param stateDescriptor The StateDescriptor that contains the 
name and type of the
+                *                        state that is being accessed.
+                * @param <S>             The type of the state.
+                * @return The partitioned state object.
+                * @throws UnsupportedOperationException Thrown, if no 
partitioned state is available for the
+                *                                       function (function is 
not part os a KeyedStream).
+                */
+               <S extends State> S getPartitionedState(StateDescriptor<S> 
stateDescriptor);
+
+               /**
+                * Retrieves a {@link ValueState} object that can be used to 
interact with
                 * fault-tolerant state that is scoped to the window and key of 
the current
                 * trigger invocation.
                 *
-                * @param name A unique key for the state.
-                * @param defaultState The default value of the state.
+                * @param name The name of the key/value state.
+                * @param stateType The class of the type that is stored in the 
state. Used to generate
+                *                  serializers for managed memory and 
checkpointing.
+                * @param defaultState The default state value, returned when 
the state is accessed and
+                *                     no value has yet been set for the key. 
May be null.
+                *
+                * @param <S>          The type of the state.
+                * @return The partitioned state object.
+                * @throws UnsupportedOperationException Thrown, if no 
partitioned state is available for the
+                *                                       function (function is 
not part os a KeyedStream).
+                */
+               @Deprecated
+               <S extends Serializable> ValueState<S> getKeyValueState(String 
name, Class<S> stateType, S defaultState);
+
+
+               /**
+                * Retrieves a {@link ValueState} object that can be used to 
interact with
+                * fault-tolerant state that is scoped to the window and key of 
the current
+                * trigger invocation.
+                *
+                * @param name The name of the key/value state.
+                * @param stateType The type information for the type that is 
stored in the state.
+                *                  Used to create serializers for managed 
memory and checkpoints.
+                * @param defaultState The default state value, returned when 
the state is accessed and
+                *                     no value has yet been set for the key. 
May be null.
+                *
+                * @param <S>          The type of the state.
+                * @return The partitioned state object.
+                * @throws UnsupportedOperationException Thrown, if no 
partitioned state is available for the
+                *                                       function (function is 
not part os a KeyedStream).
                 */
-               <S extends Serializable> ValueState<S> getKeyValueState(final 
String name, final S defaultState);
+               @Deprecated
+               <S extends Serializable> ValueState<S> getKeyValueState(String 
name, TypeInformation<S> stateType, S defaultState);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index e15de8e..30c40bb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -35,7 +35,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
 
-       private final WindowFunction<Type, Result, Key, Window> function;
+       private final WindowFunction<Iterable<Type>, Result, Key, Window> 
function;
 
        /**
         * IMPORTANT: This value needs to start at one, so it is fresher than 
the value that new entries have (zero) */
@@ -43,7 +43,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        // 
------------------------------------------------------------------------
        
-       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
WindowFunction<Type, Result, Key, Window> function) {
+       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
WindowFunction<Iterable<Type>, Result, Key, Window> function) {
                this.keySelector = keySelector;
                this.function = function;
        }
@@ -85,7 +85,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
        
        static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-               private final WindowFunction<Type, Result, Key, Window> 
function;
+               private final WindowFunction<Iterable<Type>, Result, Key, 
Window> function;
                
                private final UnionIterator<Type> unionIterator;
                
@@ -98,7 +98,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                private Key currentKey;
                
 
-               WindowFunctionTraversal(WindowFunction<Type, Result, Key, 
Window> function, TimeWindow window, 
+               WindowFunctionTraversal(WindowFunction<Iterable<Type>, Result, 
Key, Window> function, TimeWindow window,
                                                                
Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
                        this.function = function;
                        this.out = out;

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 7a7d04c..da64df8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -32,13 +32,13 @@ import java.util.ArrayList;
 
 
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
+               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, ArrayList<IN>, WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> {
 
        private static final long serialVersionUID = 7305948082830843475L;
 
        
        public AccumulatingProcessingTimeWindowOperator(
-                       WindowFunction<IN, OUT, KEY, TimeWindow> function,
+                       WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> 
function,
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        TypeSerializer<IN> valueSerializer,
@@ -52,7 +52,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, 
IN, OUT>
        @Override
        protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
                @SuppressWarnings("unchecked")
-               WindowFunction<IN, OUT, KEY, Window> windowFunction = 
(WindowFunction<IN, OUT, KEY, Window>) function;
+               WindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = 
(WindowFunction<Iterable<IN>, OUT, KEY, Window>) function;
                
                return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 1bb451a..73972e6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -47,7 +47,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W 
extends Window> extends N
        public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
                        TypeSerializer<W> windowSerializer,
                        WindowBufferFactory<? super IN, ? extends 
EvictingWindowBuffer<IN>> windowBufferFactory,
-                       AllWindowFunction<IN, OUT, W> windowFunction,
+                       AllWindowFunction<Iterable<IN>, OUT, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger,
                        Evictor<? super IN, ? super W> evictor) {
                super(windowAssigner, windowSerializer, windowBufferFactory, 
windowFunction, trigger);

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index ad43812..f163de1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -18,15 +18,22 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
 
 import static java.util.Objects.requireNonNull;
 
@@ -42,42 +49,97 @@ import static java.util.Objects.requireNonNull;
  * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
-public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends 
WindowOperator<K, IN, OUT, W> {
+public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends 
WindowOperator<K, IN, Iterable<IN>, OUT, W> {
 
        private static final long serialVersionUID = 1L;
 
        private final Evictor<? super IN, ? super W> evictor;
 
+       private final StateDescriptor<? extends ListState<StreamRecord<IN>>> 
windowStateDescriptor;
+
        public EvictingWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
-                       TypeSerializer<W> windowSerializer,
-                       KeySelector<IN, K> keySelector,
-                       TypeSerializer<K> keySerializer,
-                       WindowBufferFactory<? super IN, ? extends 
EvictingWindowBuffer<IN>> windowBufferFactory,
-                       WindowFunction<IN, OUT, K, W> windowFunction,
-                       Trigger<? super IN, ? super W> trigger,
-                       Evictor<? super IN, ? super W> evictor) {
-               super(windowAssigner, windowSerializer, keySelector, 
keySerializer, windowBufferFactory, windowFunction, trigger);
+               TypeSerializer<W> windowSerializer,
+               KeySelector<IN, K> keySelector,
+               TypeSerializer<K> keySerializer,
+               StateDescriptor<? extends ListState<StreamRecord<IN>>> 
windowStateDescriptor,
+               WindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
+               Trigger<? super IN, ? super W> trigger,
+               Evictor<? super IN, ? super W> evictor) {
+               super(windowAssigner, windowSerializer, keySelector, 
keySerializer, null, windowFunction, trigger);
                this.evictor = requireNonNull(evictor);
+               this.windowStateDescriptor = windowStateDescriptor;
        }
 
+
        @Override
-       @SuppressWarnings("unchecked, rawtypes")
-       protected void emitWindow(Context context) throws Exception {
-               
timestampedCollector.setTimestamp(context.window.maxTimestamp());
-               EvictingWindowBuffer<IN> windowBuffer = 
(EvictingWindowBuffer<IN>) context.windowBuffer;
-
-               int toEvict = 0;
-               if (windowBuffer.size() > 0) {
-                       // need some type trickery here...
-                       toEvict = evictor.evict((Iterable) 
windowBuffer.getElements(), windowBuffer.size(), context.window);
+       @SuppressWarnings("unchecked")
+       public final void processElement(StreamRecord<IN> element) throws 
Exception {
+               Collection<W> elementWindows = 
windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+
+               K key = (K) getStateBackend().getCurrentKey();
+
+               for (W window: elementWindows) {
+
+                       ListState<StreamRecord<IN>> windowState = 
getPartitionedState(window, windowSerializer,
+                               windowStateDescriptor);
+
+                       windowState.add(element);
+
+                       context.key = key;
+                       context.window = window;
+                       Trigger.TriggerResult triggerResult = 
context.onElement(element);
+
+                       processTriggerResult(triggerResult, key, window);
                }
+       }
 
-               windowBuffer.removeElements(toEvict);
+       @Override
+       @SuppressWarnings("unchecked,rawtypes")
+       protected void processTriggerResult(Trigger.TriggerResult 
triggerResult, K key, W window) throws Exception {
+               if (!triggerResult.isFire() && !triggerResult.isPurge()) {
+                       // do nothing
+                       return;
+               }
 
-               userFunction.apply(context.key,
-                               context.window,
-                               context.windowBuffer.getUnpackedElements(),
-                               timestampedCollector);
+               if (triggerResult.isFire()) {
+                       
timestampedCollector.setTimestamp(window.maxTimestamp());
+
+                       setKeyContext(key);
+
+                       ListState<StreamRecord<IN>> windowState = 
getPartitionedState(window, windowSerializer,
+                               windowStateDescriptor);
+
+                       Iterable<StreamRecord<IN>> contents = windowState.get();
+
+                       // Work around type system restrictions...
+                       int toEvict = evictor.evict((Iterable) contents, 
Iterables.size(contents), context.window);
+
+                       FluentIterable<IN> projectedContents = FluentIterable
+                               .from(contents)
+                               .skip(toEvict)
+                               .transform(new Function<StreamRecord<IN>, IN>() 
{
+                                       @Override
+                                       public IN apply(StreamRecord<IN> input) 
{
+                                               return input.getValue();
+                                       }
+                               });
+                       userFunction.apply(context.key, context.window, 
projectedContents, timestampedCollector);
+
+                       if (triggerResult.isPurge()) {
+                               windowState.clear();
+                       } else {
+                               // we have to clear the state and set the 
elements that remain after eviction
+                               windowState.clear();
+                               for (StreamRecord<IN> rec: 
FluentIterable.from(contents).skip(toEvict)) {
+                                       windowState.add(rec);
+                               }
+                       }
+               } else if (triggerResult.isPurge()) {
+                       setKeyContext(key);
+                       ListState<StreamRecord<IN>> windowState = 
getPartitionedState(window, windowSerializer,
+                               windowStateDescriptor);
+                       windowState.clear();
+               }
        }
 
        @Override
@@ -95,4 +157,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
        public Evictor<? super IN, ? super W> getEvictor() {
                return evictor;
        }
+
+       @Override
+       @VisibleForTesting
+       @SuppressWarnings("unchecked, rawtypes")
+       public StateDescriptor<? extends MergingState<IN, Iterable<IN>>> 
getStateDescriptor() {
+               return (StateDescriptor<? extends MergingState<IN, 
Iterable<IN>>>) windowStateDescriptor;
+       }
 }

Reply via email to