[FLINK-1176] [streaming] WindowedDataStream rework for new windowing runtime


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c560d76f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c560d76f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c560d76f

Branch: refs/heads/master
Commit: c560d76fec7b0e389430fc418d83d8f38f5eb06f
Parents: 1146f64
Author: Gyula Fora <gyf...@apache.org>
Authored: Wed Feb 4 17:27:34 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Mon Feb 16 13:06:07 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/DiscretizedStream.java       | 217 +++++++
 .../api/datastream/WindowedDataStream.java      | 266 +++++----
 .../operator/GroupedWindowInvokable.java        | 506 ----------------
 .../operator/WindowGroupReduceInvokable.java    |  51 --
 .../api/invokable/operator/WindowInvokable.java | 382 ------------
 .../operator/WindowReduceInvokable.java         |  59 --
 .../operator/windowing/WindowMerger.java        |   4 +-
 .../policy/CloneableEvictionPolicy.java         |   1 -
 .../policy/CloneableTriggerPolicy.java          |   1 -
 .../operator/GroupedWindowInvokableTest.java    | 574 -------------------
 .../invokable/operator/WindowInvokableTest.java | 261 ---------
 .../windowing/GroupedWindowInvokableTest.java   | 574 +++++++++++++++++++
 .../operator/windowing/WindowInvokableTest.java | 261 +++++++++
 .../ml/IncrementalLearningSkeleton.java         |   2 +-
 .../socket/SocketTextStreamWordCount.java       |  27 +-
 .../windowing/TopSpeedWindowingExample.java     |   2 +-
 .../scala/examples/join/WindowJoin.scala        |  45 +-
 .../examples/windowing/TopSpeedWindowing.scala  |   1 +
 .../api/scala/WindowedDataStream.scala          |  12 +-
 19 files changed, 1238 insertions(+), 2008 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
new file mode 100644
index 0000000..37fa1e1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
+
+/**
+ * A {@link DiscretizedStream} represents a data stream that has been divided
+ * into windows (predefined chunks). User defined function such as
+ * {@link #reduceWindow(ReduceFunction)},
+ * {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to 
the
+ * windows.
+ * 
+ * @param <OUT>
+ *            The output type of the {@link DiscretizedStream}
+ */
+public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
+
+       protected SingleOutputStreamOperator<StreamWindow<OUT>, ?> 
discretizedStream;
+
+       protected 
DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> 
discretizedStream,
+                       KeySelector<OUT, ?> groupByKey) {
+               super();
+               this.groupByKey = groupByKey;
+               this.discretizedStream = discretizedStream;
+       }
+
+       /**
+        * Applies a reduce transformation on the windowed data stream by 
reducing
+        * the current window at every trigger.The user can also extend the
+        * {@link RichReduceFunction} to gain access to other features provided 
by
+        * the {@link org.apache.flink.api.common.functions.RichFunction} 
interface.
+        * 
+        * @param reduceFunction
+        *            The reduce function that will be applied to the windows.
+        * @return The transformed DataStream
+        */
+       public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> 
reduceFunction) {
+
+               DiscretizedStream<OUT> out = partition(false).transform("Window 
Reduce", getType(),
+                               new WindowReducer<OUT>(reduceFunction)).merge();
+
+               if (!isGrouped()) {
+                       return out.transform("Window Reduce", out.getType(), 
new WindowReducer<OUT>(
+                                       reduceFunction));
+               } else {
+                       return out;
+               }
+       }
+
+       /**
+        * Applies a reduceGroup transformation on the windowed data stream by
+        * reducing the current window at every trigger. In contrast with the
+        * standard binary reducer, with reduceGroup the user can access all
+        * elements of the window at the same time through the iterable 
interface.
+        * The user can also extend the {@link RichGroupReduceFunction} to gain
+        * access to other features provided by the
+        * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+        * 
+        * @param reduceFunction
+        *            The reduce function that will be applied to the windows.
+        * @return The transformed DataStream
+        */
+       public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> 
reduceFunction) {
+
+               TypeInformation<R> retType = 
TypeExtractor.getGroupReduceReturnTypes(reduceFunction,
+                               getType());
+
+               DiscretizedStream<R> out = partition(true).transform("Window 
Reduce", retType,
+                               new WindowMapper<OUT, R>(reduceFunction));
+
+               if (isGrouped()) {
+                       return out.merge();
+               } else {
+                       return out;
+               }
+
+       }
+
+       private <R> DiscretizedStream<R> transform(String operatorName, 
TypeInformation<R> retType,
+                       StreamInvokable<StreamWindow<OUT>, StreamWindow<R>> 
invokable) {
+
+               return wrap(discretizedStream.transform(operatorName, new 
StreamWindowTypeInfo<R>(retType),
+                               invokable));
+       }
+
+       private DiscretizedStream<OUT> partition(boolean isMap) {
+
+               int parallelism = discretizedStream.getParallelism();
+
+               if (isGrouped()) {
+                       DiscretizedStream<OUT> out = transform("Window 
partitioner", getType(),
+                                       new 
WindowPartitioner<OUT>(groupByKey)).setParallelism(parallelism);
+
+                       out.groupByKey = null;
+
+                       return out;
+               } else if (!isMap) {
+                       return transform(
+                                       "Window partitioner",
+                                       getType(),
+                                       new 
WindowPartitioner<OUT>(discretizedStream.environment
+                                                       
.getDegreeOfParallelism())).setParallelism(parallelism);
+               } else {
+                       return this;
+               }
+       }
+
+       private DiscretizedStream<OUT> setParallelism(int parallelism) {
+               return wrap(discretizedStream.setParallelism(parallelism));
+       }
+
+       private DiscretizedStream<OUT> merge() {
+               TypeInformation<StreamWindow<OUT>> type = 
discretizedStream.getType();
+
+               return wrap(discretizedStream.groupBy(new 
WindowKey<OUT>()).transform("Window Merger",
+                               type, new WindowMerger<OUT>()));
+       }
+
+       public DataStream<OUT> flatten() {
+               return discretizedStream.transform("Window Flatten", getType(), 
new WindowFlattener<OUT>());
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator 
stream) {
+               return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) 
this.groupByKey);
+       }
+
+       public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
+               return discretizedStream;
+       }
+
+       @SuppressWarnings("rawtypes")
+       protected Class<?> getClassAtPos(int pos) {
+               Class<?> type;
+               TypeInformation<OUT> outTypeInfo = getType();
+               if (outTypeInfo.isTupleType()) {
+                       type = ((TupleTypeInfo) 
outTypeInfo).getTypeAt(pos).getTypeClass();
+
+               } else if (outTypeInfo instanceof BasicArrayTypeInfo) {
+
+                       type = ((BasicArrayTypeInfo) 
outTypeInfo).getComponentTypeClass();
+
+               } else if (outTypeInfo instanceof PrimitiveArrayTypeInfo) {
+                       Class<?> clazz = outTypeInfo.getTypeClass();
+                       if (clazz == boolean[].class) {
+                               type = Boolean.class;
+                       } else if (clazz == short[].class) {
+                               type = Short.class;
+                       } else if (clazz == int[].class) {
+                               type = Integer.class;
+                       } else if (clazz == long[].class) {
+                               type = Long.class;
+                       } else if (clazz == float[].class) {
+                               type = Float.class;
+                       } else if (clazz == double[].class) {
+                               type = Double.class;
+                       } else if (clazz == char[].class) {
+                               type = Character.class;
+                       } else {
+                               throw new IndexOutOfBoundsException("Type could 
not be determined for array");
+                       }
+
+               } else if (pos == 0) {
+                       type = outTypeInfo.getTypeClass();
+               } else {
+                       throw new IndexOutOfBoundsException("Position is out of 
range");
+               }
+               return type;
+       }
+
+       /**
+        * Gets the output type.
+        * 
+        * @return The output type.
+        */
+       public TypeInformation<OUT> getType() {
+               return ((StreamWindowTypeInfo<OUT>) 
discretizedStream.getType()).getInnerType();
+       }
+
+       protected DiscretizedStream<OUT> copy() {
+               return new DiscretizedStream<OUT>(discretizedStream.copy(), 
groupByKey);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 4fe356b..fc117a1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -25,17 +25,20 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
 import 
org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import 
org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
-import 
org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -44,12 +47,14 @@ import 
org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 
 /**
  * A {@link WindowedDataStream} represents a data stream that has been divided
  * into windows (predefined chunks). User defined function such as
- * {@link #reduce(ReduceFunction)}, {@link #reduceGroup(GroupReduceFunction)} 
or
- * aggregations can be applied to the windows.
+ * {@link #reduceWindow(ReduceFunction)},
+ * {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to 
the
+ * windows.
  * 
  * @param <OUT>
  *            The output type of the {@link WindowedDataStream}
@@ -57,9 +62,12 @@ import 
org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 public class WindowedDataStream<OUT> {
 
        protected DataStream<OUT> dataStream;
-       protected boolean isGrouped;
-       protected boolean allCentral;
-       protected KeySelector<OUT, ?> keySelector;
+
+       protected boolean isLocal = false;
+       protected boolean isCentral = true;
+
+       protected KeySelector<OUT, ?> discretizerKey;
+       protected KeySelector<OUT, ?> groupByKey;
 
        protected List<WindowingHelper<OUT>> triggerHelpers;
        protected List<WindowingHelper<OUT>> evictionHelpers;
@@ -67,6 +75,10 @@ public class WindowedDataStream<OUT> {
        protected LinkedList<TriggerPolicy<OUT>> userTriggers;
        protected LinkedList<EvictionPolicy<OUT>> userEvicters;
 
+       protected WindowedDataStream() {
+
+       }
+
        protected WindowedDataStream(DataStream<OUT> dataStream, 
WindowingHelper<OUT>... policyHelpers) {
                this.dataStream = dataStream.copy();
                this.triggerHelpers = new ArrayList<WindowingHelper<OUT>>();
@@ -75,15 +87,9 @@ public class WindowedDataStream<OUT> {
                }
 
                if (dataStream instanceof GroupedDataStream) {
-                       this.isGrouped = true;
-                       this.keySelector = ((GroupedDataStream<OUT>) 
dataStream).keySelector;
+                       this.discretizerKey = ((GroupedDataStream<OUT>) 
dataStream).keySelector;
                        // set all policies distributed
-                       this.allCentral = false;
-
-               } else {
-                       this.isGrouped = false;
-                       // set all policies central
-                       this.allCentral = true;
+                       this.isCentral = false;
                }
        }
 
@@ -102,27 +108,23 @@ public class WindowedDataStream<OUT> {
                }
 
                if (dataStream instanceof GroupedDataStream) {
-                       this.isGrouped = true;
-                       this.keySelector = ((GroupedDataStream<OUT>) 
dataStream).keySelector;
+                       this.discretizerKey = ((GroupedDataStream<OUT>) 
dataStream).keySelector;
                        // set all policies distributed
-                       this.allCentral = false;
+                       this.isCentral = false;
 
-               } else {
-                       this.isGrouped = false;
-                       // set all policies central
-                       this.allCentral = true;
                }
        }
 
        protected WindowedDataStream(WindowedDataStream<OUT> 
windowedDataStream) {
                this.dataStream = windowedDataStream.dataStream.copy();
-               this.isGrouped = windowedDataStream.isGrouped;
-               this.keySelector = windowedDataStream.keySelector;
+               this.discretizerKey = windowedDataStream.discretizerKey;
+               this.groupByKey = windowedDataStream.groupByKey;
                this.triggerHelpers = windowedDataStream.triggerHelpers;
                this.evictionHelpers = windowedDataStream.evictionHelpers;
                this.userTriggers = windowedDataStream.userTriggers;
                this.userEvicters = windowedDataStream.userEvicters;
-               this.allCentral = windowedDataStream.allCentral;
+               this.isCentral = windowedDataStream.isCentral;
+               this.isLocal = windowedDataStream.isLocal;
        }
 
        public <F> F clean(F f) {
@@ -169,11 +171,11 @@ public class WindowedDataStream<OUT> {
         * @return The grouped {@link WindowedDataStream}
         */
        public WindowedDataStream<OUT> groupBy(int... fields) {
-               WindowedDataStream<OUT> ret = this.copy();
-               ret.dataStream = ret.dataStream.groupBy(fields);
-               ret.isGrouped = true;
-               ret.keySelector = ((GroupedDataStream<OUT>) 
ret.dataStream).keySelector;
-               return ret;
+               if (getType() instanceof BasicArrayTypeInfo || getType() 
instanceof PrimitiveArrayTypeInfo) {
+                       return groupBy(new 
KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+               } else {
+                       return groupBy(new Keys.ExpressionKeys<OUT>(fields, 
getType()));
+               }
        }
 
        /**
@@ -193,11 +195,7 @@ public class WindowedDataStream<OUT> {
         * @return The grouped {@link WindowedDataStream}
         */
        public WindowedDataStream<OUT> groupBy(String... fields) {
-               WindowedDataStream<OUT> ret = this.copy();
-               ret.dataStream = ret.dataStream.groupBy(fields);
-               ret.isGrouped = true;
-               ret.keySelector = ((GroupedDataStream<OUT>) 
ret.dataStream).keySelector;
-               return ret;
+               return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
        }
 
        /**
@@ -214,12 +212,48 @@ public class WindowedDataStream<OUT> {
         */
        public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) 
{
                WindowedDataStream<OUT> ret = this.copy();
-               ret.dataStream = ret.dataStream.groupBy(keySelector);
-               ret.isGrouped = true;
-               ret.keySelector = ((GroupedDataStream<OUT>) 
ret.dataStream).keySelector;
+               ret.groupByKey = keySelector;
                return ret;
        }
 
+       private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) {
+               return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, 
getType())));
+       }
+
+       /**
+        * Sets the windowed computations local, so that the windowing and 
reduce or
+        * aggregation logic will be computed for each parallel instance of this
+        * operator
+        * 
+        * @return The local windowed data stream
+        */
+       public WindowedDataStream<OUT> local() {
+               WindowedDataStream<OUT> out = copy();
+               out.isLocal = true;
+               return out;
+       }
+
+       private DiscretizedStream<OUT> discretize(boolean isMap) {
+
+               StreamInvokable<OUT, StreamWindow<OUT>> discretizer;
+
+               if (discretizerKey == null) {
+                       discretizer = new StreamDiscretizer<OUT>(getTriggers(), 
getEvicters());
+               } else {
+                       discretizer = new 
GroupedStreamDiscretizer<OUT>(discretizerKey,
+                                       getDistributedTriggers(), 
getDistributedEvicters(), getCentralTriggers(),
+                                       getCentralEvicters());
+               }
+
+               int parallelism = isLocal || (discretizerKey != null) ? 
dataStream.environment
+                               .getDegreeOfParallelism() : 1;
+
+               return new DiscretizedStream<OUT>(dataStream.transform("Stream 
Discretizer",
+                               new StreamWindowTypeInfo<OUT>(getType()), 
discretizer).setParallelism(parallelism),
+                               groupByKey);
+
+       }
+
        /**
         * Applies a reduce transformation on the windowed data stream by 
reducing
         * the current window at every trigger.The user can also extend the
@@ -230,9 +264,9 @@ public class WindowedDataStream<OUT> {
         *            The reduce function that will be applied to the windows.
         * @return The transformed DataStream
         */
-       public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reduceFunction) {
-               return dataStream.transform("Window-Reduce", getType(),
-                               getReduceInvokable(reduceFunction));
+       public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> 
reduceFunction) {
+
+               return discretize(false).reduceWindow(reduceFunction);
        }
 
        /**
@@ -248,15 +282,12 @@ public class WindowedDataStream<OUT> {
         *            The reduce function that will be applied to the windows.
         * @return The transformed DataStream
         */
-       public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
-                       GroupReduceFunction<OUT, R> reduceFunction) {
-
-               TypeInformation<OUT> inType = getType();
-               TypeInformation<R> outType = TypeExtractor
-                               .getGroupReduceReturnTypes(reduceFunction, 
inType);
+       public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> 
reduceFunction) {
+               return discretize(true).mapWindow(reduceFunction);
+       }
 
-               return dataStream.transform("WindowReduce", outType,
-                               getReduceGroupInvokable(reduceFunction));
+       public DataStream<OUT> flatten() {
+               return dataStream;
        }
 
        /**
@@ -269,18 +300,20 @@ public class WindowedDataStream<OUT> {
         * {@link org.apache.flink.api.common.functions.RichFunction} interface.
         * </br> </br> This version of reduceGroup uses user supplied
         * typeinformation for serializaton. Use this only when the system is 
unable
-        * to detect type information using:
-        * {@link #reduceGroup(GroupReduceFunction)}
+        * to detect type information using: {@link 
#mapWindow(GroupReduceFunction)}
         * 
         * @param reduceFunction
         *            The reduce function that will be applied to the windows.
         * @return The transformed DataStream
         */
-       public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
+       public <R> SingleOutputStreamOperator<R, ?> mapWindow(
                        GroupReduceFunction<OUT, R> reduceFunction, 
TypeInformation<R> outType) {
 
-               return dataStream.transform("Window-Reduce", outType,
-                               getReduceGroupInvokable(reduceFunction));
+               throw new RuntimeException("Not implemented yet");
+       }
+
+       protected Class<?> getClassAtPos(int pos) {
+               return dataStream.getClassAtPos(pos);
        }
 
        /**
@@ -291,10 +324,9 @@ public class WindowedDataStream<OUT> {
         *            The position in the tuple/array to sum
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
-               dataStream.checkFieldRange(positionToSum);
+       public WindowedDataStream<OUT> sum(int positionToSum) {
                return aggregate((AggregationFunction<OUT>) 
SumAggregator.getSumFunction(positionToSum,
-                               dataStream.getClassAtPos(positionToSum), 
getType()));
+                               getClassAtPos(positionToSum), getType()));
        }
 
        /**
@@ -308,7 +340,7 @@ public class WindowedDataStream<OUT> {
         *            The field to sum
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> sum(String field) {
+       public WindowedDataStream<OUT> sum(String field) {
                return aggregate((AggregationFunction<OUT>) 
SumAggregator.getSumFunction(field, getType()));
        }
 
@@ -320,8 +352,7 @@ public class WindowedDataStream<OUT> {
         *            The position to minimize
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
-               dataStream.checkFieldRange(positionToMin);
+       public WindowedDataStream<OUT> min(int positionToMin) {
                return 
aggregate(ComparableAggregator.getAggregator(positionToMin, getType(),
                                AggregationType.MIN));
        }
@@ -338,7 +369,7 @@ public class WindowedDataStream<OUT> {
         *            applied.
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> min(String field) {
+       public WindowedDataStream<OUT> min(String field) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(), AggregationType.MIN,
                                false));
        }
@@ -352,7 +383,7 @@ public class WindowedDataStream<OUT> {
         *            The position to minimize by
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+       public WindowedDataStream<OUT> minBy(int positionToMinBy) {
                return this.minBy(positionToMinBy, true);
        }
 
@@ -365,7 +396,7 @@ public class WindowedDataStream<OUT> {
         *            The position to minimize by
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) 
{
+       public WindowedDataStream<OUT> minBy(String positionToMinBy) {
                return this.minBy(positionToMinBy, true);
        }
 
@@ -382,8 +413,7 @@ public class WindowedDataStream<OUT> {
         *            minimum value, otherwise returns the last
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, 
boolean first) {
-               dataStream.checkFieldRange(positionToMinBy);
+       public WindowedDataStream<OUT> minBy(int positionToMinBy, boolean 
first) {
                return 
aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(),
                                AggregationType.MINBY, first));
        }
@@ -403,7 +433,7 @@ public class WindowedDataStream<OUT> {
         *            be returned
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean 
first) {
+       public WindowedDataStream<OUT> minBy(String field, boolean first) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(),
                                AggregationType.MINBY, first));
        }
@@ -416,8 +446,7 @@ public class WindowedDataStream<OUT> {
         *            The position to maximize
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
-               dataStream.checkFieldRange(positionToMax);
+       public WindowedDataStream<OUT> max(int positionToMax) {
                return 
aggregate(ComparableAggregator.getAggregator(positionToMax, getType(),
                                AggregationType.MAX));
        }
@@ -434,7 +463,7 @@ public class WindowedDataStream<OUT> {
         *            applied.
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> max(String field) {
+       public WindowedDataStream<OUT> max(String field) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(), AggregationType.MAX,
                                false));
        }
@@ -448,7 +477,7 @@ public class WindowedDataStream<OUT> {
         *            The position to maximize by
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+       public WindowedDataStream<OUT> maxBy(int positionToMaxBy) {
                return this.maxBy(positionToMaxBy, true);
        }
 
@@ -461,7 +490,7 @@ public class WindowedDataStream<OUT> {
         *            The position to maximize by
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) 
{
+       public WindowedDataStream<OUT> maxBy(String positionToMaxBy) {
                return this.maxBy(positionToMaxBy, true);
        }
 
@@ -478,8 +507,7 @@ public class WindowedDataStream<OUT> {
         *            maximum value, otherwise returns the last
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, 
boolean first) {
-               dataStream.checkFieldRange(positionToMaxBy);
+       public WindowedDataStream<OUT> maxBy(int positionToMaxBy, boolean 
first) {
                return 
aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(),
                                AggregationType.MAXBY, first));
        }
@@ -499,21 +527,17 @@ public class WindowedDataStream<OUT> {
         *            be returned
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean 
first) {
+       public WindowedDataStream<OUT> maxBy(String field, boolean first) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(),
                                AggregationType.MAXBY, first));
        }
 
-       private SingleOutputStreamOperator<OUT, ?> 
aggregate(AggregationFunction<OUT> aggregator) {
-               StreamInvokable<OUT, OUT> invokable = 
getReduceInvokable(aggregator);
-
-               SingleOutputStreamOperator<OUT, ?> returnStream = 
dataStream.transform("Window-Aggregation",
-                               getType(), invokable);
+       private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> 
aggregator) {
 
-               return returnStream;
+               return reduceWindow(aggregator);
        }
 
-       private LinkedList<TriggerPolicy<OUT>> getTriggers() {
+       protected LinkedList<TriggerPolicy<OUT>> getTriggers() {
 
                LinkedList<TriggerPolicy<OUT>> triggers = new 
LinkedList<TriggerPolicy<OUT>>();
 
@@ -531,7 +555,7 @@ public class WindowedDataStream<OUT> {
 
        }
 
-       private LinkedList<EvictionPolicy<OUT>> getEvicters() {
+       protected LinkedList<EvictionPolicy<OUT>> getEvicters() {
 
                LinkedList<EvictionPolicy<OUT>> evicters = new 
LinkedList<EvictionPolicy<OUT>>();
 
@@ -541,15 +565,15 @@ public class WindowedDataStream<OUT> {
                        }
                } else {
                        if (userEvicters == null) {
-                               boolean notOnlyTime=false;
-                               for (WindowingHelper<OUT> helper : 
triggerHelpers){
-                                       if (helper instanceof Time<?>){
+                               boolean notOnlyTime = false;
+                               for (WindowingHelper<OUT> helper : 
triggerHelpers) {
+                                       if (helper instanceof Time<?>) {
                                                evicters.add(helper.toEvict());
                                        } else {
-                                               notOnlyTime=true;
+                                               notOnlyTime = true;
                                        }
                                }
-                               if (notOnlyTime){
+                               if (notOnlyTime) {
                                        evicters.add(new 
TumblingEvictionPolicy<OUT>());
                                }
                        }
@@ -562,9 +586,9 @@ public class WindowedDataStream<OUT> {
                return evicters;
        }
 
-       private LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
+       protected LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
                LinkedList<TriggerPolicy<OUT>> cTriggers = new 
LinkedList<TriggerPolicy<OUT>>();
-               if (allCentral) {
+               if (isCentral) {
                        cTriggers.addAll(getTriggers());
                } else {
                        for (TriggerPolicy<OUT> trigger : getTriggers()) {
@@ -576,10 +600,10 @@ public class WindowedDataStream<OUT> {
                return cTriggers;
        }
 
-       private LinkedList<CloneableTriggerPolicy<OUT>> 
getDistributedTriggers() {
+       protected LinkedList<CloneableTriggerPolicy<OUT>> 
getDistributedTriggers() {
                LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = null;
 
-               if (!allCentral) {
+               if (!isCentral) {
                        dTriggers = new 
LinkedList<CloneableTriggerPolicy<OUT>>();
                        for (TriggerPolicy<OUT> trigger : getTriggers()) {
                                if (!(trigger instanceof TimeTriggerPolicy)) {
@@ -591,10 +615,10 @@ public class WindowedDataStream<OUT> {
                return dTriggers;
        }
 
-       private LinkedList<CloneableEvictionPolicy<OUT>> 
getDistributedEvicters() {
+       protected LinkedList<CloneableEvictionPolicy<OUT>> 
getDistributedEvicters() {
                LinkedList<CloneableEvictionPolicy<OUT>> evicters = null;
 
-               if (!allCentral) {
+               if (!isCentral) {
                        evicters = new 
LinkedList<CloneableEvictionPolicy<OUT>>();
                        for (EvictionPolicy<OUT> evicter : getEvicters()) {
                                evicters.add((CloneableEvictionPolicy<OUT>) 
evicter);
@@ -604,41 +628,14 @@ public class WindowedDataStream<OUT> {
                return evicters;
        }
 
-       private LinkedList<EvictionPolicy<OUT>> getCentralEvicters() {
-               if (allCentral) {
+       protected LinkedList<EvictionPolicy<OUT>> getCentralEvicters() {
+               if (isCentral) {
                        return getEvicters();
                } else {
                        return null;
                }
        }
 
-       private <R> StreamInvokable<OUT, R> 
getReduceGroupInvokable(GroupReduceFunction<OUT, R> reducer) {
-               StreamInvokable<OUT, R> invokable;
-               if (isGrouped) {
-                       invokable = new GroupedWindowInvokable<OUT, 
R>(clean(reducer), keySelector,
-                                       getDistributedTriggers(), 
getDistributedEvicters(), getCentralTriggers(),
-                                       getCentralEvicters());
-
-               } else {
-                       invokable = new WindowGroupReduceInvokable<OUT, 
R>(clean(reducer), getTriggers(),
-                                       getEvicters());
-               }
-               return invokable;
-       }
-
-       private StreamInvokable<OUT, OUT> 
getReduceInvokable(ReduceFunction<OUT> reducer) {
-               StreamInvokable<OUT, OUT> invokable;
-               if (isGrouped) {
-                       invokable = new GroupedWindowInvokable<OUT, 
OUT>(clean(reducer), keySelector,
-                                       getDistributedTriggers(), 
getDistributedEvicters(), getCentralTriggers(),
-                                       getCentralEvicters());
-
-               } else {
-                       invokable = new 
WindowReduceInvokable<OUT>(clean(reducer), getTriggers(), getEvicters());
-               }
-               return invokable;
-       }
-
        /**
         * Gets the output type.
         * 
@@ -648,11 +645,30 @@ public class WindowedDataStream<OUT> {
                return dataStream.getType();
        }
 
-       public DataStream<OUT> getDataStream() {
+       protected DataStream<OUT> getDataStream() {
                return dataStream;
        }
 
        protected WindowedDataStream<OUT> copy() {
                return new WindowedDataStream<OUT>(this);
        }
+
+       protected boolean isGrouped() {
+               return groupByKey != null;
+       }
+
+       public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
+               return discretize(true).getDiscretizedStream();
+       }
+
+       protected static class WindowKey<R> implements 
KeySelector<StreamWindow<R>, Integer> {
+
+               private static final long serialVersionUID = 1L;
+               
+               @Override
+               public Integer getKey(StreamWindow<R> value) throws Exception {
+                       return value.windowID;
+               }
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
deleted file mode 100644
index df2edd2..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * This invokable allows windowing based on {@link TriggerPolicy} and
- * {@link EvictionPolicy} instances including their active and cloneable
- * versions. It is additionally aware of the creation of windows per group.
- * 
- * A {@link KeySelector} is used to specify the key position or key extraction.
- * The {@link ReduceFunction} will be executed on each group separately.
- * Policies might either be centralized or distributed. It is not possible to
- * use central and distributed eviction policies at the same time. A 
distributed
- * policy have to be a {@link CloneableTriggerPolicy} or
- * {@link CloneableEvictionPolicy} as it will be cloned to have separated
- * instances for each group. At the startup time the distributed policies will
- * be stored as sample, and only clones of them will be used to maintain the
- * groups. Therefore, each group starts with the initial policy states.
- * 
- * While a distributed policy only gets notified with the elements belonging to
- * the respective group, a centralized policy get notified with all arriving
- * elements. When a centralized trigger occurred, all groups get triggered. 
This
- * is done by submitting the element which caused the trigger as real element 
to
- * the groups it belongs to and as fake element to all other groups. Within the
- * groups the element might be further processed, causing more triggers,
- * prenotifications of active distributed policies and evictions like usual.
- * 
- * Central policies can be instance of {@link ActiveTriggerPolicy} and also
- * implement the
- * {@link 
ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
- * method. Fake elements created on prenotification will be forwarded to all
- * groups. The {@link ActiveTriggerCallback} is also implemented in a way, that
- * it forwards/distributed calls all groups.
- * 
- * @param <IN>
- *            The type of input elements handled by this operator invokable.
- */
-public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
-
-       /**
-        * Auto-generated serial version UID
-        */
-       private static final long serialVersionUID = -3469545957144404137L;
-
-       private KeySelector<IN, ?> keySelector;
-       private Configuration parameters;
-       private LinkedList<ActiveTriggerPolicy<IN>> 
activeCentralTriggerPolicies;
-       private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies;
-       private LinkedList<ActiveEvictionPolicy<IN>> 
activeCentralEvictionPolicies;
-       private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
-       private LinkedList<CloneableTriggerPolicy<IN>> 
distributedTriggerPolicies;
-       private LinkedList<CloneableEvictionPolicy<IN>> 
distributedEvictionPolicies;
-       private Map<Object, WindowInvokable<IN, OUT>> windowingGroups;
-       private LinkedList<Thread> activePolicyThreads;
-       private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
-       private LinkedList<WindowInvokable<IN, OUT>> 
deleteOrderForCentralEviction;
-
-       /**
-        * This constructor creates an instance of the grouped windowing 
invokable.
-        * 
-        * A {@link KeySelector} is used to specify the key position or key
-        * extraction. The {@link ReduceFunction} will be executed on each group
-        * separately. Policies might either be centralized or distributed. It 
is
-        * not possible to use central and distributed eviction policies at the 
same
-        * time. A distributed policy have to be a {@link 
CloneableTriggerPolicy} or
-        * {@link CloneableEvictionPolicy} as it will be cloned to have 
separated
-        * instances for each group. At the startup time the distributed 
policies
-        * will be stored as sample, and only clones of them will be used to
-        * maintain the groups. Therefore, each group starts with the initial 
policy
-        * states.
-        * 
-        * While a distributed policy only gets notified with the elements 
belonging
-        * to the respective group, a centralized policy get notified with all
-        * arriving elements. When a centralized trigger occurred, all groups 
get
-        * triggered. This is done by submitting the element which caused the
-        * trigger as real element to the groups it belongs to and as fake 
element
-        * to all other groups. Within the groups the element might be further
-        * processed, causing more triggers, prenotifications of active 
distributed
-        * policies and evictions like usual.
-        * 
-        * Central policies can be instance of {@link ActiveTriggerPolicy} and 
also
-        * implement the
-        * {@link 
ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
-        * method. Fake elements created on prenotification will be forwarded 
to all
-        * groups. The {@link ActiveTriggerCallback} is also implemented in a 
way,
-        * that it forwards/distributed calls all groups.
-        * 
-        * @param userFunction
-        *            The user defined function.
-        * @param keySelector
-        *            A key selector to extract the key for the groups from the
-        *            input data.
-        * @param distributedTriggerPolicies
-        *            Trigger policies to be distributed and maintained 
individually
-        *            within each group.
-        * @param distributedEvictionPolicies
-        *            Eviction policies to be distributed and maintained
-        *            individually within each group. Note that there cannot be
-        *            both, central and distributed eviction policies at the 
same
-        *            time.
-        * @param centralTriggerPolicies
-        *            Trigger policies which will only exist once at a central
-        *            place. In case a central policy triggers, it will cause 
all
-        *            groups to be emitted. (Remark: Empty groups cannot be 
emitted.
-        *            If only one element is contained a group, this element 
itself
-        *            is returned as aggregated result.)
-        * @param centralEvictionPolicies
-        *            Eviction which will only exist once at a central place. 
Note
-        *            that there cannot be both, central and distributed 
eviction
-        *            policies at the same time. The central eviction policy 
will
-        *            work on an simulated element buffer containing all 
elements no
-        *            matter which group they belong to.
-        */
-       public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> 
keySelector,
-                       LinkedList<CloneableTriggerPolicy<IN>> 
distributedTriggerPolicies,
-                       LinkedList<CloneableEvictionPolicy<IN>> 
distributedEvictionPolicies,
-                       LinkedList<TriggerPolicy<IN>> centralTriggerPolicies,
-                       LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) 
{
-
-               super(userFunction);
-
-               this.keySelector = keySelector;
-
-               // handle the triggers
-               if (centralTriggerPolicies != null) {
-                       this.centralTriggerPolicies = centralTriggerPolicies;
-                       this.activeCentralTriggerPolicies = new 
LinkedList<ActiveTriggerPolicy<IN>>();
-
-                       for (TriggerPolicy<IN> trigger : 
centralTriggerPolicies) {
-                               if (trigger instanceof ActiveTriggerPolicy) {
-                                       
this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
-                               }
-                       }
-               } else {
-                       this.centralTriggerPolicies = new 
LinkedList<TriggerPolicy<IN>>();
-               }
-
-               if (distributedTriggerPolicies != null) {
-                       this.distributedTriggerPolicies = 
distributedTriggerPolicies;
-               } else {
-                       this.distributedTriggerPolicies = new 
LinkedList<CloneableTriggerPolicy<IN>>();
-               }
-
-               if (distributedEvictionPolicies != null) {
-                       this.distributedEvictionPolicies = 
distributedEvictionPolicies;
-               } else {
-                       this.distributedEvictionPolicies = new 
LinkedList<CloneableEvictionPolicy<IN>>();
-               }
-
-               this.activeCentralEvictionPolicies = new 
LinkedList<ActiveEvictionPolicy<IN>>();
-
-               if (centralEvictionPolicies != null) {
-                       this.centralEvictionPolicies = centralEvictionPolicies;
-
-                       for (EvictionPolicy<IN> eviction : 
centralEvictionPolicies) {
-                               if (eviction instanceof ActiveEvictionPolicy) {
-                                       
this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
-                               }
-                       }
-               } else {
-                       this.centralEvictionPolicies = new 
LinkedList<EvictionPolicy<IN>>();
-               }
-
-               this.windowingGroups = new HashMap<Object, WindowInvokable<IN, 
OUT>>();
-               this.activePolicyThreads = new LinkedList<Thread>();
-               this.currentTriggerPolicies = new 
LinkedList<TriggerPolicy<IN>>();
-               this.deleteOrderForCentralEviction = new 
LinkedList<WindowInvokable<IN, OUT>>();
-
-               // check that not both, central and distributed eviction, is 
used at the
-               // same time.
-               if (!this.centralEvictionPolicies.isEmpty() && 
!this.distributedEvictionPolicies.isEmpty()) {
-                       throw new UnsupportedOperationException(
-                                       "You can only use either central or 
distributed eviction policies but not both at the same time.");
-               }
-
-               // Check that there is at least one trigger and one eviction 
policy
-               if (this.centralEvictionPolicies.isEmpty() && 
this.distributedEvictionPolicies.isEmpty()) {
-                       throw new UnsupportedOperationException(
-                                       "You have to define at least one 
eviction policy");
-               }
-               if (this.centralTriggerPolicies.isEmpty() && 
this.distributedTriggerPolicies.isEmpty()) {
-                       throw new UnsupportedOperationException(
-                                       "You have to define at least one 
trigger policy");
-               }
-
-       }
-
-       @Override
-       public void invoke() throws Exception {
-               // Prevent empty data streams
-               if (readNext() == null) {
-                       throw new RuntimeException("DataStream must not be 
empty");
-               }
-
-               // Continuously run
-               while (nextRecord != null) {
-                       WindowInvokable<IN, OUT> groupInvokable = 
windowingGroups.get(keySelector
-                                       .getKey(nextRecord.getObject()));
-                       if (groupInvokable == null) {
-                               groupInvokable = makeNewGroup(nextRecord);
-                       }
-
-                       // Run the precalls for central active triggers
-                       for (ActiveTriggerPolicy<IN> trigger : 
activeCentralTriggerPolicies) {
-                               Object[] result = 
trigger.preNotifyTrigger(nextRecord.getObject());
-                               for (Object in : result) {
-
-                                       // If central eviction is used, handle 
it here
-                                       if 
(!activeCentralEvictionPolicies.isEmpty()) {
-                                               
evictElements(centralActiveEviction(in));
-                                       }
-
-                                       // process in groups
-                                       for (WindowInvokable<IN, OUT> group : 
windowingGroups.values()) {
-                                               group.processFakeElement(in, 
trigger);
-                                               checkForEmptyGroupBuffer(group);
-                                       }
-                               }
-                       }
-
-                       // Process non-active central triggers
-                       for (TriggerPolicy<IN> triggerPolicy : 
centralTriggerPolicies) {
-                               if 
(triggerPolicy.notifyTrigger(nextRecord.getObject())) {
-                                       
currentTriggerPolicies.add(triggerPolicy);
-                               }
-                       }
-
-                       if (currentTriggerPolicies.isEmpty()) {
-
-                               // only add the element to its group
-                               
groupInvokable.processRealElement(nextRecord.getObject());
-                               checkForEmptyGroupBuffer(groupInvokable);
-
-                               // If central eviction is used, handle it here
-                               if (!centralEvictionPolicies.isEmpty()) {
-                                       
evictElements(centralEviction(nextRecord.getObject(), false));
-                                       
deleteOrderForCentralEviction.add(groupInvokable);
-                               }
-
-                       } else {
-
-                               // call user function for all groups
-                               for (WindowInvokable<IN, OUT> group : 
windowingGroups.values()) {
-                                       if (group == groupInvokable) {
-                                               // process real with 
initialized policies
-                                               
group.processRealElement(nextRecord.getObject(), currentTriggerPolicies);
-                                       } else {
-                                               // process like a fake but also 
initialized with
-                                               // policies
-                                               
group.externalTriggerFakeElement(nextRecord.getObject(),
-                                                               
currentTriggerPolicies);
-                                       }
-
-                                       // remove group in case it has an empty 
buffer
-                                       // checkForEmptyGroupBuffer(group);
-                               }
-
-                               // If central eviction is used, handle it here
-                               if (!centralEvictionPolicies.isEmpty()) {
-                                       
evictElements(centralEviction(nextRecord.getObject(), true));
-                                       
deleteOrderForCentralEviction.add(groupInvokable);
-                               }
-                       }
-
-                       // clear current trigger list
-                       currentTriggerPolicies.clear();
-
-                       // read next record
-                       readNext();
-               }
-
-               // Stop all remaining threads from policies
-               for (Thread t : activePolicyThreads) {
-                       t.interrupt();
-               }
-
-               // finally trigger the buffer.
-               for (WindowInvokable<IN, OUT> group : windowingGroups.values()) 
{
-                       group.emitFinalWindow(centralTriggerPolicies);
-               }
-
-       }
-
-       /**
-        * This method creates a new group. The method gets called in case an
-        * element arrives which has a key which was not seen before. The method
-        * created a nested {@link WindowInvokable} and therefore created 
clones of
-        * all distributed trigger and eviction policies.
-        * 
-        * @param element
-        *            The element which leads to the generation of a new group
-        *            (previously unseen key)
-        * @throws Exception
-        *             In case the {@link KeySelector} throws an exception in
-        *             {@link KeySelector#getKey(Object)}, the exception is not
-        *             catched by this method.
-        */
-       @SuppressWarnings("unchecked")
-       private WindowInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) 
throws Exception {
-               // clone the policies
-               LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies 
= new LinkedList<TriggerPolicy<IN>>();
-               LinkedList<EvictionPolicy<IN>> 
clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
-               for (CloneableTriggerPolicy<IN> trigger : 
this.distributedTriggerPolicies) {
-                       clonedDistributedTriggerPolicies.add(trigger.clone());
-               }
-               for (CloneableEvictionPolicy<IN> eviction : 
this.distributedEvictionPolicies) {
-                       clonedDistributedEvictionPolicies.add(eviction.clone());
-               }
-
-               WindowInvokable<IN, OUT> groupInvokable;
-               if (userFunction instanceof ReduceFunction) {
-                       groupInvokable = (WindowInvokable<IN, OUT>) new 
WindowReduceInvokable<IN>(
-                                       (ReduceFunction<IN>) userFunction, 
clonedDistributedTriggerPolicies,
-                                       clonedDistributedEvictionPolicies);
-               } else {
-                       groupInvokable = new WindowGroupReduceInvokable<IN, 
OUT>(
-                                       (GroupReduceFunction<IN, OUT>) 
userFunction, clonedDistributedTriggerPolicies,
-                                       clonedDistributedEvictionPolicies);
-               }
-
-               groupInvokable.setup(taskContext, executionConfig);
-               groupInvokable.open(this.parameters);
-               windowingGroups.put(keySelector.getKey(element.getObject()), 
groupInvokable);
-
-               return groupInvokable;
-       }
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               this.parameters = parameters;
-               for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) 
{
-                       Runnable target = tp.createActiveTriggerRunnable(new 
WindowingCallback(tp));
-                       if (target != null) {
-                               Thread thread = new Thread(target);
-                               activePolicyThreads.add(thread);
-                               thread.start();
-                       }
-               }
-       };
-
-       /**
-        * This method is used to notify central eviction policies with a real
-        * element.
-        * 
-        * @param input
-        *            the real element to notify the eviction policy.
-        * @param triggered
-        *            whether a central trigger occurred or not.
-        * @return The number of elements to be deleted from the buffer.
-        */
-       private int centralEviction(IN input, boolean triggered) {
-               // Process the evictions and take care of double evictions
-               // In case there are multiple eviction policies present,
-               // only the one with the highest return value is recognized.
-               int currentMaxEviction = 0;
-               for (EvictionPolicy<IN> evictionPolicy : 
centralEvictionPolicies) {
-                       // use temporary variable to prevent multiple calls to
-                       // notifyEviction
-                       int tmp = evictionPolicy.notifyEviction(input, 
triggered,
-                                       deleteOrderForCentralEviction.size());
-                       if (tmp > currentMaxEviction) {
-                               currentMaxEviction = tmp;
-                       }
-               }
-               return currentMaxEviction;
-       }
-
-       /**
-        * This method is used to notify active central eviction policies with a
-        * fake element.
-        * 
-        * @param input
-        *            the fake element to notify the active central eviction
-        *            policies.
-        * @return The number of elements to be deleted from the buffer.
-        */
-       private int centralActiveEviction(Object input) {
-               // Process the evictions and take care of double evictions
-               // In case there are multiple eviction policies present,
-               // only the one with the highest return value is recognized.
-               int currentMaxEviction = 0;
-               for (ActiveEvictionPolicy<IN> evictionPolicy : 
activeCentralEvictionPolicies) {
-                       // use temporary variable to prevent multiple calls to
-                       // notifyEviction
-                       int tmp = 
evictionPolicy.notifyEvictionWithFakeElement(input,
-                                       deleteOrderForCentralEviction.size());
-                       if (tmp > currentMaxEviction) {
-                               currentMaxEviction = tmp;
-                       }
-               }
-               return currentMaxEviction;
-       }
-
-       /**
-        * This method is used in central eviction to delete a given number of
-        * elements from the buffer.
-        * 
-        * @param numToEvict
-        *            number of elements to delete from the virtual central 
element
-        *            buffer.
-        */
-       private void evictElements(int numToEvict) {
-               HashSet<WindowInvokable<IN, OUT>> usedGroups = new 
HashSet<WindowInvokable<IN, OUT>>();
-               for (; numToEvict > 0; numToEvict--) {
-                       WindowInvokable<IN, OUT> currentGroup = 
deleteOrderForCentralEviction.getFirst();
-                       // Do the eviction
-                       currentGroup.evictFirst();
-                       // Remember groups which possibly have an empty buffer 
after the
-                       // eviction
-                       usedGroups.add(currentGroup);
-                       try {
-                               deleteOrderForCentralEviction.removeFirst();
-                       } catch (NoSuchElementException e) {
-                               // when buffer is empty, ignore exception and 
stop deleting
-                               break;
-                       }
-
-               }
-
-               // Remove groups with empty buffer
-               for (WindowInvokable<IN, OUT> group : usedGroups) {
-                       checkForEmptyGroupBuffer(group);
-               }
-       }
-
-       /**
-        * Checks if the element buffer of a given windowing group is empty. If 
so,
-        * the group will be deleted.
-        * 
-        * @param group
-        *            The windowing group to be checked and and removed in case 
its
-        *            buffer is empty.
-        */
-       private void checkForEmptyGroupBuffer(WindowInvokable<IN, OUT> group) {
-               if (group.isBufferEmpty()) {
-                       windowingGroups.remove(group);
-               }
-       }
-
-       /**
-        * This callback class allows to handle the callbacks done by threads
-        * defined in active trigger policies
-        * 
-        * @see 
ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
-        */
-       private class WindowingCallback implements ActiveTriggerCallback {
-               private ActiveTriggerPolicy<IN> policy;
-
-               public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
-                       this.policy = policy;
-               }
-
-               @Override
-               public void sendFakeElement(Object datapoint) {
-
-                       // If central eviction is used, handle it here
-                       if (!centralEvictionPolicies.isEmpty()) {
-                               evictElements(centralActiveEviction(datapoint));
-                       }
-
-                       // handle element in groups
-                       for (WindowInvokable<IN, OUT> group : 
windowingGroups.values()) {
-                               group.processFakeElement(datapoint, policy);
-                               checkForEmptyGroupBuffer(group);
-                       }
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
deleted file mode 100644
index b3fdfe8..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-public class WindowGroupReduceInvokable<IN, OUT> extends WindowInvokable<IN, 
OUT> {
-
-       private static final long serialVersionUID = 1L;
-       GroupReduceFunction<IN, OUT> reducer;
-
-       public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> 
userFunction,
-                       LinkedList<TriggerPolicy<IN>> triggerPolicies,
-                       LinkedList<EvictionPolicy<IN>> evictionPolicies) {
-               super(userFunction, triggerPolicies, evictionPolicies);
-               this.reducer = userFunction;
-       }
-
-       @Override
-       protected void callUserFunction() throws Exception {
-               reducer.reduce(copyBuffer(), collector);
-       }
-
-       public LinkedList<IN> copyBuffer() {
-               LinkedList<IN> copy = new LinkedList<IN>();
-               for (IN element : buffer) {
-                       copy.add(copy(element));
-               }
-               return copy;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
deleted file mode 100644
index ea891c9..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, 
OUT> {
-
-       /**
-        * Auto-generated serial version UID
-        */
-       private static final long serialVersionUID = -8038984294071650730L;
-
-       private LinkedList<TriggerPolicy<IN>> triggerPolicies;
-       private LinkedList<EvictionPolicy<IN>> evictionPolicies;
-       private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
-       private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
-       private LinkedList<Thread> activePolicyTreads;
-       protected LinkedList<IN> buffer;
-       private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
-
-       /**
-        * This constructor created a windowing invokable using trigger and 
eviction
-        * policies.
-        * 
-        * @param userFunction
-        *            The user defined {@link ReduceFunction}
-        * @param triggerPolicies
-        *            A list of {@link TriggerPolicy}s and/or
-        *            {@link ActiveTriggerPolicy}s
-        * @param evictionPolicies
-        *            A list of {@link EvictionPolicy}s and/or
-        *            {@link ActiveEvictionPolicy}s
-        */
-       public WindowInvokable(Function userFunction, 
LinkedList<TriggerPolicy<IN>> triggerPolicies,
-                       LinkedList<EvictionPolicy<IN>> evictionPolicies) {
-               super(userFunction);
-
-               this.triggerPolicies = triggerPolicies;
-               this.evictionPolicies = evictionPolicies;
-
-               activeTriggerPolicies = new 
LinkedList<ActiveTriggerPolicy<IN>>();
-               for (TriggerPolicy<IN> tp : triggerPolicies) {
-                       if (tp instanceof ActiveTriggerPolicy) {
-                               
activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) tp);
-                       }
-               }
-
-               activeEvictionPolicies = new 
LinkedList<ActiveEvictionPolicy<IN>>();
-               for (EvictionPolicy<IN> ep : evictionPolicies) {
-                       if (ep instanceof ActiveEvictionPolicy) {
-                               
activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep);
-                       }
-               }
-
-               this.activePolicyTreads = new LinkedList<Thread>();
-               this.buffer = new LinkedList<IN>();
-               this.currentTriggerPolicies = new 
LinkedList<TriggerPolicy<IN>>();
-       }
-
-       @Override
-       public void open(org.apache.flink.configuration.Configuration 
parameters) throws Exception {
-               super.open(parameters);
-               for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) {
-                       Runnable target = tp.createActiveTriggerRunnable(new 
WindowingCallback(tp));
-                       if (target != null) {
-                               Thread thread = new Thread(target);
-                               activePolicyTreads.add(thread);
-                               thread.start();
-                       }
-               }
-       };
-
-       /**
-        * This class allows the active trigger threads to call back and push 
fake
-        * elements at any time.
-        */
-       private class WindowingCallback implements ActiveTriggerCallback {
-               private ActiveTriggerPolicy<IN> policy;
-
-               public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
-                       this.policy = policy;
-               }
-
-               @Override
-               public void sendFakeElement(Object datapoint) {
-                       processFakeElement(datapoint, this.policy);
-               }
-
-       }
-
-       @Override
-       public void invoke() throws Exception {
-
-               // Prevent empty data streams
-               if (readNext() == null) {
-                       throw new RuntimeException("DataStream must not be 
empty");
-               }
-
-               // Continuously run
-               while (nextRecord != null) {
-                       processRealElement(nextRecord.getObject());
-
-                       // Load next StreamRecord
-                       readNext();
-               }
-
-               // Stop all remaining threads from policies
-               for (Thread t : activePolicyTreads) {
-                       t.interrupt();
-               }
-
-               // finally trigger the buffer.
-               emitFinalWindow(null);
-
-       }
-
-       /**
-        * This method gets called in case of an grouped windowing in case 
central
-        * trigger occurred and the arriving element causing the trigger is not 
part
-        * of this group.
-        * 
-        * Remark: This is NOT the same as
-        * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}! 
Here
-        * the eviction using active policies takes place after the call to the 
UDF.
-        * Usually it is done before when fake elements get submitted. This 
special
-        * behaviour is needed to allow the {@link GroupedWindowInvokable} to 
send
-        * central triggers to all groups, even if the current element does not
-        * belong to the group.
-        * 
-        * @param input
-        *            a fake input element
-        * @param policies
-        *            the list of policies which caused the call with this fake
-        *            element
-        */
-       protected synchronized void externalTriggerFakeElement(IN input,
-                       List<TriggerPolicy<IN>> policies) {
-
-               // Set the current triggers
-               currentTriggerPolicies.addAll(policies);
-
-               // emit
-               callUserFunctionAndLogException();
-
-               // clear the flag collection
-               currentTriggerPolicies.clear();
-
-               // Process the evictions and take care of double evictions
-               // In case there are multiple eviction policies present,
-               // only the one with the highest return value is recognized.
-               int currentMaxEviction = 0;
-               for (ActiveEvictionPolicy<IN> evictionPolicy : 
activeEvictionPolicies) {
-                       // use temporary variable to prevent multiple calls to
-                       // notifyEviction
-                       int tmp = 
evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
-                       if (tmp > currentMaxEviction) {
-                               currentMaxEviction = tmp;
-                       }
-               }
-
-               for (int i = 0; i < currentMaxEviction; i++) {
-                       try {
-                               buffer.removeFirst();
-                       } catch (NoSuchElementException e) {
-                               // In case no more elements are in the buffer:
-                               // Prevent failure and stop deleting.
-                               break;
-                       }
-               }
-       }
-
-       /**
-        * This method processed an arrived fake element The method is 
synchronized
-        * to ensure that it cannot interleave with
-        * {@link WindowInvokable#processRealElement(Object)}
-        * 
-        * @param input
-        *            a fake input element
-        * @param currentPolicy
-        *            the policy which produced this fake element
-        */
-       protected synchronized void processFakeElement(Object input, 
TriggerPolicy<IN> currentPolicy) {
-
-               // Process the evictions and take care of double evictions
-               // In case there are multiple eviction policies present,
-               // only the one with the highest return value is recognized.
-               int currentMaxEviction = 0;
-               for (ActiveEvictionPolicy<IN> evictionPolicy : 
activeEvictionPolicies) {
-                       // use temporary variable to prevent multiple calls to
-                       // notifyEviction
-                       int tmp = 
evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
-                       if (tmp > currentMaxEviction) {
-                               currentMaxEviction = tmp;
-                       }
-               }
-
-               for (int i = 0; i < currentMaxEviction; i++) {
-                       try {
-                               buffer.removeFirst();
-                       } catch (NoSuchElementException e) {
-                               // In case no more elements are in the buffer:
-                               // Prevent failure and stop deleting.
-                               break;
-                       }
-               }
-
-               // Set the current trigger
-               currentTriggerPolicies.add(currentPolicy);
-
-               // emit
-               callUserFunctionAndLogException();
-
-               // clear the flag collection
-               currentTriggerPolicies.clear();
-       }
-
-       /**
-        * This method processed an arrived real element The method is 
synchronized
-        * to ensure that it cannot interleave with
-        * {@link WindowInvokable#processFakeElement(Object)}.
-        * 
-        * @param input
-        *            a real input element
-        * @param triggerPolicies
-        *            Allows to set trigger policies which are maintained
-        *            externally. This is the case for central policies in
-        *            {@link GroupedWindowInvokable}.
-        */
-       protected synchronized void processRealElement(IN input, 
List<TriggerPolicy<IN>> triggerPolicies) {
-               this.currentTriggerPolicies.addAll(triggerPolicies);
-               processRealElement(input);
-       }
-
-       /**
-        * This method processed an arrived real element The method is 
synchronized
-        * to ensure that it cannot interleave with
-        * {@link WindowInvokable#processFakeElement(Object)}
-        * 
-        * @param input
-        *            a real input element
-        */
-       protected synchronized void processRealElement(IN input) {
-
-               // Run the precalls to detect missed windows
-               for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) {
-                       // Remark: In case multiple active triggers are present 
the ordering
-                       // of the different fake elements returned by this 
triggers becomes
-                       // a problem. This might lead to unexpected results...
-                       // Should we limit the number of active triggers to 0 
or 1?
-                       Object[] result = trigger.preNotifyTrigger(input);
-                       for (Object in : result) {
-                               processFakeElement(in, trigger);
-                       }
-               }
-
-               // Remember if a trigger occurred
-               boolean isTriggered = false;
-
-               // Process the triggers
-               for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) {
-                       if (triggerPolicy.notifyTrigger(input)) {
-                               currentTriggerPolicies.add(triggerPolicy);
-                       }
-               }
-
-               // call user function
-               if (!currentTriggerPolicies.isEmpty()) {
-                       // emit
-                       callUserFunctionAndLogException();
-
-                       // clear the flag collection
-                       currentTriggerPolicies.clear();
-
-                       // remember trigger
-                       isTriggered = true;
-               }
-
-               // Process the evictions and take care of double evictions
-               // In case there are multiple eviction policies present,
-               // only the one with the highest return value is recognized.
-               int currentMaxEviction = 0;
-
-               for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) {
-                       // use temporary variable to prevent multiple calls to
-                       // notifyEviction
-                       int tmp = evictionPolicy.notifyEviction(input, 
isTriggered, buffer.size());
-                       if (tmp > currentMaxEviction) {
-                               currentMaxEviction = tmp;
-                       }
-               }
-
-               for (int i = 0; i < currentMaxEviction; i++) {
-                       try {
-                               buffer.removeFirst();
-                       } catch (NoSuchElementException e) {
-                               // In case no more elements are in the buffer:
-                               // Prevent failure and stop deleting.
-                               break;
-                       }
-               }
-
-               // Add the current element to the buffer
-               buffer.add(input);
-
-       }
-
-       /**
-        * This method removes the first element from the element buffer. It is 
used
-        * to provide central evictions in {@link GroupedWindowInvokable}
-        */
-       protected synchronized void evictFirst() {
-               try {
-                       buffer.removeFirst();
-               } catch (NoSuchElementException e) {
-                       // ignore exception
-               }
-       }
-
-       /**
-        * This method returns whether the element buffer is empty or not. It is
-        * used to figure out if a group can be deleted or not when
-        * {@link GroupedWindowInvokable} is used.
-        * 
-        * @return true in case the buffer is empty otherwise false.
-        */
-       protected boolean isBufferEmpty() {
-               return buffer.isEmpty();
-       }
-
-       /**
-        * This method does the final reduce at the end of the stream and emits 
the
-        * result.
-        * 
-        * @param centralTriggerPolicies
-        *            Allows to set trigger policies which are maintained
-        *            externally. This is the case for central policies in
-        *            {@link GroupedWindowInvokable}.
-        */
-       protected void emitFinalWindow(List<TriggerPolicy<IN>> 
centralTriggerPolicies) {
-               if (!buffer.isEmpty()) {
-                       currentTriggerPolicies.clear();
-
-                       if (centralTriggerPolicies != null) {
-                               
currentTriggerPolicies.addAll(centralTriggerPolicies);
-                       }
-
-                       for (TriggerPolicy<IN> policy : triggerPolicies) {
-                               currentTriggerPolicies.add(policy);
-                       }
-
-                       callUserFunctionAndLogException();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
deleted file mode 100644
index ed246c8..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-public class WindowReduceInvokable<IN> extends WindowInvokable<IN, IN> {
-
-       private static final long serialVersionUID = 1L;
-
-       ReduceFunction<IN> reducer;
-
-       public WindowReduceInvokable(ReduceFunction<IN> userFunction,
-                       LinkedList<TriggerPolicy<IN>> triggerPolicies,
-                       LinkedList<EvictionPolicy<IN>> evictionPolicies) {
-               super(userFunction, triggerPolicies, evictionPolicies);
-               this.reducer = userFunction;
-       }
-
-       @Override
-       protected void callUserFunction() throws Exception {
-               Iterator<IN> reducedIterator = buffer.iterator();
-               IN reduced = null;
-
-               while (reducedIterator.hasNext() && reduced == null) {
-                       reduced = reducedIterator.next();
-               }
-
-               while (reducedIterator.hasNext()) {
-                       IN next = reducedIterator.next();
-                       if (next != null) {
-                               reduced = reducer.reduce(copy(reduced), 
copy(next));
-                       }
-               }
-               if (reduced != null) {
-                       collector.collect(reduced);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
index 738c78f..1766b0b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
@@ -22,11 +22,11 @@ import java.util.Map;
 
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 
-public class WindowCombiner<T> extends ChainableInvokable<StreamWindow<T>, 
StreamWindow<T>> {
+public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, 
StreamWindow<T>> {
 
        private Map<Integer, StreamWindow<T>> windows;
 
-       public WindowCombiner() {
+       public WindowMerger() {
                super(null);
                this.windows = new HashMap<Integer, StreamWindow<T>>();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
index 132b495..1937b3f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
-import 
org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
 
 /**
  * When used in {@link GroupedWindowInvokable}, eviction policies must

http://git-wip-us.apache.org/repos/asf/flink/blob/c560d76f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
index f5772a1..6a04461 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
-import 
org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
 
 /**
  * When used in {@link GroupedWindowInvokable}, trigger policies can provide

Reply via email to