[FLINK-3521] Make Iterable part of method signature for WindowFunction

This closes #1723


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba069f35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba069f35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba069f35

Branch: refs/heads/release-1.0
Commit: ba069f35b21de5d2a30ba0cd2234f20f35532c09
Parents: 603f351
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Feb 26 15:19:50 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Sat Feb 27 00:10:52 2016 +0100

----------------------------------------------------------------------
 .../ml/IncrementalLearningSkeleton.java         |  2 +-
 .../GroupedProcessingTimeWindowExample.java     |  2 +-
 .../api/datastream/AllWindowedStream.java       |  8 +--
 .../api/datastream/CoGroupedStreams.java        |  2 +-
 .../api/datastream/WindowedStream.java          | 39 +++++------
 .../functions/windowing/AllWindowFunction.java  |  5 +-
 .../windowing/FoldApplyAllWindowFunction.java   |  5 +-
 .../windowing/FoldApplyWindowFunction.java      |  5 +-
 .../windowing/PassThroughAllWindowFunction.java |  6 +-
 .../windowing/PassThroughWindowFunction.java    |  6 +-
 .../windowing/ReduceApplyAllWindowFunction.java |  6 +-
 .../windowing/ReduceApplyWindowFunction.java    |  6 +-
 .../ReduceIterableAllWindowFunction.java        |  2 +-
 .../windowing/ReduceIterableWindowFunction.java |  2 +-
 .../api/functions/windowing/WindowFunction.java |  3 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |  8 +--
 ...ccumulatingProcessingTimeWindowOperator.java |  6 +-
 .../EvictingNonKeyedWindowOperator.java         |  2 +-
 .../windowing/EvictingWindowOperator.java       |  6 +-
 .../windowing/NonKeyedWindowOperator.java       |  4 +-
 .../operators/windowing/WindowOperator.java     | 12 ++--
 .../InternalIterableWindowFunction.java         | 72 +++++++++++++++++++
 .../InternalSingleValueWindowFunction.java      | 74 ++++++++++++++++++++
 .../functions/InternalWindowFunction.java       | 47 +++++++++++++
 .../flink/streaming/api/DataStreamTest.java     |  2 +-
 .../operators/FoldApplyWindowFunctionTest.java  |  6 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 10 +--
 .../windowing/AllWindowTranslationTest.java     |  6 +-
 .../windowing/EvictingWindowOperatorTest.java   |  7 +-
 .../windowing/TimeWindowTranslationTest.java    |  6 +-
 .../operators/windowing/WindowOperatorTest.java | 16 +++--
 .../windowing/WindowTranslationTest.java        |  7 +-
 .../streaming/api/scala/AllWindowedStream.scala | 43 ++++++++----
 .../streaming/api/scala/WindowedStream.scala    | 47 +++++++++----
 .../api/scala/function/AllWindowFunction.scala  | 45 ++++++++++++
 .../api/scala/function/WindowFunction.scala     | 47 +++++++++++++
 .../api/scala/AllWindowTranslationTest.scala    | 12 ++--
 .../api/scala/WindowTranslationTest.scala       | 12 ++--
 .../EventTimeAllWindowCheckpointingITCase.java  | 22 ++++--
 .../EventTimeWindowCheckpointingITCase.java     | 24 +++++--
 .../WindowCheckpointingITCase.java              |  4 +-
 41 files changed, 498 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index acbc5d6..4108485 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -167,7 +167,7 @@ public class IncrementalLearningSkeleton {
        /**
         * Builds up-to-date partial models on new training data.
         */
-       public static class PartialModelBuilder implements 
AllWindowFunction<Iterable<Integer>, Double[], TimeWindow> {
+       public static class PartialModelBuilder implements 
AllWindowFunction<Integer, Double[], TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                protected Double[] buildPartialModel(Iterable<Integer> values) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 196b73e..f08069b 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -104,7 +104,7 @@ public class GroupedProcessingTimeWindowExample {
                }
        }
 
-       public static class SummingWindowFunction implements 
WindowFunction<Iterable<Tuple2<Long, Long>>, Tuple2<Long, Long>, Long, Window> {
+       public static class SummingWindowFunction implements 
WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
 
                @Override
                public void apply(Long key, Window window, 
Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 56640d3..6b32880 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -222,11 +221,10 @@ public class AllWindowedStream<T, W extends Window> {
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> 
apply(AllWindowFunction<Iterable<T>, R, W> function) {
+       public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, 
R, W> function) {
                @SuppressWarnings("unchecked, rawtypes")
-               TypeInformation<Iterable<T>> iterTypeInfo = new 
GenericTypeInfo<>((Class) Iterable.class);
                TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, AllWindowFunction.class, true, true, 
iterTypeInfo, null, false);
+                               function, AllWindowFunction.class, true, true, 
getInputType(), null, false);
 
                return apply(function, resultType);
        }
@@ -242,7 +240,7 @@ public class AllWindowedStream<T, W extends Window> {
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> 
apply(AllWindowFunction<Iterable<T>, R, W> function, TypeInformation<R> 
resultType) {
+       public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, 
R, W> function, TypeInformation<R> resultType) {
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index e921940..713433c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -556,7 +556,7 @@ public class CoGroupedStreams<T1, T2> {
 
        private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends 
Window>
                        extends WrappingFunction<CoGroupFunction<T1, T2, T>>
-                       implements WindowFunction<Iterable<TaggedUnion<T1, 
T2>>, T, KEY, W> {
+                       implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, 
W> {
                
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 2ced99d..5c92fe0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -52,6 +51,8 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import 
org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -218,11 +219,9 @@ public class WindowedStream<T, K, W extends Window> {
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> 
apply(WindowFunction<Iterable<T>, R, K, W> function) {
-               @SuppressWarnings("unchecked, rawtypes")
-               TypeInformation<Iterable<T>> iterTypeInfo = new 
GenericTypeInfo<>((Class) Iterable.class);
+       public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, 
K, W> function) {
                TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, WindowFunction.class, true, true, 
iterTypeInfo, null, false);
+                               function, WindowFunction.class, true, true, 
getInputType(), null, false);
 
                return apply(function, resultType);
        }
@@ -240,7 +239,7 @@ public class WindowedStream<T, K, W extends Window> {
         * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <R> SingleOutputStreamOperator<R, ?> 
apply(WindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> 
resultType) {
+       public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, 
K, W> function, TypeInformation<R> resultType) {
 
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
@@ -270,7 +269,7 @@ public class WindowedStream<T, K, W extends Window> {
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                        stateDesc,
-                                       function,
+                                       new 
InternalIterableWindowFunction<>(function),
                                        trigger,
                                        evictor);
 
@@ -285,7 +284,7 @@ public class WindowedStream<T, K, W extends Window> {
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                        stateDesc,
-                                       function,
+                                       new 
InternalIterableWindowFunction<>(function),
                                        trigger);
                }
 
@@ -350,13 +349,13 @@ public class WindowedStream<T, K, W extends Window> {
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
                        operator = new EvictingWindowOperator<>(windowAssigner,
-                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                               keySel,
-                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                               stateDesc,
-                               new ReduceApplyWindowFunction<>(reduceFunction, 
function),
-                               trigger,
-                               evictor);
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalIterableWindowFunction<>(new 
ReduceApplyWindowFunction<>(reduceFunction, function)),
+                                       trigger,
+                                       evictor);
 
                } else {
                        ReducingStateDescriptor<T> stateDesc = new 
ReducingStateDescriptor<>("window-contents",
@@ -370,7 +369,7 @@ public class WindowedStream<T, K, W extends Window> {
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                        stateDesc,
-                                       function,
+                                       new 
InternalSingleValueWindowFunction<>(function),
                                        trigger);
                }
 
@@ -441,7 +440,7 @@ public class WindowedStream<T, K, W extends Window> {
                                keySel,
                                
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                stateDesc,
-                               new FoldApplyWindowFunction<>(initialValue, 
foldFunction, function),
+                               new InternalIterableWindowFunction<>(new 
FoldApplyWindowFunction<>(initialValue, foldFunction, function)),
                                trigger,
                                evictor);
 
@@ -458,7 +457,7 @@ public class WindowedStream<T, K, W extends Window> {
                                keySel,
                                
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                stateDesc,
-                               function,
+                               new 
InternalSingleValueWindowFunction<>(function),
                                trigger);
                }
 
@@ -694,7 +693,7 @@ public class WindowedStream<T, K, W extends Window> {
                        }
                        else if (function instanceof WindowFunction) {
                                @SuppressWarnings("unchecked")
-                               WindowFunction<Iterable<T>, R, K, TimeWindow> 
wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+                               WindowFunction<T, R, K, TimeWindow> wf = 
(WindowFunction<T, R, K, TimeWindow>) function;
 
                                OneInputStreamOperator<T, R> op = new 
AccumulatingProcessingTimeWindowOperator<>(
                                                wf, input.getKeySelector(),
@@ -726,7 +725,7 @@ public class WindowedStream<T, K, W extends Window> {
                        }
                        else if (function instanceof WindowFunction) {
                                @SuppressWarnings("unchecked")
-                               WindowFunction<Iterable<T>, R, K, TimeWindow> 
wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+                               WindowFunction<T, R, K, TimeWindow> wf = 
(WindowFunction<T, R, K, TimeWindow>) function;
 
                                OneInputStreamOperator<T, R> op = new 
AccumulatingProcessingTimeWindowOperator<>(
                                                wf, input.getKeySelector(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
index 62e86ca..c497b4a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
@@ -30,9 +30,10 @@ import java.io.Serializable;
  *
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
+ * @param <W> The type of {@code Window} that this window function can be 
applied on.
  */
 @Public
-public interface AllWindowFunction<IN, OUT,  W extends Window> extends 
Function, Serializable {
+public interface AllWindowFunction<IN, OUT, W extends Window> extends 
Function, Serializable {
 
        /**
         * Evaluates the window and outputs none or several elements.
@@ -43,5 +44,5 @@ public interface AllWindowFunction<IN, OUT,  W extends 
Window> extends Function,
         *
         * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
         */
-       void apply(W window, IN values, Collector<OUT> out) throws Exception;
+       void apply(W window, Iterable<IN> values, Collector<OUT> out) throws 
Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
index 76fd562..a5bc0a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
@@ -33,11 +33,12 @@ import org.apache.flink.util.Collector;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 
 @Internal
 public class FoldApplyAllWindowFunction<W extends Window, T, ACC>
        extends WrappingFunction<AllWindowFunction<ACC, ACC, W>>
-       implements AllWindowFunction<Iterable<T>, ACC, W>, 
OutputTypeConfigurable<ACC> {
+       implements AllWindowFunction<T, ACC, W>, OutputTypeConfigurable<ACC> {
 
        private static final long serialVersionUID = 1L;
 
@@ -75,7 +76,7 @@ public class FoldApplyAllWindowFunction<W extends Window, T, 
ACC>
                        result = foldFunction.fold(result, val);
                }
 
-               wrappedFunction.apply(window, result, out);
+               wrappedFunction.apply(window, 
Collections.singletonList(result), out);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
index 40e8830..756a683 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
@@ -33,11 +33,12 @@ import org.apache.flink.util.Collector;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 
 @Internal
 public class FoldApplyWindowFunction<K, W extends Window, T, ACC>
        extends WrappingFunction<WindowFunction<ACC, ACC, K, W>>
-       implements WindowFunction<Iterable<T>, ACC, K, W>, 
OutputTypeConfigurable<ACC> {
+       implements WindowFunction<T, ACC, K, W>, OutputTypeConfigurable<ACC> {
 
        private static final long serialVersionUID = 1L;
 
@@ -75,7 +76,7 @@ public class FoldApplyWindowFunction<K, W extends Window, T, 
ACC>
                        result = foldFunction.fold(result, val);
                }
 
-               wrappedFunction.apply(key, window, result, out);
+               wrappedFunction.apply(key, window, 
Collections.singletonList(result), out);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
index 3e3ffca..4435644 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
@@ -27,7 +27,9 @@ public class PassThroughAllWindowFunction<W extends Window, 
T> implements AllWin
        private static final long serialVersionUID = 1L;
 
        @Override
-       public void apply(W window, T input, Collector<T> out) throws Exception 
{
-               out.collect(input);
+       public void apply(W window, Iterable<T> input, Collector<T> out) throws 
Exception {
+               for (T in: input) {
+                       out.collect(in);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
index 21709b9..319acb6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
@@ -27,7 +27,9 @@ public class PassThroughWindowFunction<K, W extends Window, 
T> implements Window
        private static final long serialVersionUID = 1L;
 
        @Override
-       public void apply(K k, W window, T input, Collector<T> out) throws 
Exception {
-               out.collect(input);
+       public void apply(K k, W window, Iterable<T> input, Collector<T> out) 
throws Exception {
+               for (T in: input) {
+                       out.collect(in);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
index ce1615b..5b8dd70 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
@@ -23,10 +23,12 @@ import 
org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
+import java.util.Collections;
+
 @Internal
 public class ReduceApplyAllWindowFunction<W extends Window, T, R>
        extends WrappingFunction<AllWindowFunction<T, R, W>>
-       implements AllWindowFunction<Iterable<T>, R, W> {
+       implements AllWindowFunction<T, R, W> {
 
        private static final long serialVersionUID = 1L;
 
@@ -51,6 +53,6 @@ public class ReduceApplyAllWindowFunction<W extends Window, 
T, R>
                                curr = reduceFunction.reduce(curr, val);
                        }
                }
-               windowFunction.apply(window, curr, out);
+               windowFunction.apply(window, Collections.singletonList(curr), 
out);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
index 75ea2d2..f896282 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
@@ -23,10 +23,12 @@ import 
org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
+import java.util.Collections;
+
 @Internal
 public class ReduceApplyWindowFunction<K, W extends Window, T, R>
        extends WrappingFunction<WindowFunction<T, R, K, W>>
-       implements WindowFunction<Iterable<T>, R, K, W> {
+       implements WindowFunction<T, R, K, W> {
 
        private static final long serialVersionUID = 1L;
 
@@ -51,6 +53,6 @@ public class ReduceApplyWindowFunction<K, W extends Window, 
T, R>
                                curr = reduceFunction.reduce(curr, val);
                        }
                }
-               windowFunction.apply(k, window, curr, out);
+               windowFunction.apply(k, window, 
Collections.singletonList(curr), out);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
index a3b35ae..8ec5809 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 @Internal
-public class ReduceIterableAllWindowFunction<W extends Window, T> implements 
AllWindowFunction<Iterable<T>, T, W> {
+public class ReduceIterableAllWindowFunction<W extends Window, T> implements 
AllWindowFunction<T, T, W> {
        private static final long serialVersionUID = 1L;
 
        private final ReduceFunction<T> reduceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
index e296411..afb0219 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 @Internal
-public class ReduceIterableWindowFunction<K, W extends Window, T> implements 
WindowFunction<Iterable<T>, T, K, W> {
+public class ReduceIterableWindowFunction<K, W extends Window, T> implements 
WindowFunction<T, T, K, W> {
        private static final long serialVersionUID = 1L;
 
        private final ReduceFunction<T> reduceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
index 83ef18e..154fe88 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
@@ -31,6 +31,7 @@ import java.io.Serializable;
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
  * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be 
applied on.
  */
 @Public
 public interface WindowFunction<IN, OUT, KEY, W extends Window> extends 
Function, Serializable {
@@ -45,5 +46,5 @@ public interface WindowFunction<IN, OUT, KEY, W extends 
Window> extends Function
         * 
         * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery. 
         */
-       void apply(KEY key, W window, IN input, Collector<OUT> out) throws 
Exception;
+       void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) 
throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index b830789..9b353fe 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
 
-       private final WindowFunction<Iterable<Type>, Result, Key, Window> 
function;
+       private final WindowFunction<Type, Result, Key, Window> function;
 
        /**
         * IMPORTANT: This value needs to start at one, so it is fresher than 
the value that new entries have (zero) */
@@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        // 
------------------------------------------------------------------------
        
-       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
WindowFunction<Iterable<Type>, Result, Key, Window> function) {
+       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
WindowFunction<Type, Result, Key, Window> function) {
                this.keySelector = keySelector;
                this.function = function;
        }
@@ -86,7 +86,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
        
        static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-               private final WindowFunction<Iterable<Type>, Result, Key, 
Window> function;
+               private final WindowFunction<Type, Result, Key, Window> 
function;
                
                private final UnionIterator<Type> unionIterator;
                
@@ -99,7 +99,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                private Key currentKey;
                
 
-               WindowFunctionTraversal(WindowFunction<Iterable<Type>, Result, 
Key, Window> function, TimeWindow window,
+               WindowFunctionTraversal(WindowFunction<Type, Result, Key, 
Window> function, TimeWindow window,
                                                                
Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
                        this.function = function;
                        this.out = out;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 2f0d4fe..9ea2949 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -33,13 +33,13 @@ import java.util.ArrayList;
 
 @Internal
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, ArrayList<IN>, WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> {
+               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
 
        private static final long serialVersionUID = 7305948082830843475L;
 
        
        public AccumulatingProcessingTimeWindowOperator(
-                       WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> 
function,
+                       WindowFunction<IN, OUT, KEY, TimeWindow> function,
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        TypeSerializer<IN> valueSerializer,
@@ -53,7 +53,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, 
IN, OUT>
        @Override
        protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
                @SuppressWarnings("unchecked")
-               WindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = 
(WindowFunction<Iterable<IN>, OUT, KEY, Window>) function;
+               WindowFunction<IN, OUT, KEY, Window> windowFunction = 
(WindowFunction<IN, OUT, KEY, Window>) function;
                
                return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 510ebb2..221367d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -49,7 +49,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W 
extends Window> extends N
        public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
                        TypeSerializer<W> windowSerializer,
                        WindowBufferFactory<? super IN, ? extends 
EvictingWindowBuffer<IN>> windowBufferFactory,
-                       AllWindowFunction<Iterable<IN>, OUT, W> windowFunction,
+                       AllWindowFunction<IN, OUT, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger,
                        Evictor<? super IN, ? super W> evictor) {
                super(windowAssigner, windowSerializer, windowBufferFactory, 
windowFunction, trigger);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index cfab3d5..16ca488 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -27,12 +27,12 @@ import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
@@ -48,7 +48,7 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <K> The type of key returned by the {@code KeySelector}.
  * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <OUT> The type of elements emitted by the {@code 
InternalWindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
 @Internal
@@ -65,7 +65,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                KeySelector<IN, K> keySelector,
                TypeSerializer<K> keySerializer,
                StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> 
windowStateDescriptor,
-               WindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
+               InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
                Trigger<? super IN, ? super W> trigger,
                Evictor<? super IN, ? super W> evictor) {
                super(windowAssigner, windowSerializer, keySelector, 
keySerializer, null, windowFunction, trigger);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index e42d7b4..95feadc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -77,7 +77,7 @@ import static java.util.Objects.requireNonNull;
  */
 @Internal
 public class NonKeyedWindowOperator<IN, OUT, W extends Window>
-               extends AbstractUdfStreamOperator<OUT, 
AllWindowFunction<Iterable<IN>, OUT, W>>
+               extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, 
OUT, W>>
                implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
 
        private static final long serialVersionUID = 1L;
@@ -146,7 +146,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        public NonKeyedWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
                        TypeSerializer<W> windowSerializer,
                        WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory,
-                       AllWindowFunction<Iterable<IN>, OUT, W> windowFunction,
+                       AllWindowFunction<IN, OUT, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger) {
 
                super(windowFunction);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 289492b..9b7b347 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -34,7 +34,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -47,6 +46,7 @@ import 
org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
@@ -74,7 +74,7 @@ import static java.util.Objects.requireNonNull;
  * <p>
  * Each pane gets its own instance of the provided {@code Trigger}. This 
trigger determines when
  * the contents of the pane should be processed to emit results. When a 
trigger fires,
- * the given {@link WindowFunction} is invoked to produce the results that are 
emitted for
+ * the given {@link InternalWindowFunction} is invoked to produce the results 
that are emitted for
  * the pane to which the {@code Trigger} belongs.
  *
  * <p>
@@ -83,12 +83,12 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <K> The type of key returned by the {@code KeySelector}.
  * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <OUT> The type of elements emitted by the {@code 
InternalWindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
 @Internal
 public class WindowOperator<K, IN, ACC, OUT, W extends Window>
-       extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>>
+       extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, 
K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
 
        private static final long serialVersionUID = 1L;
@@ -126,7 +126,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        // 
------------------------------------------------------------------------
 
        /**
-        * This is given to the {@code WindowFunction} for emitting elements 
with a given timestamp.
+        * This is given to the {@code InternalWindowFunction} for emitting 
elements with a given timestamp.
         */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
@@ -162,7 +162,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                KeySelector<IN, K> keySelector,
                TypeSerializer<K> keySerializer,
                StateDescriptor<? extends MergingState<IN, ACC>, ?> 
windowStateDescriptor,
-               WindowFunction<ACC, OUT, K, W> windowFunction,
+               InternalWindowFunction<ACC, OUT, K, W> windowFunction,
                Trigger<? super IN, ? super W> trigger) {
 
                super(windowFunction);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
new file mode 100644
index 0000000..32318ea
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link WindowFunction} that takes 
an {@code Iterable}
+ * when the window state also is an {@code Iterable}.
+ */
+public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends 
Window> extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements 
RichFunction {
+       private static final long serialVersionUID = 1L;
+
+       protected WindowFunction<IN, OUT, KEY, W> wrappedFunction;
+
+       public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> 
wrappedFunction) {
+               this.wrappedFunction = wrappedFunction;
+       }
+
+       @Override
+       public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> 
out) throws Exception {
+               wrappedFunction.apply(key, window, input, out);
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               FunctionUtils.openFunction(this.wrappedFunction, parameters);
+       }
+
+       @Override
+       public void close() throws Exception {
+               FunctionUtils.closeFunction(this.wrappedFunction);
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, 
t);
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
new file mode 100644
index 0000000..fd10e5c
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link WindowFunction} that takes 
an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends 
Window> extends InternalWindowFunction<IN, OUT, KEY, W> implements RichFunction 
{
+       private static final long serialVersionUID = 1L;
+
+       protected WindowFunction<IN, OUT, KEY, W> wrappedFunction;
+
+       public InternalSingleValueWindowFunction(WindowFunction<IN, OUT, KEY, 
W> wrappedFunction) {
+               this.wrappedFunction = wrappedFunction;
+       }
+
+       @Override
+       public void apply(KEY key, W window, IN input, Collector<OUT> out) 
throws Exception {
+               wrappedFunction.apply(key, window, 
Collections.singletonList(input), out);
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               FunctionUtils.openFunction(this.wrappedFunction, parameters);
+       }
+
+       @Override
+       public void close() throws Exception {
+               FunctionUtils.closeFunction(this.wrappedFunction);
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, 
t);
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
new file mode 100644
index 0000000..e75f3be
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Internal interface for functions that are evaluated over keyed (grouped) 
windows.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ */
+public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window> 
implements Function, Serializable {
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Evaluates the window and outputs none or several elements.
+        *
+        * @param key The key for which this window is evaluated.
+        * @param window The window that is being evaluated.
+        * @param input The elements in the window being evaluated.
+        * @param out A collector for emitting elements.
+        *
+        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
+        */
+       public abstract void apply(KEY key, W window, IN input, Collector<OUT> 
out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 502198c..7a4d6f8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -518,7 +518,7 @@ public class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
                DataStream<String> window = map
                                .windowAll(GlobalWindows.create())
                                .trigger(PurgingTrigger.of(CountTrigger.of(5)))
-                               .apply(new 
AllWindowFunction<Iterable<Tuple2<Integer, String>>, String, GlobalWindow>() {
+                               .apply(new AllWindowFunction<Tuple2<Integer, 
String>, String, GlobalWindow>() {
                                        @Override
                                        public void apply(GlobalWindow window,
                                                        
Iterable<Tuple2<Integer, String>> values,

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index b8d57a6..0b0ab9e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -70,9 +70,11 @@ public class FoldApplyWindowFunctionTest {
                                @Override
                                public void apply(Integer integer,
                                        TimeWindow window,
-                                       Integer input,
+                                       Iterable<Integer> input,
                                        Collector<Integer> out) throws 
Exception {
-                                       out.collect(input);
+                                       for (Integer in: input) {
+                                               out.collect(in);
+                                       }
                                }
                        }
                );

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index b6e51c6..dff1184 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -67,7 +67,7 @@ import static org.junit.Assert.*;
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        @SuppressWarnings("unchecked")
-       private final WindowFunction<Iterable<String>, String, String, 
TimeWindow> mockFunction = mock(WindowFunction.class);
+       private final WindowFunction<String, String, String, TimeWindow> 
mockFunction = mock(WindowFunction.class);
 
        @SuppressWarnings("unchecked")
        private final KeySelector<String, String> mockKeySelector = 
mock(KeySelector.class);
@@ -79,8 +79,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                }
        };
        
-       private final WindowFunction<Iterable<Integer>, Integer, Integer, 
TimeWindow> validatingIdentityFunction =
-                       new WindowFunction<Iterable<Integer>, Integer, Integer, 
TimeWindow>()
+       private final WindowFunction<Integer, Integer, Integer, TimeWindow> 
validatingIdentityFunction =
+                       new WindowFunction<Integer, Integer, Integer, 
TimeWindow>()
        {
                @Override
                public void apply(Integer key,
@@ -723,7 +723,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
        
-       private static class FailingFunction implements 
WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> {
+       private static class FailingFunction implements WindowFunction<Integer, 
Integer, Integer, TimeWindow> {
 
                private final int failAfterElements;
                
@@ -751,7 +751,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
 
-       private static class StatefulFunction extends 
RichWindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> {
+       private static class StatefulFunction extends 
RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
                
                // we use a concurrent map here even though there is no 
concurrency, to
                // get "volatile" style access to entries

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 0583290..42f452c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -76,7 +76,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .apply(new 
AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
TimeWindow>() {
+                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -123,7 +123,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window2 = source
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
-                               .apply(new 
AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
TimeWindow>() {
+                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -172,7 +172,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
                                .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
-                               .apply(new 
AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
TimeWindow>() {
+                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 2f1dce5..6af7ac4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -70,7 +71,7 @@ public class EvictingWindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new ReduceIterableWindowFunction<String, 
GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+                               new InternalIterableWindowFunction<>(new 
ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new 
SumReducer())),
                                CountTrigger.of(WINDOW_SLIDE),
                                CountEvictor.of(WINDOW_SIZE));
 
@@ -141,7 +142,7 @@ public class EvictingWindowOperatorTest {
                        new TupleKeySelector(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
-                       new RichSumReducer<GlobalWindow>(closeCalled),
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
                        CountTrigger.of(WINDOW_SLIDE),
                        CountEvictor.of(WINDOW_SIZE));
 
@@ -208,7 +209,7 @@ public class EvictingWindowOperatorTest {
                }
        }
 
-       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
String, W> {
+       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index a5a8df3..f214941 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -74,7 +74,7 @@ public class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
                                .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS))
-                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -117,7 +117,7 @@ public class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window2 = source
                        .keyBy(0)
                        .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
-                       .apply(new WindowFunction<Iterable<Tuple2<String, 
Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+                       .apply(new WindowFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple, TimeWindow>() {
                                private static final long serialVersionUID = 1L;
 
                                @Override
@@ -165,7 +165,7 @@ public class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .timeWindowAll(Time.of(1000, 
TimeUnit.MILLISECONDS))
-                               .apply(new 
AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
TimeWindow>() {
+                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index c1111a0..a1f08ad 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -41,6 +41,8 @@ import 
org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -140,7 +142,7 @@ public class WindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new PassThroughWindowFunction<String, 
TimeWindow, Tuple2<String, Integer>>(),
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                EventTimeTrigger.create());
 
                operator.setInputType(inputType, new ExecutionConfig());
@@ -176,7 +178,7 @@ public class WindowOperatorTest {
                        new TupleKeySelector(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
-                       new RichSumReducer<TimeWindow>(),
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<TimeWindow>()),
                        EventTimeTrigger.create());
 
                operator.setInputType(inputType, new ExecutionConfig());
@@ -271,7 +273,7 @@ public class WindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new PassThroughWindowFunction<String, 
TimeWindow, Tuple2<String, Integer>>(),
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -306,7 +308,7 @@ public class WindowOperatorTest {
                        new TupleKeySelector(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
                        stateDesc,
-                       new RichSumReducer<TimeWindow>(),
+                       new InternalIterableWindowFunction<>(new 
RichSumReducer<TimeWindow>()),
                        EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -344,7 +346,7 @@ public class WindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new PassThroughWindowFunction<String, 
GlobalWindow, Tuple2<String, Integer>>(),
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
                                
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -434,7 +436,7 @@ public class WindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new PassThroughWindowFunction<String, 
GlobalWindow, Tuple2<String, Integer>>(),
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
                                
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse(
@@ -497,7 +499,7 @@ public class WindowOperatorTest {
        }
 
 
-       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
String, W> {
+       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index c57da8a..30bb840 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -104,7 +104,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
                                .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -112,7 +112,6 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                                                        TimeWindow window,
                                                        Iterable<Tuple2<String, 
Integer>> values,
                                                        
Collector<Tuple2<String, Integer>> out) throws Exception {
-
                                        }
                                });
 
@@ -153,7 +152,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                                .keyBy(0)
                                .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
-                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -204,7 +203,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                                .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
                                .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
-                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index e36542e..3c91529 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => 
JavaAllWStream}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction 
=> JAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -179,10 +180,10 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
    * @return The data stream that is the result of applying the window 
function to the window.
    */
   def apply[R: TypeInformation](
-      function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = {
+      function: AllWindowFunction[T, R, W]): DataStream[R] = {
     
     val cleanedFunction = clean(function)
-    val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
+    val javaFunction = new JAllWindowFunction[T, R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
@@ -205,7 +206,7 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
       function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     
     val cleanedFunction = clean(function)
-    val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
+    val applyFunction = new JAllWindowFunction[T, R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
@@ -228,8 +229,15 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
       preAggregator: ReduceFunction[T],
       function: AllWindowFunction[T, R, W]): DataStream[R] = {
 
+    val cleanedFunction = clean(function)
+    val applyFunction = new JAllWindowFunction[T, R, W] {
+      def apply(window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
+        cleanedFunction(window, elements.asScala, out)
+      }
+    }
+
     val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(javaStream.apply(clean(preAggregator), clean(function), 
returnType))
+    asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, 
returnType))
   }
 
   /**
@@ -245,7 +253,7 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
    */
   def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
-      function: (W, T, Collector[R]) => Unit): DataStream[R] = {
+      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
@@ -259,9 +267,9 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     }
 
     val cleanApply = clean(function)
-    val applyFunction = new AllWindowFunction[T, R, W] {
-      def apply(window: W, input: T, out: Collector[R]): Unit = {
-        cleanApply(window, input, out)
+    val applyFunction = new JAllWindowFunction[T, R, W] {
+      def apply(window: W, input: java.lang.Iterable[T], out: Collector[R]): 
Unit = {
+        cleanApply(window, input.asScala, out)
       }
     }
     
@@ -285,11 +293,18 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
       initialValue: R,
       preAggregator: FoldFunction[T, R],
       function: AllWindowFunction[R, R, W]): DataStream[R] = {
+
+    val cleanedFunction = clean(function)
+    val applyFunction = new JAllWindowFunction[R, R, W] {
+      def apply(window: W, elements: java.lang.Iterable[R], out: 
Collector[R]): Unit = {
+        cleanedFunction(window, elements.asScala, out)
+      }
+    }
     
     asScalaStream(javaStream.apply(
       initialValue,
       clean(preAggregator),
-      clean(function),
+      applyFunction,
       implicitly[TypeInformation[R]]))
   }
 
@@ -308,7 +323,7 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
   def apply[R: TypeInformation](
       initialValue: R,
       preAggregator: (R, T) => R,
-      function: (W, R, Collector[R]) => Unit): DataStream[R] = {
+      function: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
@@ -322,9 +337,9 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     }
 
     val cleanApply = clean(function)
-    val applyFunction = new AllWindowFunction[R, R, W] {
-      def apply(window: W, input: R, out: Collector[R]): Unit = {
-        cleanApply(window, input, out)
+    val applyFunction = new JAllWindowFunction[R, R, W] {
+      def apply(window: W, input: java.lang.Iterable[R], out: Collector[R]): 
Unit = {
+        cleanApply(window, input.asScala, out)
       }
     }
     val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 53f033c..b7f9e00 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => 
JavaWStream}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => 
JWindowFunction}
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -182,10 +183,10 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
    * @return The data stream that is the result of applying the window 
function to the window.
    */
   def apply[R: TypeInformation](
-      function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = {
+      function: WindowFunction[T, R, K, W]): DataStream[R] = {
     
     val cleanFunction = clean(function)
-    val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] {
+    val javaFunction = new JWindowFunction[T, R, K, W] {
       def apply(key: K, window: W, input: java.lang.Iterable[T], out: 
Collector[R]) = {
         cleanFunction.apply(key, window, input.asScala, out)
       }
@@ -212,7 +213,7 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     }
 
     val cleanedFunction = clean(function)
-    val applyFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] {
+    val applyFunction = new JWindowFunction[T, R, K, W] {
       def apply(key: K, window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
         cleanedFunction(key, window, elements.asScala, out)
       }
@@ -235,8 +236,16 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
       preAggregator: ReduceFunction[T],
       function: WindowFunction[T, R, K, W]): DataStream[R] = {
 
+    val cleanedFunction = clean(function)
+
+    val applyFunction = new JWindowFunction[T, R, K, W] {
+      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
+        cleanedFunction.apply(key, window, elements.asScala, out)
+      }
+    }
+
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(javaStream.apply(clean(preAggregator), clean(function), 
resultType))
+    asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, 
resultType))
   }
 
   /**
@@ -252,7 +261,7 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
    */
   def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
-      function: (K, W, T, Collector[R]) => Unit): DataStream[R] = {
+      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
@@ -267,9 +276,9 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     }
 
     val cleanApply = clean(function)
-    val applyFunction = new WindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, input: T, out: Collector[R]): Unit = {
-        cleanApply(key, window, input, out)
+    val applyFunction = new JWindowFunction[T, R, K, W] {
+      def apply(key: K, window: W, input: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
+        cleanApply(key, window, input.asScala, out)
       }
     }
     
@@ -292,11 +301,19 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
       initialValue: R,
       foldFunction: FoldFunction[T, R],
       function: WindowFunction[R, R, K, W]): DataStream[R] = {
-    
+
+    val cleanedFunction = clean(function)
+
+    val applyFunction = new JWindowFunction[R, R, K, W] {
+      def apply(key: K, window: W, elements: java.lang.Iterable[R], out: 
Collector[R]): Unit = {
+        cleanedFunction.apply(key, window, elements.asScala, out)
+      }
+    }
+
     asScalaStream(javaStream.apply(
       initialValue,
       clean(foldFunction),
-      clean(function),
+      applyFunction,
       implicitly[TypeInformation[R]]))
   }
 
@@ -314,7 +331,7 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
   def apply[R: TypeInformation](
       initialValue: R,
       foldFunction: (R, T) => R,
-      function: (K, W, R, Collector[R]) => Unit): DataStream[R] = {
+      function: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
     
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -329,9 +346,9 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     }
 
     val cleanApply = clean(function)
-    val applyFunction = new WindowFunction[R, R, K, W] {
-      def apply(key: K, window: W, input: R, out: Collector[R]): Unit = {
-        cleanApply(key, window, input, out)
+    val applyFunction = new JWindowFunction[R, R, K, W] {
+      def apply(key: K, window: W, input: java.lang.Iterable[R], out: 
Collector[R]): Unit = {
+        cleanApply(key, window, input.asScala, out)
       }
     }
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]

Reply via email to