[FLINK-2753] [streaming] [api breaking] Add first parts of new window API for 
key grouped windows

This follows the API design outlined in 
https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

This is API breaking because it adds new generic type parameters to Java and 
Scala classes, breaking binary compatibility.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e20299c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e20299c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e20299c

Branch: refs/heads/master
Commit: 7e20299c4e2d9cc78c36f90bdf0acdbaf72062b0
Parents: 501a9b0
Author: Stephan Ewen <[email protected]>
Authored: Wed Sep 23 12:05:54 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Sep 28 17:04:16 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/TimeCharacteristic.java |  81 +++++++++++
 .../api/datastream/ConnectedDataStream.java     |   4 +-
 .../streaming/api/datastream/DataStream.java    |  26 ++--
 .../api/datastream/GroupedDataStream.java       |   8 +-
 .../api/datastream/KeyedDataStream.java         |  51 ++++++-
 .../api/datastream/KeyedWindowDataStream.java   | 135 +++++++++++++++++++
 .../api/datastream/WindowedDataStream.java      |   4 +-
 .../environment/StreamExecutionEnvironment.java |  47 ++++++-
 .../functions/windows/KeyedWindowFunction.java  |   6 +-
 .../windowpolicy/AbstractTimePolicy.java        | 109 +++++++++++++++
 .../api/windowing/windowpolicy/EventTime.java   |  64 +++++++++
 .../windowing/windowpolicy/ProcessingTime.java  |  65 +++++++++
 .../api/windowing/windowpolicy/Time.java        |  68 ++++++++++
 .../windowing/windowpolicy/WindowPolicy.java    |  57 ++++++++
 .../windows/AccumulatingKeyedTimePanes.java     |   8 +-
 ...ccumulatingProcessingTimeWindowOperator.java |   4 +-
 .../operators/windows/PolicyToOperator.java     |  82 +++++++++++
 .../streaming/util/keys/KeySelectorUtil.java    |  17 ++-
 .../api/state/StatefulOperatorTest.java         |   8 +-
 .../GroupedProcessingTimeWindowExample.java     |  79 +++--------
 .../flink/streaming/api/scala/DataStream.scala  |  17 +--
 .../streaming/api/scala/GroupedDataStream.scala |   3 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  25 +++-
 .../flink/streaming/api/scala/package.scala     |   4 +-
 24 files changed, 848 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
new file mode 100644
index 0000000..1ad3c99
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+/**
+ * The time characteristic defines how the system determines time for 
time-dependent
+ * order and operations that depend on time (such as time windows).
+ */
+public enum TimeCharacteristic {
+
+       /**
+        * Processing time for operators means that the operator uses the 
system clock of the machine
+        * to determine the current time of the data stream. Processing-time 
windows trigger based
+        * on wall-clock time and include whatever elements happen to have 
arrived at the operator at
+        * that point in time.
+        * <p>
+        * Using processing time for window operations results in general in 
quite non-deterministic results,
+        * because the contents of the windows depend on the speed at which 
elements arrive. It is, however,
+        * the cheapest method of forming windows and the method that 
introduces the least latency.
+        */
+       ProcessingTime,
+
+       /**
+        * Ingestion time means that the time of each individual element in the 
stream is determined
+        * when the element enters the Flink streaming data flow. Operations 
like windows group the
+        * elements based on that time, meaning that processing speed within 
the streaming dataflow
+        * does not affect windowing, but only the speed at which sources 
receive elements.
+        * <p>
+        * Ingestion time is often a good compromise between processing 
time and event time.
+        * It does not need any special manual form of watermark generation, 
and events are typically
+        * not too much out-of-order when they arrive at operators; in fact, 
out-of-orderness can 
+        * only be introduced by streaming shuffles or split/join/union 
operations. The fact that elements
+        * are not very much out-of-order means that the latency increase is 
moderate, compared to event
+        * time.
+        */
+       IngestionTime,
+
+       /**
+        * Event time means that the time of each individual element in the 
stream (also called event)
+        * is determined by the event's individual custom timestamp. These 
timestamps either exist in the
+        * elements from before they entered the Flink streaming dataflow, or 
are user-assigned at the sources.
+        * The big implication of this is that elements arrive in the sources 
and in all operators generally
+        * out of order, meaning that elements with earlier timestamps may 
arrive after elements with
+        * later timestamps.
+        * <p>
+        * Operators that window or order data with respect to event time must 
buffer data until they can
+        * be sure that all timestamps for a certain time interval have been 
received. This is handled by
+        * the so called "time watermarks".
+        * <p>
+        * Operations based on event time are very predictable - the result of 
windowing operations
+        * is typically identical no matter when the window is executed and how 
fast the streams operate.
+        * At the same time, the buffering and tracking of event time is also 
costlier than operating
+        * with processing time, and typically also introduces more latency. 
The amount of extra
+        * cost depends mostly on how much out of order the elements arrive, 
i.e., how long the time span
+        * between the arrival of early and late elements is. With respect to 
the "time watermarks", this
+        * means that the cost typically depends on how early or late the 
watermarks can be generated
+        * for their timestamp.
+        * <p>
+        * In relation to {@link #IngestionTime}, the event time is similar, 
but refers to the event's
+        * original time, rather than the time assigned at the data source. 
Practically, that means that
+        * event time has generally more meaning, but also that it takes longer 
to determine that all
+        * elements for a certain time have arrived.
+        */
+       EventTime
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 8609a30..0406e35 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -67,8 +67,8 @@ public class ConnectedDataStream<IN1, IN2> {
 
                if ((input1 instanceof GroupedDataStream) && (input2 instanceof 
GroupedDataStream)) {
                        this.isGrouped = true;
-                       this.keySelector1 = ((GroupedDataStream<IN1>) 
input1).keySelector;
-                       this.keySelector2 = ((GroupedDataStream<IN2>) 
input2).keySelector;
+                       this.keySelector1 = ((GroupedDataStream<IN1, ?>) 
input1).keySelector;
+                       this.keySelector2 = ((GroupedDataStream<IN2, ?>) 
input2).keySelector;
                } else {
                        this.isGrouped = false;
                        this.keySelector1 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d92498c..5dfb1e2 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -229,8 +229,8 @@ public class DataStream<T> {
         *            The KeySelector to be used for extracting the key for 
partitioning
         * @return The {@link DataStream} with partitioned state (i.e. 
KeyedDataStream)
         */
-       public KeyedDataStream<T> keyBy(KeySelector<T,?> key){
-               return new KeyedDataStream<T>(this, clean(key));
+       public <K> KeyedDataStream<T, K> keyBy(KeySelector<T, K> key){
+               return new KeyedDataStream<T, K>(this, clean(key));
        }
 
        /**
@@ -241,7 +241,7 @@ public class DataStream<T> {
         *            will be grouped.
         * @return The {@link DataStream} with partitioned state (i.e. 
KeyedDataStream)
         */
-       public KeyedDataStream<T> keyBy(int... fields) {
+       public KeyedDataStream<T, Tuple> keyBy(int... fields) {
                if (getType() instanceof BasicArrayTypeInfo || getType() 
instanceof PrimitiveArrayTypeInfo) {
                        return keyBy(new 
KeySelectorUtil.ArrayKeySelector<T>(fields));
                } else {
@@ -260,12 +260,12 @@ public class DataStream<T> {
         *            partitioned.
         * @return The {@link DataStream} with partitioned state (i.e. 
KeyedDataStream)
         **/
-       public KeyedDataStream<T> keyBy(String... fields) {
+       public KeyedDataStream<T, Tuple> keyBy(String... fields) {
                return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
        }
 
-       private KeyedDataStream<T> keyBy(Keys<T> keys) {
-               return new KeyedDataStream<T>(this, 
clean(KeySelectorUtil.getSelectorForKeys(keys,
+       private KeyedDataStream<T, Tuple> keyBy(Keys<T> keys) {
+               return new KeyedDataStream<T, Tuple>(this, 
clean(KeySelectorUtil.getSelectorForKeys(keys,
                                getType(), getExecutionConfig())));
        }
        
@@ -279,7 +279,7 @@ public class DataStream<T> {
         *            will be partitioned.
         * @return The {@link DataStream} with partitioned state (i.e. 
KeyedDataStream)
         */
-       public GroupedDataStream<T> groupBy(int... fields) {
+       public GroupedDataStream<T, Tuple> groupBy(int... fields) {
                if (getType() instanceof BasicArrayTypeInfo || getType() 
instanceof PrimitiveArrayTypeInfo) {
                        return groupBy(new 
KeySelectorUtil.ArrayKeySelector<T>(fields));
                } else {
@@ -304,7 +304,7 @@ public class DataStream<T> {
         *            grouped.
         * @return The grouped {@link DataStream}
         **/
-       public GroupedDataStream<T> groupBy(String... fields) {
+       public GroupedDataStream<T, Tuple> groupBy(String... fields) {
                return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
        }
 
@@ -322,13 +322,13 @@ public class DataStream<T> {
         *            the values
         * @return The grouped {@link DataStream}
         */
-       public GroupedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
-               return new GroupedDataStream<T>(this, clean(keySelector));
+       public <K> GroupedDataStream<T, K> groupBy(KeySelector<T, K> 
keySelector) {
+               return new GroupedDataStream<T, K>(this, clean(keySelector));
        }
 
-       private GroupedDataStream<T> groupBy(Keys<T> keys) {
-               return new GroupedDataStream<T>(this, 
clean(KeySelectorUtil.getSelectorForKeys(keys,
-                               getType(), getExecutionConfig())));
+       private GroupedDataStream<T, Tuple> groupBy(Keys<T> keys) {
+               return new GroupedDataStream<T, Tuple>(this, 
+                               clean(KeySelectorUtil.getSelectorForKeys(keys, 
getType(), getExecutionConfig())));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index a1106bc..50bf341 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -39,7 +39,7 @@ import 
org.apache.flink.streaming.api.operators.StreamGroupedReduce;
  * @param <OUT>
  *            The output type of the {@link GroupedDataStream}.
  */
-public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
+public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 
        /**
         * Creates a new {@link GroupedDataStream}, group inclusion is 
determined using
@@ -48,7 +48,7 @@ public class GroupedDataStream<OUT> extends 
KeyedDataStream<OUT> {
         * @param dataStream Base stream of data
         * @param keySelector Function for determining group inclusion
         */
-       public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, 
?> keySelector) {
+       public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, 
KEY> keySelector) {
                super(dataStream, keySelector);
        }
 
@@ -324,8 +324,6 @@ public class GroupedDataStream<OUT> extends 
KeyedDataStream<OUT> {
 
        protected SingleOutputStreamOperator<OUT, ?> 
aggregate(AggregationFunction<OUT> aggregate) {
                StreamGroupedReduce<OUT> operator = new 
StreamGroupedReduce<OUT>(clean(aggregate), keySelector);
-               SingleOutputStreamOperator<OUT, ?> returnStream = 
transform("Grouped Aggregation",
-                               getType(), operator);
-               return returnStream;
+               return transform("Grouped Aggregation", getType(), operator);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index 100e5de..a32cf53 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
@@ -32,11 +33,12 @@ import 
org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * are also possible on a KeyedDataStream, with the exception of partitioning 
methods such as shuffle, forward and groupBy.
  * 
  * 
- * @param <T> The type of the elements in the Keyed Stream
+ * @param <T> The type of the elements in the Keyed Stream.
+ * @param <K> The type of the key in the Keyed Stream.
  */
-public class KeyedDataStream<T> extends DataStream<T> {
+public class KeyedDataStream<T, K> extends DataStream<T> {
        
-       protected final KeySelector<T, ?> keySelector;
+       protected final KeySelector<T, K> keySelector;
 
        /**
         * Creates a new {@link KeyedDataStream} using the given {@link 
KeySelector}
@@ -47,35 +49,70 @@ public class KeyedDataStream<T> extends DataStream<T> {
         * @param keySelector
         *            Function for determining state partitions
         */
-       public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, ?> 
keySelector) {
+       public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> 
keySelector) {
                super(dataStream.getExecutionEnvironment(), new 
PartitionTransformation<T>(dataStream.getTransformation(), new 
HashPartitioner<T>(keySelector)));
                this.keySelector = keySelector;
        }
 
-       public KeySelector<T, ?> getKeySelector() {
+       
+       public KeySelector<T, K> getKeySelector() {
                return this.keySelector;
        }
 
+       
        @Override
        protected DataStream<T> setConnectionType(StreamPartitioner<T> 
partitioner) {
                throw new UnsupportedOperationException("Cannot override 
partitioning for KeyedDataStream.");
        }
 
+       
        @Override
        public <R> SingleOutputStreamOperator<R, ?> transform(String 
operatorName,
                        TypeInformation<R> outTypeInfo, 
OneInputStreamOperator<T, R> operator) {
 
                SingleOutputStreamOperator<R, ?> returnStream = 
super.transform(operatorName, outTypeInfo,operator);
 
-               ((OneInputTransformation<T, R>) 
returnStream.getTransformation()).setStateKeySelector(
-                               keySelector);
+               ((OneInputTransformation<T, R>) 
returnStream.getTransformation()).setStateKeySelector(keySelector);
                return returnStream;
        }
 
+       
+       
        @Override
        public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
                DataStreamSink<T> result = super.addSink(sinkFunction);
                result.getTransformation().setStateKeySelector(keySelector);
                return result;
        }
+       
+       // 
------------------------------------------------------------------------
+       //  Windowing
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Windows this data stream to a KeyedWindowDataStream, which evaluates 
windows over a key
+        * grouped stream. The window is defined by a single policy.
+        * <p>
+        * For time windows, these single-policy windows result in tumbling 
time windows.
+        *     
+        * @param policy The policy that defines the window.
+        * @return The windowed data stream. 
+        */
+       public KeyedWindowDataStream<T, K> window(WindowPolicy policy) {
+               return new KeyedWindowDataStream<T, K>(this, policy);
+       }
+
+       /**
+        * Windows this data stream to a KeyedWindowDataStream, which evaluates 
windows over a key
+        * grouped stream. The window is defined by a window policy, plus a 
slide policy.
+        * <p>
+        * For time windows, these slide policy windows result in sliding time 
windows.
+        * 
+        * @param window The policy that defines the window.
+        * @param slide The additional policy defining the slide of the window. 
+        * @return The windowed data stream.
+        */
+       public KeyedWindowDataStream<T, K> window(WindowPolicy window, 
WindowPolicy slide) {
+               return new KeyedWindowDataStream<T, K>(this, window, slide);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
new file mode 100644
index 0000000..2ec175a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator;
+
+/**
+ * A KeyedWindowDataStream represents a data stream where elements are grouped 
by key, and 
+ * for each key, the stream of elements is split into windows. The windows are 
conceptually
+ * evaluated for each key individually, meaning windows can trigger at 
different points
+ * for each key.
+ * <p>
+ * In many cases, however, the windows are "aligned", meaning they trigger at 
the
+ * same time for all keys. The most common example for that are the regular 
time windows.
+ * <p>
+ * Note that the KeyedWindowDataStream is purely an API construct, during 
runtime the
+ * KeyedWindowDataStream will be collapsed together with the KeyedDataStream 
and the operation
+ * over the window into one single operation.
+ * 
+ * @param <Type> The type of elements in the stream.
+ * @param <Key> The type of the key by which elements are grouped.
+ */
+public class KeyedWindowDataStream<Type, Key> {
+
+       /** The keyed data stream that is windowed by this stream */
+       private final KeyedDataStream<Type, Key> input;
+
+       /** The core window policy */
+       private final WindowPolicy windowPolicy;
+
+       /** The optional additional slide policy */
+       private final WindowPolicy slidePolicy;
+       
+       
+       public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, 
WindowPolicy windowPolicy) {
+               this(input, windowPolicy, null);
+       }
+
+       public KeyedWindowDataStream(KeyedDataStream<Type, Key> input,
+                                                               WindowPolicy 
windowPolicy, WindowPolicy slidePolicy) 
+       {
+               TimeCharacteristic time = 
input.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+               this.input = input;
+               this.windowPolicy = 
windowPolicy.makeSpecificBasedOnTimeCharacteristic(time);
+               this.slidePolicy = slidePolicy == null ? null : 
slidePolicy.makeSpecificBasedOnTimeCharacteristic(time);
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Operations on the keyed windows
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Applies a reduce function to the window. The window function is 
called for each evaluation
+        * of the window for each key individually. The output of the reduce 
function is interpreted
+        * as a regular non-windowed stream.
+        * <p>
+        * This window will try and pre-aggregate data as much as the window 
policies permit. For example,
+        * tumbling time windows can perfectly pre-aggregate the data, meaning 
that only one element per
+        * key is stored. Sliding time windows will pre-aggregate on the 
granularity of the slide interval,
+        * so a few elements are stored per key (one per slide interval).
+        * Custom windows may not be able to pre-aggregate, or may need to 
store extra values in an
+        * aggregation tree.
+        * 
+        * @param function The reduce function.
+        * @return The data stream that is the result of applying the reduce 
function to the window. 
+        */
+       public DataStream<Type> reduceWindow(ReduceFunction<Type> function) {
+               String callLocation = Utils.getCallLocationName();
+               return createWindowOperator(function, input.getType(), "Reduce 
at " + callLocation);
+       }
+
+       /**
+        * Applies a window function to the window. The window function is 
called for each evaluation
+        * of the window for each key individually. The output of the window 
function is interpreted
+        * as a regular non-windowed stream.
+        * <p>
+        * Note that this function requires that all data in the windows is 
buffered until the window
+        * is evaluated, as the function provides no means of pre-aggregation.
+        * 
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, 
Result, Key> function) {
+               String callLocation = Utils.getCallLocationName();
+
+               TypeInformation<Type> inType = input.getType();
+               TypeInformation<Result> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                               function, KeyedWindowFunction.class, true, 
true, inType, null, false);
+
+               return createWindowOperator(function, resultType, 
"KeyedWindowFunction at " + callLocation);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
+       private <Result> DataStream<Result> createWindowOperator(
+                       Function function, TypeInformation<Result> resultType, 
String functionName) {
+
+               String opName = windowPolicy.toString(slidePolicy) + " of " + 
functionName;
+               KeySelector<Type, Key> keySel = input.getKeySelector();
+               
+               OneInputStreamOperator<Type, Result> operator =
+                               
PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, 
keySel);
+               
+               return input.transform(opName, resultType, operator);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index bf3a11a..1226adf 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -103,7 +103,7 @@ public class WindowedDataStream<OUT> {
                this.triggerHelper = policyHelper;
 
                if (dataStream instanceof GroupedDataStream) {
-                       this.discretizerKey = ((GroupedDataStream<OUT>) 
dataStream).keySelector;
+                       this.discretizerKey = ((GroupedDataStream<OUT, ?>) 
dataStream).keySelector;
                }
        }
 
@@ -115,7 +115,7 @@ public class WindowedDataStream<OUT> {
                this.userEvicter = evicter;
 
                if (dataStream instanceof GroupedDataStream) {
-                       this.discretizerKey = ((GroupedDataStream<OUT>) 
dataStream).keySelector;
+                       this.discretizerKey = ((GroupedDataStream<OUT, ?>) 
dataStream).keySelector;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index d91afc9..a22a519 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.environment;
 
 import com.esotericsoftware.kryo.Serializer;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -49,6 +49,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
@@ -72,10 +73,12 @@ import org.apache.flink.util.SplittableIterator;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. 
An instance of it is
@@ -83,25 +86,33 @@ import java.util.List;
  */
 public abstract class StreamExecutionEnvironment {
 
-       public final static String DEFAULT_JOB_NAME = "Flink Streaming Job";
+       public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
 
        private static int defaultLocalParallelism = 
Runtime.getRuntime().availableProcessors();
-
+       
+       /** The time characteristic that is used if none other is set */
+       private static TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = 
TimeCharacteristic.ProcessingTime;
+       
+       // 
------------------------------------------------------------------------
+       
        private long bufferTimeout = 100;
 
-       private ExecutionConfig config = new ExecutionConfig();
+       private final ExecutionConfig config = new ExecutionConfig();
 
-       protected List<StreamTransformation<?>> transformations = 
Lists.newArrayList();
+       protected final List<StreamTransformation<?>> transformations = new 
ArrayList<>();
 
        protected boolean isChainingEnabled = true;
 
        protected long checkpointInterval = -1; // disabled
 
-       protected CheckpointingMode checkpointingMode = null;
+       protected CheckpointingMode checkpointingMode;
 
        protected boolean forceCheckpointing = false;
 
        protected StateHandleProvider<?> stateHandleProvider;
+       
+       /** The time characteristic used by the data streams */
+       private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
 
        /** The environment of the context (local by default, cluster if 
invoked through command line) */
        private static StreamExecutionEnvironmentFactory 
contextEnvironmentFactory;
@@ -516,6 +527,30 @@ public abstract class StreamExecutionEnvironment {
        }
 
        // 
--------------------------------------------------------------------------------------------
+       //  Time characteristic
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Sets the time characteristic for the stream, e.g., processing time, 
event time,
+        * or ingestion time.
+        * 
+        * @param characteristic The time characteristic.
+        */
+       public void setStreamTimeCharacteristic(TimeCharacteristic 
characteristic) {
+               this.timeCharacteristic = 
Objects.requireNonNull(characteristic);
+       }
+
+       /**
+        * Gets the time characteristic for the stream, e.g., processing time, 
event time,
+        * or ingestion time.
+        * 
+        * @return The time characteristic.
+        */
+       public TimeCharacteristic getStreamTimeCharacteristic() {
+               return timeCharacteristic;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
        // Data stream creations
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
index d7ca0a1..b4e55e4 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
@@ -25,12 +25,12 @@ import java.io.Serializable;
 
 /**
  * Base interface for functions that are evaluated over keyed (grouped) 
windows.
- * 
- * @param <KEY> The type of the key.
+ *
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
  */
-public interface KeyedWindowFunction<KEY, IN, OUT> extends Function, 
Serializable {
+public interface KeyedWindowFunction<IN, OUT, KEY> extends Function, 
Serializable {
 
        /**
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
new file mode 100644
index 0000000..9dc0dd0
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class AbstractTimePolicy extends WindowPolicy {
+
+       private static final long serialVersionUID = 6593098375698927728L;
+       
+       /** the time unit for this policy's time interval */
+       private final TimeUnit unit;
+       
+       /** the length of this policy's time interval */
+       private final long num;
+
+
+       protected AbstractTimePolicy(long num, TimeUnit unit) {
+               this.unit = checkNotNull(unit, "time unit may not be null");
+               this.num = num;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the time unit for this policy's time interval.
+        * @return The time unit for this policy's time interval.
+        */
+       public TimeUnit getUnit() {
+               return unit;
+       }
+
+       /**
+        * Gets the length of this policy's time interval.
+        * @return The length of this policy's time interval.
+        */
+       public long getNum() {
+               return num;
+       }
+
+       /**
+        * Converts the time interval to milliseconds.
+        * @return The time interval in milliseconds.
+        */
+       public long toMilliseconds() {
+               return unit.toMillis(num);
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString(WindowPolicy slidePolicy) {
+               if (slidePolicy == null) {
+                       return "Tumbling Window (" + getClass().getSimpleName() 
+ ") (" + num + ' ' + unit.name() + ')';
+               }
+               else if (slidePolicy.getClass() == getClass()) {
+                       AbstractTimePolicy timeSlide = (AbstractTimePolicy) 
slidePolicy;
+                       
+                       return "Sliding Window (" + getClass().getSimpleName() 
+ ") (length="
+                                       + num + ' ' + unit.name() + ", slide=" 
+ timeSlide.num + ' ' + timeSlide.unit.name() + ')';
+               }
+               else {
+                       return super.toString(slidePolicy);
+               }
+       }
+       
+       @Override
+       public int hashCode() {
+               return 31 * (int) (num ^ (num >>> 32)) + unit.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj != null && obj.getClass() == getClass()) {
+                       AbstractTimePolicy that = (AbstractTimePolicy) obj;
+                       return this.num == that.num && 
this.unit.equals(that.unit);
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public String toString() {
+               return getClass().getSimpleName() + " (" + num + ' ' + 
unit.name() + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
new file mode 100644
index 0000000..8a671fc
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of an event time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a 
definition
+ * of event time.
+ */
+public final class EventTime extends AbstractTimePolicy {
+
+       private static final long serialVersionUID = 8333566691833596747L;
+
+       /** Instantiation only via factory method */
+       private EventTime(long num, TimeUnit unit) {
+               super(num, unit);
+       }
+
+       @Override
+       public EventTime 
makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+               if (characteristic == TimeCharacteristic.EventTime || 
characteristic == TimeCharacteristic.IngestionTime) {
+                       return this;
+               }
+               else {
+                       throw new InvalidProgramException(
+                                       "Cannot use EventTime policy in a 
dataflow that runs on " + characteristic);
+               }
+       }
+       // 
------------------------------------------------------------------------
+       //  Factory
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates an event time policy describing an event time interval.
+        *
+        * @param num The length of the time interval.
+        * @param unit The unit (seconds, milliseconds) of the time interval.
+        * @return The event time policy.
+        */
+       public static EventTime of(long num, TimeUnit unit) {
+               return new EventTime(num, unit);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
new file mode 100644
index 0000000..2ff13fa
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a processing time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} 
for a definition
+ * of processing time.
+ */
+public final class ProcessingTime extends AbstractTimePolicy {
+
+       private static final long serialVersionUID = 7546166721132583007L;
+
+       /** Instantiation only via factory method */
+       private ProcessingTime(long num, TimeUnit unit) {
+               super(num, unit);
+       }
+
+       @Override
+       public ProcessingTime 
makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+               if (characteristic == TimeCharacteristic.ProcessingTime) {
+                       return this;
+               }
+               else {
+                       throw new InvalidProgramException(
+                                       "Cannot use ProcessingTime policy in a 
dataflow that runs on " + characteristic);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Factory
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a processing time policy describing a processing time 
interval.
+        * 
+        * @param num The length of the time interval.
+        * @param unit The unit (seconds, milliseconds) of the time interval.
+        * @return The processing time policy.
+        */
+       public static ProcessingTime of(long num, TimeUnit unit) {
+               return new ProcessingTime(num, unit);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
new file mode 100644
index 0000000..0233e96
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a time interval for windowing. The time characteristic 
referred
+ * to is the default time characteristic set on the execution environment.
+ */
+public final class Time extends AbstractTimePolicy {
+
+       private static final long serialVersionUID = 3197290738634320211L;
+
+       /** Instantiation only via factory method */
+       private Time(long num, TimeUnit unit) {
+               super(num, unit);
+       }
+
+       @Override
+       public AbstractTimePolicy 
makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+               switch (timeCharacteristic) {
+                       case ProcessingTime:
+                               return ProcessingTime.of(getNum(), getUnit());
+                       case IngestionTime:
+                       case EventTime:
+                               return EventTime.of(getNum(), getUnit());
+                       default:
+                               throw new IllegalArgumentException("Unknown 
time characteristic");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Factory
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a time policy describing a time interval. The 
policy refers to the
+        * time characteristic that is set on the dataflow via
+        * {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
+        * 
setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
+        *
+        * @param num The length of the time interval.
+        * @param unit The unit (seconds, milliseconds) of the time interval.
+        * @return The time policy.
+        */
+       public static Time of(long num, TimeUnit unit) {
+               return new Time(num, unit);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
new file mode 100644
index 0000000..a82f892
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+/**
+ * The base class of all window policies. Window policies define how windows
+ * are formed over the data stream.
+ */
+public abstract class WindowPolicy implements java.io.Serializable {
+
+       private static final long serialVersionUID = -8696529489282723113L;
+       
+       /**
+        * If this window policy's concrete instantiation depends on the time 
characteristic of the
+        * dataflow (processing time, event time), then this method must be 
overridden to convert this
+        * policy to the respective specific instantiation.
+        * <p>
+        * The {@link Time} policy, for example, will convert itself to a 
{@link ProcessingTime} policy,
+        * if the time characteristic is set to {@link 
TimeCharacteristic#ProcessingTime}.
+        * <p>
+        * By default, this method does nothing and simply returns this object 
itself.
+        * 
+        * @param characteristic The time characteristic of the dataflow.
+        * @return The specific instantiation of this policy, or the policy 
itself. 
+        */
+       public WindowPolicy 
makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+               return this;
+       }
+       
+       
+       public String toString(WindowPolicy slidePolicy) {
+               if (slidePolicy != null) {
+                       return "Window [" + toString() + ", slide=" + 
slidePolicy + ']';
+               }
+               else {
+                       return "Window [" + toString() + ']';
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
index e776106..1212123 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
@@ -32,13 +32,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
 
-       private final KeyedWindowFunction<Key, Type, Result> function;
+       private final KeyedWindowFunction<Type, Result, Key> function;
        
        private long evaluationPass;
 
        // 
------------------------------------------------------------------------
        
-       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
KeyedWindowFunction<Key, Type, Result> function) {
+       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
KeyedWindowFunction<Type, Result, Key> function) {
                this.keySelector = keySelector;
                this.function = function;
        }
@@ -75,7 +75,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
        
        static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-               private final KeyedWindowFunction<Key, Type, Result> function;
+               private final KeyedWindowFunction<Type, Result, Key> function;
                
                private final UnionIterator<Type> unionIterator;
                
@@ -83,7 +83,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                
                private Key currentKey;
 
-               WindowFunctionTraversal(KeyedWindowFunction<Key, Type, Result> 
function, Collector<Result> out) {
+               WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> 
function, Collector<Result> out) {
                        this.function = function;
                        this.out = out;
                        this.unionIterator = new UnionIterator<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
index 16444c2..fb9d163 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
@@ -30,7 +30,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, 
IN, OUT>
 
        
        public AccumulatingProcessingTimeWindowOperator(
-                       KeyedWindowFunction<KEY, IN, OUT> function,
+                       KeyedWindowFunction<IN, OUT, KEY> function,
                        KeySelector<IN, KEY> keySelector,
                        long windowLength,
                        long windowSlide)
@@ -41,7 +41,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, 
IN, OUT>
        @Override
        protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
                @SuppressWarnings("unchecked")
-               KeyedWindowFunction<KEY, IN, OUT> windowFunction = 
(KeyedWindowFunction<KEY, IN, OUT>) function;
+               KeyedWindowFunction<IN, OUT, KEY> windowFunction = 
(KeyedWindowFunction<IN, OUT, KEY>) function;
                
                return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
new file mode 100644
index 0000000..9d06ef5
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+
+/**
+ * This class implements the conversion from window policies to concrete 
operator
+ * implementations.
+ */
+public class PolicyToOperator {
+
+       /**
+        * Entry point to create an operator for the given window policies and the window function.
+        *
+        * <p>Currently only processing time policies are translated. A {@link ReduceFunction} is
+        * wrapped into an {@code AggregatingProcessingTimeWindowOperator}, a
+        * {@link KeyedWindowFunction} into an {@code AccumulatingProcessingTimeWindowOperator}.
+        *
+        * @param window The policy that defines the window length. Must not be null.
+        * @param slide The policy that defines the window slide. If null, the window length
+        *              is used as the slide.
+        * @param function The function applied to the window contents. Must not be null.
+        * @param keySelector The key selector that is handed to the created operator.
+        *
+        * @return The operator that implements the given policies and function.
+        *
+        * @throws NullPointerException Thrown, if the window policy or the function is null.
+        * @throws UnsupportedOperationException Thrown, if the combination of policies, or the
+        *                                       type of the function, is not supported.
+        */
+       public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
+                       WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
+       {
+               if (window == null) {
+                       throw new NullPointerException("The window policy must not be null.");
+               }
+               if (function == null) {
+                       throw new NullPointerException("The window function must not be null.");
+               }
+
+               // -- case 1: both policies are processing time policies
+               if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
+                       final long windowLength = ((ProcessingTime) window).toMilliseconds();
+                       // without an explicit slide, the window advances by its own length
+                       final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
+
+                       if (function instanceof ReduceFunction) {
+                               @SuppressWarnings("unchecked")
+                               ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
+
+                               // the reducing operator emits elements of the input type, so this cast
+                               // relies on the caller using OUT == IN for reduce functions
+                               @SuppressWarnings("unchecked")
+                               OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
+                                               new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+                                                               reducer, keySelector, windowLength, windowSlide);
+                               return op;
+                       }
+                       else if (function instanceof KeyedWindowFunction) {
+                               @SuppressWarnings("unchecked")
+                               KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function;
+
+                               return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+                                                               wf, keySelector, windowLength, windowSlide);
+                       }
+                       else {
+                               // the policies are fine, the function is not: report the actual cause
+                               // instead of falling through to the policy-related message below
+                               throw new UnsupportedOperationException(
+                                               "Processing time windows do not support functions of type "
+                                                               + function.getClass().getName());
+                       }
+               }
+
+               // -- case 2: both policies are event time policies
+               if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
+                       // add event time implementation
+                       // (until then, this case ends in the exception below)
+               }
+
+               throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+       }
+
+       // ------------------------------------------------------------------------
+
+       /** Don't instantiate */
+       private PolicyToOperator() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 2e0fe66..f758147 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 public class KeySelectorUtil {
 
-       public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, 
TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
+       public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> 
keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
                if (!(typeInfo instanceof CompositeType)) {
                        throw new InvalidTypesException(
                                        "This key operation requires a 
composite type such as Tuples, POJOs, or Case Classes.");
@@ -93,9 +93,15 @@ public class KeySelectorUtil {
                        comparator.extractKeys(value, keyArray, 0);
                        return (K) keyArray[0];
                }
-
        }
 
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * A key selector for selecting key fields via a TypeComparator.
+        *
+        * @param <IN> The type from which the key is extracted.
+        */
        public static class ComparableKeySelector<IN> implements 
KeySelector<IN, Tuple> {
 
                private static final long serialVersionUID = 1L;
@@ -126,6 +132,13 @@ public class KeySelectorUtil {
 
        }
 
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * A key selector that selects individual array fields as keys and returns them as a Tuple.
+        * 
+        * @param <IN> The type from which the key is extracted, i.e., the 
array type.
+        */
        public static final class ArrayKeySelector<IN> implements 
KeySelector<IN, Tuple> {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index b8b4c13..207b1b1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -113,7 +113,9 @@ public class StatefulOperatorTest extends 
StreamingMultipleProgramsTestBase {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(3);
 
-               KeyedDataStream<Integer> keyedStream = 
env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
+               KeyedDataStream<Integer, Integer> keyedStream = env
+                               .fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 
6))
+                               .keyBy(new ModKey(4));
 
                keyedStream.map(new StatefulMapper()).addSink(new 
SinkFunction<String>() {
                        private static final long serialVersionUID = 1L;
@@ -163,7 +165,7 @@ public class StatefulOperatorTest extends 
StreamingMultipleProgramsTestBase {
 
        @SuppressWarnings("unchecked")
        private StreamMap<Integer, String> 
createOperatorWithContext(List<String> output,
-                       KeySelector<Integer, Serializable> partitioner, byte[] 
serializedState) throws Exception {
+                       KeySelector<Integer, ? extends Serializable> 
partitioner, byte[] serializedState) throws Exception {
                final List<String> outputList = output;
 
                StreamingRuntimeContext context = new StreamingRuntimeContext(
@@ -355,7 +357,7 @@ public class StatefulOperatorTest extends 
StreamingMultipleProgramsTestBase {
 
        }
 
-       public static class ModKey implements KeySelector<Integer, 
Serializable> {
+       public static class ModKey implements KeySelector<Integer, Integer> {
 
                private static final long serialVersionUID = 
4193026742083046736L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 7387a1e..e52c2cb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -22,15 +22,16 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-import 
org.apache.flink.streaming.runtime.operators.windows.AggregatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Time;
 import org.apache.flink.util.Collector;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 @SuppressWarnings("serial")
 public class GroupedProcessingTimeWindowExample {
        
@@ -75,31 +76,20 @@ public class GroupedProcessingTimeWindowExample {
                                });
                
                stream
-                               .groupBy(new 
FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
-//                             .window(Time.of(2500, 
TimeUnit.MILLISECONDS)).every(Time.of(500, TimeUnit.MILLISECONDS))
-//                             .reduceWindow(new SummingReducer())
-//                             .flatten()
-//             .partitionByHash(new FirstFieldKeyExtractor<Tuple2<Long, Long>, 
Long>())
-//             .transform(
-//                             "Aligned time window",
-//                             TypeInfoParser.<Tuple2<Long, 
Long>>parse("Tuple2<Long, Long>"),
-//                             new 
AccumulatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>, Tuple2<Long, 
Long>>(
-//                                             new 
SummingWindowFunction<Long>(),
-//                                             new 
FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
-//                                             2500, 500))
-                       .transform(
-                               "Aligned time window",
-                               TypeInfoParser.<Tuple2<Long, 
Long>>parse("Tuple2<Long, Long>"),
-                               new 
AggregatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>>(
-                                               new SummingReducer(),
-                                               new 
FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
-                                               2500, 500))
+                       .keyBy(0)
+                       .window(Time.of(2500, MILLISECONDS), Time.of(500, 
MILLISECONDS))
+                       .reduceWindow(new SummingReducer())
+
+                       // alternative: use a mapWindow function which does not 
pre-aggregate
+//                     .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, 
Long>())
+//                     .window(Time.of(2500, MILLISECONDS), Time.of(500, 
MILLISECONDS))
+//                     .mapWindow(new SummingWindowFunction())
                                
                        .addSink(new SinkFunction<Tuple2<Long, Long>>() {
-                                       @Override
-                                       public void invoke(Tuple2<Long, Long> 
value) {
-                       }
-               });
+                               @Override
+                               public void invoke(Tuple2<Long, Long> value) {
+                               }
+                       });
                
                env.execute();
        }
@@ -113,47 +103,16 @@ public class GroupedProcessingTimeWindowExample {
                }
        }
 
-       public static class IdentityKeyExtractor<T> implements KeySelector<T, 
T> {
-
-               @Override
-               public T getKey(T value) {
-                       return value;
-               }
-       }
-
-       public static class IdentityWindowFunction<K, T> implements 
KeyedWindowFunction<K, T, T> {
-
-               @Override
-               public void evaluate(K k, Iterable<T> values, Collector<T> out) 
throws Exception {
-                       for (T v : values) {
-                               out.collect(v);
-                       }
-               }
-       }
-       
-       public static class CountingWindowFunction<K, T> implements 
KeyedWindowFunction<K, T, Long> {
-               
-               @Override
-               public void evaluate(K k, Iterable<T> values, Collector<Long> 
out) throws Exception {
-                       long count = 0;
-                       for (T ignored : values) {
-                               count++;
-                       }
-
-                       out.collect(count);
-               }
-       }
-
-       public static class SummingWindowFunction<K> implements 
KeyedWindowFunction<K, Tuple2<K, Long>, Tuple2<K, Long>> {
+       public static class SummingWindowFunction implements 
KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
 
                @Override
-               public void evaluate(K key, Iterable<Tuple2<K, Long>> values, 
Collector<Tuple2<K, Long>> out) throws Exception {
+               public void evaluate(Long key, Iterable<Tuple2<Long, Long>> 
values, Collector<Tuple2<Long, Long>> out) {
                        long sum = 0L;
-                       for (Tuple2<K, Long> value : values) {
+                       for (Tuple2<Long, Long> value : values) {
                                sum += value.f1;
                        }
 
-                       out.collect(new Tuple2<K, Long>(key, sum));
+                       out.collect(new Tuple2<>(key, sum));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 2bb6a6a..2f4bd23 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.api.scala
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-import org.apache.flink.api.common.functions.{ReduceFunction, FlatMapFunction, 
MapFunction,
-  Partitioner, FoldFunction, FilterFunction}
+import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, 
Partitioner, FilterFunction}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
@@ -30,17 +30,12 @@ import 
org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, 
DataStreamSink, SingleOutputStreamOperator}
-import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, 
StreamReduce}
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, 
TriggerPolicy}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.OperatorState
 import org.apache.flink.api.common.functions.{RichMapFunction, 
RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.KeyedDataStream
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
 
@@ -244,20 +239,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Groups the elements of a DataStream by the given key positions (for 
tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped 
aggregations.
    */
-  def groupBy(fields: Int*): GroupedDataStream[T] = javaStream.groupBy(fields: 
_*)
+  def groupBy(fields: Int*): GroupedDataStream[T, JavaTuple] = 
javaStream.groupBy(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped 
aggregations.
    */
-  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T] 
= 
+  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, 
JavaTuple] = 
    javaStream.groupBy(firstField +: otherFields.toArray: _*)   
   
   /**
    * Groups the elements of a DataStream by the given K key to
    * be used with grouped operators like grouped reduce or grouped 
aggregations.
    */
-  def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T] = {
+  def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T, K] = {
 
     val cleanFun = clean(fun)
     val keyExtractor = new KeySelector[T, K] {
@@ -605,7 +600,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   private[flink] def isStatePartitioned: Boolean = {
-    javaStream.isInstanceOf[KeyedDataStream[T]]
+    javaStream.isInstanceOf[KeyedDataStream[_, _]]
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
index 34f0807..e1a963d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
@@ -29,7 +29,8 @@ import org.apache.flink.api.common.functions.FoldFunction
 import org.apache.flink.api.common.functions.ReduceFunction
 
 
-class GroupedDataStream[T](javaStream: GroupedJavaStream[T]) extends 
DataStream[T](javaStream){
+class GroupedDataStream[T, K](javaStream: GroupedJavaStream[T, K]) 
+  extends DataStream[T](javaStream) {
  
   /**
    * Creates a new [[DataStream]] by reducing the elements of this DataStream

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 5e02ec5..9d62bcb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,13 +18,15 @@
 
 package org.apache.flink.streaming.api.scala
 
+import java.util.Objects
+
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.StateHandleProvider
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaEnv}
 import 
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -294,6 +296,27 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   // 
--------------------------------------------------------------------------------------------
+  //  Time characteristic
+  // 
--------------------------------------------------------------------------------------------
+  /**
+   * Configures which notion of time the stream uses, e.g., processing time, event time,
+   * or ingestion time. The value is forwarded to the wrapped Java environment.
+   *
+   * @param characteristic The time characteristic.
+   */
+  def setStreamTimeCharacteristic(characteristic: TimeCharacteristic): Unit =
+    javaEnv.setStreamTimeCharacteristic(characteristic)
+
+  /**
+   * Gets the time characteristic for the stream, e.g., processing time, event time,
+   * or ingestion time. The value is read from the wrapped Java environment.
+   *
+   * @return The time characteristic.
+   */
+  def getStreamTimeCharacteristic: TimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
+
+  // 
--------------------------------------------------------------------------------------------
   // Data stream creations
   // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 2eb4f9e..59843e2 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -38,8 +38,8 @@ package object scala {
   implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
     new DataStream[R](javaStream)
     
-  implicit def javaToScalaGroupedStream[R](javaStream: GroupedJavaStream[R]): 
-  GroupedDataStream[R] = new GroupedDataStream[R](javaStream)    
+  implicit def javaToScalaGroupedStream[R, K](javaStream: GroupedJavaStream[R, 
K]): 
+  GroupedDataStream[R, K] = new GroupedDataStream[R, K](javaStream)    
 
   implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): 
WindowedDataStream[R] =
     new WindowedDataStream[R](javaWStream)

Reply via email to