[streaming] WindowMapFunction added + Streaming package structure cleanup

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb5dc7e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb5dc7e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb5dc7e3

Branch: refs/heads/master
Commit: bb5dc7e3dd6825f8f78f42df6c4ca5ef00d520fe
Parents: 3885823
Author: Gyula Fora <gyf...@apache.org>
Authored: Fri Feb 13 11:03:08 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Mon Feb 16 13:06:08 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java |   2 +-
 .../streaming/api/datastream/DataStream.java    |  16 +-
 .../api/datastream/DiscretizedStream.java       |  55 ++---
 .../api/datastream/WindowedDataStream.java      | 218 +++++++++----------
 .../temporaloperator/StreamCrossOperator.java   |  13 +-
 .../environment/StreamExecutionEnvironment.java |   1 +
 .../api/function/RichWindowMapFunction.java     |  31 +++
 .../api/function/WindowMapFunction.java         |  28 +++
 .../function/source/ParallelSourceFunction.java |  40 ++--
 .../operator/windowing/BasicWindowBuffer.java   |  74 -------
 .../windowing/CompletePreAggregator.java        |  23 --
 .../windowing/GroupedStreamDiscretizer.java     |   2 +
 .../windowing/GroupedTimeDiscretizer.java       |   1 +
 .../operator/windowing/StreamDiscretizer.java   |   2 +
 .../operator/windowing/StreamWindow.java        | 183 ----------------
 .../windowing/StreamWindowSerializer.java       | 138 ------------
 .../windowing/StreamWindowTypeInfo.java         |  74 -------
 .../windowing/TumblingGroupedPreReducer.java    |  95 --------
 .../operator/windowing/TumblingPreReducer.java  |  78 -------
 .../operator/windowing/WindowBuffer.java        |  36 ---
 .../operator/windowing/WindowFlattener.java     |   1 +
 .../operator/windowing/WindowMapper.java        |  22 +-
 .../operator/windowing/WindowMerger.java        |   1 +
 .../operator/windowing/WindowPartitioner.java   |   1 +
 .../operator/windowing/WindowReducer.java       |   1 +
 .../streaming/api/windowing/StreamWindow.java   | 181 +++++++++++++++
 .../api/windowing/StreamWindowSerializer.java   | 134 ++++++++++++
 .../api/windowing/StreamWindowTypeInfo.java     |  74 +++++++
 .../windowbuffer/BasicWindowBuffer.java         |  75 +++++++
 .../windowbuffer/CompletePreAggregator.java     |  23 ++
 .../windowbuffer/TumblingGroupedPreReducer.java |  96 ++++++++
 .../windowbuffer/TumblingPreReducer.java        |  79 +++++++
 .../windowing/windowbuffer/WindowBuffer.java    |  37 ++++
 .../api/invokable/operator/CoFlatMapTest.java   |  84 -------
 .../invokable/operator/CoGroupedReduceTest.java | 125 -----------
 .../api/invokable/operator/CoMapTest.java       |  57 -----
 .../invokable/operator/CoStreamReduceTest.java  |  71 ------
 .../api/invokable/operator/CoWindowTest.java    | 182 ----------------
 .../invokable/operator/co/CoFlatMapTest.java    |  84 +++++++
 .../operator/co/CoGroupedReduceTest.java        | 125 +++++++++++
 .../api/invokable/operator/co/CoMapTest.java    |  57 +++++
 .../operator/co/CoStreamReduceTest.java         |  71 ++++++
 .../api/invokable/operator/co/CoWindowTest.java | 182 ++++++++++++++++
 .../windowing/BasicWindowBufferTest.java        |  83 -------
 .../windowing/GroupedStreamDiscretizerTest.java |   2 +
 .../windowing/StreamDiscretizerTest.java        |   2 +
 .../operator/windowing/StreamWindowTest.java    | 201 -----------------
 .../TumblingGroupedPreReducerTest.java          | 124 -----------
 .../windowing/TumblingPreReducerTest.java       | 110 ----------
 .../windowing/WindowIntegrationTest.java        |   7 +-
 .../api/windowing/StreamWindowTest.java         | 201 +++++++++++++++++
 .../windowbuffer/BasicWindowBufferTest.java     |  86 ++++++++
 .../TumblingGroupedPreReducerTest.java          | 127 +++++++++++
 .../windowbuffer/TumblingPreReducerTest.java    | 113 ++++++++++
 .../ml/IncrementalLearningSkeleton.java         |   6 +-
 .../examples/windowing/StockPrices.java         |  30 +--
 .../api/scala/WindowedDataStream.scala          |  14 +-
 .../flink/streaming/api/scala/package.scala     |   2 +-
 58 files changed, 2032 insertions(+), 1949 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 1b3003f..63273f8 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -212,7 +212,7 @@ public class TypeExtractor {
        // 
--------------------------------------------------------------------------------------------
        
        @SuppressWarnings("unchecked")
-       private static <IN, OUT> TypeInformation<OUT> 
getUnaryOperatorReturnType(Function function, Class<?> baseClass, 
+       public static <IN, OUT> TypeInformation<OUT> 
getUnaryOperatorReturnType(Function function, Class<?> baseClass, 
                        boolean hasIterable, boolean hasCollector, 
TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c24ab99..e766626 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -208,7 +208,7 @@ public class DataStream<OUT> {
                return environment;
        }
 
-       public ExecutionConfig getExecutionConfig() {
+       protected ExecutionConfig getExecutionConfig() {
                return environment.getConfig();
        }
 
@@ -254,11 +254,12 @@ public class DataStream<OUT> {
 
        /**
         * Creates a new {@link ConnectedDataStream} by connecting
-        * {@link DataStream} outputs of different type with each other. The
-        * DataStreams connected using this operators can be used with 
CoFunctions.
+        * {@link DataStream} outputs of (possibly) different types with each 
other.
+        * The DataStreams connected using this operator can be used with
+        * CoFunctions to apply joint transformations.
         * 
         * @param dataStream
-        *            The DataStream with which this stream will be joined.
+        *            The DataStream with which this stream will be connected.
         * @return The {@link ConnectedDataStream}.
         */
        public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> 
dataStream) {
@@ -502,9 +503,10 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies a reduce transformation on the data stream. The user can also
-        * extend the {@link RichReduceFunction} to gain access to other 
features
-        * provided by the
+        * Applies a reduce transformation on the data stream. The returned 
stream
+        * contains all the intermediate values of the reduce transformation. 
The
+        * user can also extend the {@link RichReduceFunction} to gain access to
+        * other features provided by the
         * {@link org.apache.flink.api.common.functions.RichFunction} interface.
         * 
         * @param reducer

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index c17169d..7ab7c2c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -18,9 +18,8 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -28,21 +27,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
 
 /**
  * A {@link DiscretizedStream} represents a data stream that has been divided
  * into windows (predefined chunks). User defined function such as
- * {@link #reduceWindow(ReduceFunction)},
- * {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to 
the
- * windows.
+ * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)} or aggregations
+ * can be applied to the windows.
  * 
  * @param <OUT>
  *            The output type of the {@link DiscretizedStream}
@@ -60,6 +59,14 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
                this.transformation = tranformation;
        }
 
+       public DataStream<OUT> flatten() {
+               return discretizedStream.transform("Window Flatten", getType(), 
new WindowFlattener<OUT>());
+       }
+
+       public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
+               return discretizedStream;
+       }
+
        /**
         * Applies a reduce transformation on the windowed data stream by 
reducing
         * the current window at every trigger.The user can also extend the
@@ -79,7 +86,7 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
 
                if (!isGrouped()) {
                        return out.transform(WindowTransformation.REDUCEWINDOW, 
"Window Reduce", out.getType(),
-                                       new WindowReducer<OUT>(reduceFunction));
+                                       new 
WindowReducer<OUT>(discretizedStream.clean(reduceFunction)));
                } else {
                        return out;
                }
@@ -90,29 +97,27 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
         * reducing the current window at every trigger. In contrast with the
         * standard binary reducer, with reduceGroup the user can access all
         * elements of the window at the same time through the iterable 
interface.
-        * The user can also extend the {@link RichGroupReduceFunction} to gain
-        * access to other features provided by the
-        * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+        * The user can also extend the {@link org.apache.flink.streaming.api.function.RichWindowMapFunction} to gain access to other features 
provided by
+        * the {@link org.apache.flink.api.common.functions.RichFunction} 
interface.
         * 
-        * @param reduceFunction
+        * @param windowMapFunction
         *            The reduce function that will be applied to the windows.
         * @return The transformed DataStream
         */
        @Override
-       public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> 
reduceFunction) {
+       public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> 
windowMapFunction) {
 
-               TypeInformation<R> retType = 
TypeExtractor.getGroupReduceReturnTypes(reduceFunction,
-                               getType());
+               TypeInformation<R> retType = 
getWindowMapReturnTypes(windowMapFunction, getType());
 
-               return mapWindow(reduceFunction, retType);
+               return mapWindow(windowMapFunction, retType);
        }
 
        @Override
-       public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> 
reduceFunction,
+       public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> 
windowMapFunction,
                        TypeInformation<R> returnType) {
                DiscretizedStream<R> out = partition(transformation).transform(
                                WindowTransformation.REDUCEWINDOW, "Window 
Reduce", returnType,
-                               new WindowMapper<OUT, R>(reduceFunction));
+                               new WindowMapper<OUT, 
R>(discretizedStream.clean(windowMapFunction)));
 
                if (isGrouped()) {
                        return out.merge();
@@ -159,10 +164,6 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
                                type, new WindowMerger<OUT>()));
        }
 
-       public DataStream<OUT> flatten() {
-               return discretizedStream.transform("Window Flatten", getType(), 
new WindowFlattener<OUT>());
-       }
-
        @SuppressWarnings("rawtypes")
        private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator 
stream) {
                return wrap(stream, transformation);
@@ -174,10 +175,6 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
                return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) 
this.groupByKey, transformation);
        }
 
-       public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
-               return discretizedStream;
-       }
-
        @SuppressWarnings("rawtypes")
        protected Class<?> getClassAtPos(int pos) {
                Class<?> type;
@@ -231,6 +228,12 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
                return ((StreamWindowTypeInfo<OUT>) 
discretizedStream.getType()).getInnerType();
        }
 
+       private static <IN, OUT> TypeInformation<OUT> getWindowMapReturnTypes(
+                       WindowMapFunction<IN, OUT> windowMapInterface, 
TypeInformation<IN> inType) {
+               return TypeExtractor.getUnaryOperatorReturnType((Function) 
windowMapInterface,
+                               WindowMapFunction.class, true, true, inType, 
null, false);
+       }
+
        protected DiscretizedStream<OUT> copy() {
                return new DiscretizedStream<OUT>(discretizedStream.copy(), 
groupByKey, transformation);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 04a29a9..3ff5859 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -19,30 +19,24 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
 import 
org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.BasicWindowBuffer;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.CompletePreAggregator;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.GroupedTimeDiscretizer;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.TumblingGroupedPreReducer;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.TumblingPreReducer;
-import 
org.apache.flink.streaming.api.invokable.operator.windowing.WindowBuffer;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -51,20 +45,36 @@ import 
org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
+import 
org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import 
org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
+import 
org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 
 /**
  * A {@link WindowedDataStream} represents a data stream that has been divided
  * into windows (predefined chunks). User defined function such as
- * {@link #reduceWindow(ReduceFunction)},
- * {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to 
the
- * windows.
+ * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)} or aggregations
+ * can be applied to the windows.
  * 
  * @param <OUT>
  *            The output type of the {@link WindowedDataStream}
  */
 public class WindowedDataStream<OUT> {
 
+       protected enum WindowTransformation {
+
+               REDUCEWINDOW, MAPWINDOW, NONE;
+
+               private Function UDF;
+
+               public WindowTransformation with(Function UDF) {
+                       this.UDF = UDF;
+                       return this;
+               }
+       }
+
        protected DataStream<OUT> dataStream;
 
        protected boolean isLocal = false;
@@ -113,10 +123,6 @@ public class WindowedDataStream<OUT> {
        public WindowedDataStream() {
        }
 
-       public <F> F clean(F f) {
-               return dataStream.clean(f);
-       }
-
        /**
         * Defines the slide size (trigger frequency) for the windowed data 
stream.
         * This controls how often the user defined function will be triggered 
on
@@ -201,7 +207,7 @@ public class WindowedDataStream<OUT> {
        }
 
        private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) {
-               return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, 
getType(),
+               return 
groupBy(dataStream.clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
                                getExecutionConfig())));
        }
 
@@ -218,73 +224,13 @@ public class WindowedDataStream<OUT> {
                return out;
        }
 
-       private DiscretizedStream<OUT> discretize(WindowTransformation 
transformation,
-                       WindowBuffer<OUT> windowBuffer) {
-
-               StreamInvokable<OUT, StreamWindow<OUT>> discretizer = 
getDiscretizer(transformation,
-                               windowBuffer, getTrigger(), getEviction(), 
discretizerKey);
-
-               int parallelism = getDiscretizerParallelism();
-
-               return new DiscretizedStream<OUT>(dataStream.transform("Stream 
Discretizer",
-                               new StreamWindowTypeInfo<OUT>(getType()), 
discretizer).setParallelism(parallelism),
-                               groupByKey, transformation);
-
-       }
-
-       protected enum WindowTransformation {
-
-               REDUCEWINDOW, MAPWINDOW, NONE;
-
-               private Function UDF;
-
-               public WindowTransformation with(Function UDF) {
-                       this.UDF = UDF;
-                       return this;
-               }
-       }
-
-       private int getDiscretizerParallelism() {
-               return isLocal || (discretizerKey != null) ? 
dataStream.environment
-                               .getDegreeOfParallelism() : 1;
-       }
-
-       private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer(
-                       WindowTransformation transformation, WindowBuffer<OUT> 
windowBuffer,
-                       TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> 
eviction,
-                       KeySelector<OUT, ?> discretizerKey) {
-
-               if (discretizerKey == null) {
-                       return new StreamDiscretizer<OUT>(trigger, eviction, 
windowBuffer);
-               } else if (trigger instanceof TimeTriggerPolicy
-                               && ((TimeTriggerPolicy<OUT>) 
trigger).timestampWrapper.isDefaultTimestamp()) {
-                       return new GroupedTimeDiscretizer<OUT>(discretizerKey,
-                                       (TimeTriggerPolicy<OUT>) trigger, 
(CloneableEvictionPolicy<OUT>) eviction,
-                                       windowBuffer);
-               } else {
-                       return new GroupedStreamDiscretizer<OUT>(discretizerKey,
-                                       (CloneableTriggerPolicy<OUT>) trigger, 
(CloneableEvictionPolicy<OUT>) eviction,
-                                       windowBuffer);
-               }
-
+       public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
+               return discretize(WindowTransformation.NONE, new 
BasicWindowBuffer<OUT>())
+                               .getDiscretizedStream();
        }
 
-       @SuppressWarnings("unchecked")
-       private WindowBuffer<OUT> getWindowBuffer(WindowTransformation 
transformation,
-                       TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> 
eviction,
-                       KeySelector<OUT, ?> discretizerKey) {
-
-               if (transformation == WindowTransformation.REDUCEWINDOW
-                               && eviction instanceof TumblingEvictionPolicy) {
-                       if (groupByKey == null) {
-                               return new 
TumblingPreReducer<OUT>((ReduceFunction<OUT>) transformation.UDF,
-                                               
getType().createSerializer(getExecutionConfig()));
-                       } else {
-                               return new 
TumblingGroupedPreReducer<OUT>((ReduceFunction<OUT>) transformation.UDF,
-                                               groupByKey, 
getType().createSerializer(getExecutionConfig()));
-                       }
-               }
-               return new BasicWindowBuffer<OUT>();
+       public DataStream<OUT> flatten() {
+               return dataStream;
        }
 
        /**
@@ -317,17 +263,16 @@ public class WindowedDataStream<OUT> {
         * reducing the current window at every trigger. In contrast with the
         * standard binary reducer, with reduceGroup the user can access all
         * elements of the window at the same time through the iterable 
interface.
-        * The user can also extend the {@link RichGroupReduceFunction} to gain
-        * access to other features provided by the
-        * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+        * The user can also extend the {@link org.apache.flink.streaming.api.function.RichWindowMapFunction} to gain access to other features 
provided by
+        * the {@link org.apache.flink.api.common.functions.RichFunction} 
interface.
         * 
-        * @param reduceFunction
+        * @param windowMapFunction
         *            The reduce function that will be applied to the windows.
         * @return The transformed DataStream
         */
-       public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> 
reduceFunction) {
-               return 
discretize(WindowTransformation.MAPWINDOW.with(reduceFunction),
-                               new 
BasicWindowBuffer<OUT>()).mapWindow(reduceFunction);
+       public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> 
windowMapFunction) {
+               return 
discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
+                               new 
BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction);
        }
 
        /**
@@ -335,30 +280,80 @@ public class WindowedDataStream<OUT> {
         * reducing the current window at every trigger. In contrast with the
         * standard binary reducer, with reduceGroup the user can access all
         * elements of the window at the same time through the iterable 
interface.
-        * The user can also extend the {@link RichGroupReduceFunction} to gain
-        * access to other features provided by the
-        * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+        * The user can also extend the {@link org.apache.flink.streaming.api.function.RichWindowMapFunction} to gain access to other features 
provided by
+        * the {@link org.apache.flink.api.common.functions.RichFunction} 
interface.
         * </br> </br> This version of reduceGroup uses user supplied
         * typeinformation for serializaton. Use this only when the system is 
unable
-        * to detect type information using: {@link 
#mapWindow(GroupReduceFunction)}
+        * to detect type information using: {@link #mapWindow(WindowMapFunction)}
         * 
-        * @param reduceFunction
+        * @param windowMapFunction
         *            The reduce function that will be applied to the windows.
         * @return The transformed DataStream
         */
-       public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> 
reduceFunction,
+       public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> 
windowMapFunction,
                        TypeInformation<R> outType) {
 
-               return 
discretize(WindowTransformation.MAPWINDOW.with(reduceFunction),
-                               new 
BasicWindowBuffer<OUT>()).mapWindow(reduceFunction, outType);
+               return 
discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
+                               new 
BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction, outType);
        }
 
-       public DataStream<OUT> flatten() {
-               return dataStream;
+       private DiscretizedStream<OUT> discretize(WindowTransformation 
transformation,
+                       WindowBuffer<OUT> windowBuffer) {
+
+               StreamInvokable<OUT, StreamWindow<OUT>> discretizer = 
getDiscretizer(transformation,
+                               windowBuffer, getTrigger(), getEviction(), 
discretizerKey);
+
+               int parallelism = getDiscretizerParallelism();
+
+               return new DiscretizedStream<OUT>(dataStream.transform("Stream 
Discretizer",
+                               new StreamWindowTypeInfo<OUT>(getType()), 
discretizer).setParallelism(parallelism),
+                               groupByKey, transformation);
+
        }
 
-       protected Class<?> getClassAtPos(int pos) {
-               return dataStream.getClassAtPos(pos);
+       private int getDiscretizerParallelism() {
+               return isLocal || (discretizerKey != null) ? 
dataStream.environment
+                               .getDegreeOfParallelism() : 1;
+       }
+
+       private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer(
+                       WindowTransformation transformation, WindowBuffer<OUT> 
windowBuffer,
+                       TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> 
eviction,
+                       KeySelector<OUT, ?> discretizerKey) {
+
+               if (discretizerKey == null) {
+                       return new StreamDiscretizer<OUT>(trigger, eviction, 
windowBuffer);
+               } else if (trigger instanceof TimeTriggerPolicy
+                               && ((TimeTriggerPolicy<OUT>) 
trigger).timestampWrapper.isDefaultTimestamp()) {
+                       return new GroupedTimeDiscretizer<OUT>(discretizerKey,
+                                       (TimeTriggerPolicy<OUT>) trigger, 
(CloneableEvictionPolicy<OUT>) eviction,
+                                       windowBuffer);
+               } else {
+                       return new GroupedStreamDiscretizer<OUT>(discretizerKey,
+                                       (CloneableTriggerPolicy<OUT>) trigger, 
(CloneableEvictionPolicy<OUT>) eviction,
+                                       windowBuffer);
+               }
+
+       }
+
+       @SuppressWarnings("unchecked")
+       private WindowBuffer<OUT> getWindowBuffer(WindowTransformation 
transformation,
+                       TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> 
eviction,
+                       KeySelector<OUT, ?> discretizerKey) {
+
+               if (transformation == WindowTransformation.REDUCEWINDOW
+                               && eviction instanceof TumblingEvictionPolicy) {
+                       if (groupByKey == null) {
+                               return new TumblingPreReducer<OUT>(
+                                               
dataStream.clean((ReduceFunction<OUT>) transformation.UDF), getType()
+                                                               
.createSerializer(getExecutionConfig()));
+                       } else {
+                               return new TumblingGroupedPreReducer<OUT>(
+                                               
dataStream.clean((ReduceFunction<OUT>) transformation.UDF), groupByKey,
+                                               
getType().createSerializer(getExecutionConfig()));
+                       }
+               }
+               return new BasicWindowBuffer<OUT>();
        }
 
        /**
@@ -610,6 +605,10 @@ public class WindowedDataStream<OUT> {
 
        }
 
+       protected boolean isGrouped() {
+               return groupByKey != null;
+       }
+
        /**
         * Gets the output type.
         * 
@@ -619,21 +618,16 @@ public class WindowedDataStream<OUT> {
                return dataStream.getType();
        }
 
-       protected WindowedDataStream<OUT> copy() {
-               return new WindowedDataStream<OUT>(this);
-       }
-
-       protected boolean isGrouped() {
-               return groupByKey != null;
+       public ExecutionConfig getExecutionConfig() {
+               return dataStream.getExecutionConfig();
        }
 
-       public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
-               return discretize(WindowTransformation.NONE, new 
BasicWindowBuffer<OUT>())
-                               .getDiscretizedStream();
+       protected Class<?> getClassAtPos(int pos) {
+               return dataStream.getClassAtPos(pos);
        }
 
-       public ExecutionConfig getExecutionConfig() {
-               return dataStream.getExecutionConfig();
+       protected WindowedDataStream<OUT> copy() {
+               return new WindowedDataStream<OUT>(this);
        }
 
        protected static class WindowKey<R> implements 
KeySelector<StreamWindow<R>, Integer> {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
index 03160c2..9af1648 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.operators.CrossOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -33,16 +34,24 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 
 public class StreamCrossOperator<I1, I2> extends
                 TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
-       
+
        public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> 
input2) {
                super(input1, input2);
        }
 
+       protected <F> F clean(F f) {
+               if (input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
+                       ClosureCleaner.clean(f, true);
+               }
+               ClosureCleaner.ensureSerializable(f);
+               return f;
+       }
+
        @Override
        protected CrossWindow<I1, I2> createNextWindowOperator() {
 
                CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction 
= new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
-                               input1.clean(new 
CrossOperator.DefaultCrossFunction<I1, I2>()));
+                               clean(new 
CrossOperator.DefaultCrossFunction<I1, I2>()));
 
                return new CrossWindow<I1, I2>(this, 
input1.connect(input2).addGeneralWindowCombine(
                                crossWindowFunction,

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 65dde79..8e09506 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 
 import com.esotericsoftware.kryo.Serializer;
+
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java
new file mode 100644
index 0000000..ac2a19e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+
+public abstract class RichWindowMapFunction<IN, OUT> extends AbstractRichFunction implements
+               WindowMapFunction<IN, OUT> {
+
+       private static final long serialVersionUID = 9052714915997374185L;
+
+       @Override
+       public abstract void mapWindow(Iterable<IN> values, Collector<OUT> out) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java
new file mode 100644
index 0000000..273d731
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+public interface WindowMapFunction<T, O> extends Function, Serializable {
+
+       void mapWindow(Iterable<T> values, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
index 46d4fe9..041915f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
@@ -1,26 +1,22 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.util.Collector;
-
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
 public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
 
-       public void invoke(Collector<OUT> collector) throws Exception;
-               
-}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java
deleted file mode 100644
index 4c6e7cd..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/BasicWindowBuffer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.LinkedList;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.util.Collector;
-
-public class BasicWindowBuffer<T> implements WindowBuffer<T> {
-
-       private static final long serialVersionUID = 1L;
-       protected LinkedList<T> buffer;
-
-       public BasicWindowBuffer() {
-               this.buffer = new LinkedList<T>();
-       }
-
-       public boolean emitWindow(Collector<StreamWindow<T>> collector) {
-               if (!buffer.isEmpty()) {
-                       StreamWindow<T> currentWindow = new StreamWindow<T>();
-                       currentWindow.addAll(buffer);
-                       collector.collect(currentWindow);
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
-       public void store(T element) throws Exception {
-               buffer.add(element);
-       }
-
-       public void evict(int n) {
-               for (int i = 0; i < n; i++) {
-                       try {
-                               buffer.removeFirst();
-                       } catch (NoSuchElementException e) {
-                               // In case no more elements are in the buffer:
-                               // Prevent failure and stop deleting.
-                               break;
-                       }
-               }
-       }
-
-       public int size() {
-               return buffer.size();
-       }
-
-       @Override
-       public BasicWindowBuffer<T> clone() {
-               return new BasicWindowBuffer<T>();
-       }
-
-       @Override
-       public String toString() {
-               return buffer.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/CompletePreAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/CompletePreAggregator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/CompletePreAggregator.java
deleted file mode 100644
index e071c2b..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/CompletePreAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-public interface CompletePreAggregator {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index ae6a2d9..58d7adb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -23,8 +23,10 @@ import java.util.Map;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 
 public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
index 6d38ed9..5363c10 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.invokable.operator.windowing;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 
 public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index 6b0bcec..a14058a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -18,11 +18,13 @@
 package org.apache.flink.streaming.api.invokable.operator.windowing;
 
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 
 public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
deleted file mode 100644
index 90c4e62..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
-
-       private static final long serialVersionUID = -5150196421193988403L;
-       private static Random rnd = new Random();
-
-       public int windowID;
-       public int transformationID;
-
-       public int numberOfParts;
-
-       public StreamWindow() {
-               this(rnd.nextInt(), rnd.nextInt(), 1);
-       }
-
-       public StreamWindow(int windowID) {
-               this(windowID, rnd.nextInt(), 1);
-       }
-
-       public StreamWindow(int windowID, int transformationID, int 
numberOfParts) {
-               super();
-               this.windowID = windowID;
-               this.transformationID = transformationID;
-               this.numberOfParts = numberOfParts;
-       }
-
-       public StreamWindow(StreamWindow<T> window) {
-               this(window.windowID, window.transformationID, 
window.numberOfParts);
-               addAll(window);
-       }
-
-       public StreamWindow(StreamWindow<T> window, TypeSerializer<T> 
serializer) {
-               this(window.windowID, window.transformationID, 
window.numberOfParts);
-               for (T element : window) {
-                       add(serializer.copy(element));
-               }
-       }
-
-       public List<StreamWindow<T>> partitionBy(KeySelector<T, ?> keySelector) 
throws Exception {
-               Map<Object, StreamWindow<T>> partitions = new HashMap<Object, 
StreamWindow<T>>();
-
-               for (T value : this) {
-                       Object key = keySelector.getKey(value);
-                       StreamWindow<T> window = partitions.get(key);
-                       if (window == null) {
-                               window = new StreamWindow<T>(this.windowID, 
this.transformationID, 0);
-                               partitions.put(key, window);
-                       }
-                       window.add(value);
-               }
-
-               List<StreamWindow<T>> output = new ArrayList<StreamWindow<T>>();
-               int numkeys = partitions.size();
-
-               for (StreamWindow<T> window : partitions.values()) {
-                       output.add(window.setNumberOfParts(numkeys));
-               }
-
-               return output;
-       }
-
-       public List<StreamWindow<T>> split(int n) {
-               int numElements = size();
-               if (n > numElements) {
-                       return split(numElements);
-               } else {
-                       List<StreamWindow<T>> split = new 
ArrayList<StreamWindow<T>>();
-                       int splitSize = numElements / n;
-
-                       int index = -1;
-
-                       StreamWindow<T> currentSubWindow = new 
StreamWindow<T>(windowID, transformationID, n);
-                       split.add(currentSubWindow);
-
-                       for (T element : this) {
-                               index++;
-                               if (index == splitSize && split.size() < n) {
-                                       currentSubWindow = new 
StreamWindow<T>(windowID, transformationID, n);
-                                       split.add(currentSubWindow);
-                                       index = 0;
-                               }
-                               currentSubWindow.add(element);
-                       }
-                       return split;
-               }
-       }
-
-       public StreamWindow<T> setNumberOfParts(int n) {
-               this.numberOfParts = n;
-               return this;
-       }
-
-       public boolean compatibleWith(StreamWindow<T> otherWindow) {
-               return this.windowID == otherWindow.windowID && 
this.numberOfParts > 1;
-       }
-
-       public static <R> StreamWindow<R> merge(StreamWindow<R>... windows) {
-               StreamWindow<R> window = new StreamWindow<R>(windows[0]);
-               for (int i = 1; i < windows.length; i++) {
-                       StreamWindow<R> next = windows[i];
-                       if (window.compatibleWith(next)) {
-                               window.addAll(next);
-                               window.numberOfParts--;
-                       } else {
-                               throw new RuntimeException("Can only merge 
compatible windows");
-                       }
-               }
-               return window;
-       }
-
-       public static <R> StreamWindow<R> merge(List<StreamWindow<R>> windows) {
-               if (windows.isEmpty()) {
-                       throw new RuntimeException("Need at least one window to 
merge");
-               } else {
-                       StreamWindow<R> window = new 
StreamWindow<R>(windows.get(0));
-                       for (int i = 1; i < windows.size(); i++) {
-                               StreamWindow<R> next = windows.get(i);
-                               if (window.compatibleWith(next)) {
-                                       window.addAll(next);
-                                       window.numberOfParts--;
-                               } else {
-                                       throw new RuntimeException("Can only 
merge compatible windows");
-                               }
-                       }
-                       return window;
-               }
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               return super.equals(o);
-       }
-
-       @Override
-       public void collect(T record) {
-               add(record);
-       }
-
-       @Override
-       public void close() {
-       }
-
-       @Override
-       public String toString() {
-               return super.toString();
-       }
-
-       public static <R> StreamWindow<R> fromElements(R... elements) {
-               StreamWindow<R> window = new StreamWindow<R>();
-               for (R element : elements) {
-                       window.add(element);
-               }
-               return window;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
deleted file mode 100755
index 002e440..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowSerializer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow<T>> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final TypeSerializer<T> typeSerializer;
-       TypeSerializer<Integer> intSerializer;
-
-       public StreamWindowSerializer(TypeInformation<T> typeInfo, ExecutionConfig conf) {
-               this.typeSerializer = typeInfo.createSerializer(conf);
-               this.intSerializer = BasicTypeInfo.INT_TYPE_INFO.createSerializer(conf);
-       }
-
-       public TypeSerializer<T> getObjectSerializer() {
-               return typeSerializer;
-       }
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public StreamWindow<T> createInstance() {
-               return new StreamWindow<T>(0, 0, 0);
-       }
-
-       @Override
-       public StreamWindow<T> copy(StreamWindow<T> from) {
-               return new StreamWindow<T>(from, typeSerializer);
-       }
-
-       @Override
-       public StreamWindow<T> copy(StreamWindow<T> from, StreamWindow<T> reuse) {
-               reuse.clear();
-               reuse.windowID = from.windowID;
-               reuse.transformationID = from.transformationID;
-               reuse.numberOfParts = from.numberOfParts;
-
-               for (T element : from) {
-                       reuse.add(typeSerializer.copy(element));
-               }
-               return reuse;
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public void serialize(StreamWindow<T> value, DataOutputView target) throws IOException {
-
-               intSerializer.serialize(value.windowID, target);
-               intSerializer.serialize(value.transformationID, target);
-               intSerializer.serialize(value.numberOfParts, target);
-               intSerializer.serialize(value.size(), target);
-
-               for (T element : value) {
-                       typeSerializer.serialize(element, target);
-               }
-       }
-
-       @Override
-       public StreamWindow<T> deserialize(DataInputView source) throws IOException {
-               StreamWindow<T> window = createInstance();
-
-               window.windowID = intSerializer.deserialize(source);
-               window.transformationID = intSerializer.deserialize(source);
-               window.numberOfParts = intSerializer.deserialize(source);
-
-               int size = intSerializer.deserialize(source);
-
-               for (int i = 0; i < size; i++) {
-                       window.add(typeSerializer.deserialize(source));
-               }
-
-               return window;
-       }
-
-       @Override
-       public StreamWindow<T> deserialize(StreamWindow<T> reuse, DataInputView source)
-                       throws IOException {
-
-               StreamWindow<T> window = reuse;
-               window.clear();
-
-               window.windowID = source.readInt();
-               window.transformationID = source.readInt();
-               window.numberOfParts = source.readInt();
-
-               int size = source.readInt();
-
-               for (int i = 0; i < size; i++) {
-                       window.add(typeSerializer.deserialize(source));
-               }
-
-               return window;
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws IOException {
-               // Needs to be implemented
-       }
-
-       @Override
-       public TypeSerializer<StreamWindow<T>> duplicate() {
-               return this;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java
deleted file mode 100644
index 0054759..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindowTypeInfo.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
-
-       private static final long serialVersionUID = 1L;
-       TypeInformation<T> innerType;
-
-       public StreamWindowTypeInfo(TypeInformation<T> innerType) {
-               this.innerType = innerType;
-       }
-
-       public TypeInformation<T> getInnerType() {
-               return innerType;
-       }
-
-       @Override
-       public boolean isBasicType() {
-               return innerType.isBasicType();
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return innerType.isTupleType();
-       }
-
-       @Override
-       public int getArity() {
-               return innerType.getArity();
-       }
-
-       @Override
-       public Class<StreamWindow<T>> getTypeClass() {
-               // TODO Auto-generated method stub
-               return null;
-       }
-
-       @Override
-       public boolean isKeyType() {
-               return innerType.isKeyType();
-       }
-
-       @Override
-       public TypeSerializer<StreamWindow<T>> createSerializer(ExecutionConfig conf) {
-               return new StreamWindowSerializer<T>(innerType, conf);
-       }
-
-       @Override
-       public int getTotalFields() {
-               return innerType.getTotalFields();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingGroupedPreReducer.java
deleted file mode 100644
index f1d7ae4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingGroupedPreReducer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
-
-       private static final long serialVersionUID = 1L;
-
-       private ReduceFunction<T> reducer;
-       private KeySelector<T, ?> keySelector;
-
-       private Map<Object, T> reducedValues;
-
-       private int numOfElements = 0;
-       private TypeSerializer<T> serializer;
-
-       public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
-                       TypeSerializer<T> serializer) {
-               this.reducer = reducer;
-               this.serializer = serializer;
-               this.keySelector = keySelector;
-               this.reducedValues = new HashMap<Object, T>();
-       }
-
-       public boolean emitWindow(Collector<StreamWindow<T>> collector) {
-
-               if (!reducedValues.isEmpty()) {
-                       StreamWindow<T> currentWindow = new StreamWindow<T>();
-                       currentWindow.addAll(reducedValues.values());
-                       collector.collect(currentWindow);
-                       reducedValues.clear();
-                       numOfElements = 0;
-                       return true;
-               } else {
-                       return false;
-               }
-
-       }
-
-       public void store(T element) throws Exception {
-               Object key = keySelector.getKey(element);
-
-               T reduced = reducedValues.get(key);
-
-               if (reduced == null) {
-                       reduced = element;
-               } else {
-                       reduced = reducer.reduce(serializer.copy(reduced), element);
-               }
-
-               reducedValues.put(key, reduced);
-               numOfElements++;
-       }
-
-       public void evict(int n) {
-       }
-
-       public int size() {
-               return numOfElements;
-       }
-
-       @Override
-       public TumblingGroupedPreReducer<T> clone() {
-               return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer);
-       }
-
-       @Override
-       public String toString() {
-               return reducedValues.toString();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java
deleted file mode 100644
index f1e531e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/TumblingPreReducer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.Collector;
-
-public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
-
-       private static final long serialVersionUID = 1L;
-
-       private ReduceFunction<T> reducer;
-
-       private T reduced;
-       private int numOfElements = 0;
-       private TypeSerializer<T> serializer;
-
-       public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
-               this.reducer = reducer;
-               this.serializer = serializer;
-       }
-
-       public boolean emitWindow(Collector<StreamWindow<T>> collector) {
-               if (reduced != null) {
-                       StreamWindow<T> currentWindow = new StreamWindow<T>();
-                       currentWindow.add(reduced);
-                       collector.collect(currentWindow);
-                       reduced = null;
-                       numOfElements = 0;
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
-       public void store(T element) throws Exception {
-               if (reduced == null) {
-                       reduced = element;
-               } else {
-                       reduced = reducer.reduce(serializer.copy(reduced), element);
-               }
-               numOfElements++;
-       }
-
-       public void evict(int n) {
-       }
-
-       public int size() {
-               return numOfElements;
-       }
-
-       @Override
-       public TumblingPreReducer<T> clone() {
-               return new TumblingPreReducer<T>(reducer, serializer);
-       }
-
-       @Override
-       public String toString() {
-               return reduced.toString();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java
deleted file mode 100644
index ef8fc2b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBuffer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.io.Serializable;
-
-import org.apache.flink.util.Collector;
-
-public interface WindowBuffer<T> extends Serializable {
-
-       public void store(T element) throws Exception;
-
-       public void evict(int n);
-
-       public boolean emitWindow(Collector<StreamWindow<T>> collector);
-
-       public int size();
-       
-       public WindowBuffer<T> clone();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
index 5f5e7d2..7eaea58 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.invokable.operator.windowing;
 
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 
 public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
index 23aaf32..de93fab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
@@ -17,29 +17,29 @@
 
 package org.apache.flink.streaming.api.invokable.operator.windowing;
 
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 
 public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
 
        private static final long serialVersionUID = 1L;
 
-       GroupReduceFunction<IN, OUT> reducer;
+       WindowMapFunction<IN, OUT> mapper;
 
-       public WindowMapper(GroupReduceFunction<IN, OUT> reducer) {
-               super(new WindowMapfunction<IN, OUT>(reducer));
-               this.reducer = reducer;
+       public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
+               super(new WindowMap<IN, OUT>(mapper));
+               this.mapper = mapper;
        }
 
-       private static class WindowMapfunction<T, R> implements
-                       MapFunction<StreamWindow<T>, StreamWindow<R>> {
+       private static class WindowMap<T, R> implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
 
                private static final long serialVersionUID = 1L;
-               GroupReduceFunction<T, R> reducer;
+               WindowMapFunction<T, R> mapper;
 
-               public WindowMapfunction(GroupReduceFunction<T, R> reducer) {
-                       this.reducer = reducer;
+               public WindowMap(WindowMapFunction<T, R> mapper) {
+                       this.mapper = mapper;
                }
 
                @Override
@@ -47,7 +47,7 @@ public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream
                        StreamWindow<R> outputWindow = new StreamWindow<R>(window.windowID);
                        outputWindow.numberOfParts = window.numberOfParts;
 
-                       reducer.reduce(window, outputWindow);
+                       mapper.mapWindow(window, outputWindow);
 
                        return outputWindow;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
index 1766b0b..8601d06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 
 public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
index e10692b..ea4451e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.invokable.operator.windowing;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 
 public class WindowPartitioner<T> extends
                ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
index b4f965f..3a4bb69 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.invokable.operator.windowing;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 
 public class WindowReducer<IN> extends MapInvokable<StreamWindow<IN>, StreamWindow<IN>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb5dc7e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
new file mode 100644
index 0000000..988058c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+/**
+ * A list of elements belonging to one window of a stream, tagged with the
+ * window's ID and with the number of parts the window is currently divided
+ * into. A window can be cut into parts with {@link #partitionBy(KeySelector)}
+ * or {@link #split(int)} and put back together with one of the
+ * <code>merge</code> methods.
+ * <p>
+ * The window also acts as a {@link Collector}: every collected record is
+ * appended to the list.
+ *
+ * @param <T>
+ *            type of the elements stored in the window
+ */
+public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
+
+       private static final long serialVersionUID = -5150196421193988403L;
+       // Source of IDs for windows created without an explicit ID.
+       private static final Random rnd = new Random();
+
+       /** ID shared by all parts that were cut from the same window. */
+       public int windowID;
+
+       /**
+        * Number of parts the window is currently divided into; 1 for a window
+        * created without an explicit part count.
+        */
+       public int numberOfParts;
+
+       /**
+        * Creates an empty, single-part window with a random ID.
+        */
+       public StreamWindow() {
+               this(rnd.nextInt(), 1);
+       }
+
+       /**
+        * Creates an empty, single-part window with the given ID.
+        *
+        * @param windowID
+        *            ID of the window
+        */
+       public StreamWindow(int windowID) {
+               this(windowID, 1);
+       }
+
+       /**
+        * Creates an empty window with the given ID and part count.
+        *
+        * @param windowID
+        *            ID of the window
+        * @param numberOfParts
+        *            number of parts the window is divided into
+        */
+       public StreamWindow(int windowID, int numberOfParts) {
+               super();
+               this.windowID = windowID;
+               this.numberOfParts = numberOfParts;
+       }
+
+       /**
+        * Creates a shallow copy of the given window: same ID, same part count
+        * and the same element references.
+        *
+        * @param window
+        *            window to copy
+        */
+       public StreamWindow(StreamWindow<T> window) {
+               this(window.windowID, window.numberOfParts);
+               addAll(window);
+       }
+
+       /**
+        * Creates a copy of the given window in which every element is copied
+        * with the given serializer.
+        *
+        * @param window
+        *            window to copy
+        * @param serializer
+        *            serializer used to copy the individual elements
+        */
+       public StreamWindow(StreamWindow<T> window, TypeSerializer<T> serializer) {
+               this(window.windowID, window.numberOfParts);
+               for (T element : window) {
+                       add(serializer.copy(element));
+               }
+       }
+
+       /**
+        * Groups the elements of this window by key. Every returned window keeps
+        * this window's ID and has its part count set to the number of distinct
+        * keys. The partitions are returned in the iteration order of a
+        * {@link HashMap}, which is unspecified. An empty window yields an empty
+        * list.
+        *
+        * @param keySelector
+        *            extracts the grouping key from an element
+        * @return one window per distinct key
+        * @throws Exception
+        *             if the key selector fails
+        */
+       public List<StreamWindow<T>> partitionBy(KeySelector<T, ?> keySelector) throws Exception {
+               Map<Object, StreamWindow<T>> partitions = new HashMap<Object, StreamWindow<T>>();
+
+               for (T value : this) {
+                       Object key = keySelector.getKey(value);
+                       StreamWindow<T> window = partitions.get(key);
+                       if (window == null) {
+                               // The real part count is only known once all keys were seen.
+                               window = new StreamWindow<T>(this.windowID, 0);
+                               partitions.put(key, window);
+                       }
+                       window.add(value);
+               }
+
+               List<StreamWindow<T>> output = new ArrayList<StreamWindow<T>>();
+               int numkeys = partitions.size();
+
+               for (StreamWindow<T> window : partitions.values()) {
+                       output.add(window.setNumberOfParts(numkeys));
+               }
+
+               return output;
+       }
+
+       /**
+        * Cuts this window into consecutive parts. At most <code>n</code> parts
+        * are created and never more than there are elements. Every part holds
+        * <code>size() / parts</code> elements, except the last one, which also
+        * takes the remainder. All parts keep this window's ID and carry the
+        * actual number of parts created.
+        * <p>
+        * If the window is empty or <code>n</code> is 0 there is nothing to
+        * create and an empty list is returned (this used to fail with an
+        * {@link ArithmeticException} caused by a division by zero).
+        *
+        * @param n
+        *            maximum number of parts; expected to be non-negative
+        * @return the parts in element order
+        */
+       public List<StreamWindow<T>> split(int n) {
+               int numElements = size();
+               // Never create more parts than there are elements.
+               int parts = n > numElements ? numElements : n;
+
+               List<StreamWindow<T>> split = new ArrayList<StreamWindow<T>>();
+               if (parts == 0) {
+                       // Guards the division below.
+                       return split;
+               }
+
+               int splitSize = numElements / parts;
+               int index = -1;
+
+               StreamWindow<T> currentSubWindow = new StreamWindow<T>(windowID, parts);
+               split.add(currentSubWindow);
+
+               for (T element : this) {
+                       index++;
+                       // Start a new part once the current one is full, unless it is
+                       // already the last part, which collects all remaining elements.
+                       if (index == splitSize && split.size() < parts) {
+                               currentSubWindow = new StreamWindow<T>(windowID, parts);
+                               split.add(currentSubWindow);
+                               index = 0;
+                       }
+                       currentSubWindow.add(element);
+               }
+               return split;
+       }
+
+       /**
+        * Sets the part count of this window.
+        *
+        * @param n
+        *            new part count
+        * @return this window, to allow call chaining
+        */
+       public StreamWindow<T> setNumberOfParts(int n) {
+               this.numberOfParts = n;
+               return this;
+       }
+
+       /**
+        * Checks whether the other window can be merged into this one: both must
+        * have the same ID and this window must still consist of more than one
+        * part.
+        *
+        * @param otherWindow
+        *            candidate for merging
+        * @return true if the other window may be merged into this one
+        */
+       public boolean compatibleWith(StreamWindow<T> otherWindow) {
+               return this.windowID == otherWindow.windowID && this.numberOfParts > 1;
+       }
+
+       /**
+        * Merges the given windows into a new window. The result starts as a
+        * shallow copy of the first window; every further window is appended to
+        * it and reduces its part count by one.
+        *
+        * @param windows
+        *            windows to merge, at least one
+        * @return the merged window
+        * @throws RuntimeException
+        *             if no window is given or a window is not compatible with
+        *             the result merged so far
+        */
+       public static <R> StreamWindow<R> merge(StreamWindow<R>... windows) {
+               // Same guard and message as the List overload, instead of failing
+               // with an ArrayIndexOutOfBoundsException on windows[0].
+               if (windows.length == 0) {
+                       throw new RuntimeException("Need at least one window to merge");
+               }
+               StreamWindow<R> window = new StreamWindow<R>(windows[0]);
+               for (int i = 1; i < windows.length; i++) {
+                       window.mergePart(windows[i]);
+               }
+               return window;
+       }
+
+       /**
+        * Merges the given windows into a new window. The result starts as a
+        * shallow copy of the first window; every further window is appended to
+        * it and reduces its part count by one.
+        *
+        * @param windows
+        *            windows to merge, at least one
+        * @return the merged window
+        * @throws RuntimeException
+        *             if the list is empty or a window is not compatible with
+        *             the result merged so far
+        */
+       public static <R> StreamWindow<R> merge(List<StreamWindow<R>> windows) {
+               if (windows.isEmpty()) {
+                       throw new RuntimeException("Need at least one window to merge");
+               }
+               StreamWindow<R> window = new StreamWindow<R>(windows.get(0));
+               for (int i = 1; i < windows.size(); i++) {
+                       window.mergePart(windows.get(i));
+               }
+               return window;
+       }
+
+       // Appends one compatible part to this window and reduces the part count.
+       private void mergePart(StreamWindow<T> part) {
+               if (!compatibleWith(part)) {
+                       throw new RuntimeException("Can only merge compatible windows");
+               }
+               addAll(part);
+               numberOfParts--;
+       }
+
+       /**
+        * Compares by list content only, as defined by {@link ArrayList}; the
+        * window ID and the part count are not taken into account.
+        */
+       @Override
+       public boolean equals(Object o) {
+               return super.equals(o);
+       }
+
+       /**
+        * Appends the record to this window.
+        */
+       @Override
+       public void collect(T record) {
+               add(record);
+       }
+
+       /**
+        * Does nothing.
+        */
+       @Override
+       public void close() {
+       }
+
+       /**
+        * Returns the list representation of the elements; the window ID and
+        * the part count are not included.
+        */
+       @Override
+       public String toString() {
+               return super.toString();
+       }
+
+       /**
+        * Creates a single-part window with a random ID that holds the given
+        * elements in the given order.
+        *
+        * @param elements
+        *            elements to put into the window
+        * @return the new window
+        */
+       public static <R> StreamWindow<R> fromElements(R... elements) {
+               StreamWindow<R> window = new StreamWindow<R>();
+               for (R element : elements) {
+                       window.add(element);
+               }
+               return window;
+       }
+}

Reply via email to