Repository: flink
Updated Branches:
  refs/heads/release-1.3 f72eff7fc -> b2d6dc1d4


http://git-wip-us.apache.org/repos/asf/flink/blob/b2d6dc1d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 7913e95..9737d2f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -295,8 +295,7 @@ public class WindowedStream<T, K, W extends Window> {
                        LegacyWindowOperatorType legacyWindowOpType) {
 
                TypeInformation<T> inType = input.getType();
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, WindowFunction.class, true, true, inType, 
null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(function, inType);
 
                return reduce(reduceFunction, function, resultType, 
legacyWindowOpType);
        }
@@ -396,8 +395,7 @@ public class WindowedStream<T, K, W extends Window> {
         */
        @PublicEvolving
        public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, ProcessWindowFunction.class, true, 
true, input.getType(), null, false);
+               TypeInformation<R> resultType = 
getProcessWindowFunctionReturnType(function, input.getType(), null);
 
                return reduce(reduceFunction, function, resultType);
        }
@@ -544,8 +542,7 @@ public class WindowedStream<T, K, W extends Window> {
                TypeInformation<ACC> foldAccumulatorType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
                        Utils.getCallLocationName(), true);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, WindowFunction.class, true, true, 
foldAccumulatorType, null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(function, foldAccumulatorType);
 
                return fold(initialValue, foldFunction, function, 
foldAccumulatorType, resultType);
        }
@@ -663,8 +660,7 @@ public class WindowedStream<T, K, W extends Window> {
                TypeInformation<ACC> foldResultType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
                                Utils.getCallLocationName(), true);
 
-               TypeInformation<R> windowResultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               windowFunction, ProcessWindowFunction.class, 
true, true, foldResultType, Utils.getCallLocationName(), false);
+               TypeInformation<R> windowResultType = 
getProcessWindowFunctionReturnType(windowFunction, foldResultType, 
Utils.getCallLocationName());
 
                return fold(initialValue, foldFunction, windowFunction, 
foldResultType, windowResultType);
        }
@@ -852,8 +848,7 @@ public class WindowedStream<T, K, W extends Window> {
                TypeInformation<V> aggResultType = 
TypeExtractor.getAggregateFunctionReturnType(
                                aggFunction, input.getType(), null, false);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               windowFunction, WindowFunction.class, true, 
true, aggResultType, null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(windowFunction, aggResultType);
 
                return aggregate(aggFunction, windowFunction, accumulatorType, 
aggResultType, resultType);
        }
@@ -981,12 +976,42 @@ public class WindowedStream<T, K, W extends Window> {
                TypeInformation<V> aggResultType = 
TypeExtractor.getAggregateFunctionReturnType(
                                aggFunction, input.getType(), null, false);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               windowFunction, ProcessWindowFunction.class, 
true, true, aggResultType, null, false);
+               TypeInformation<R> resultType = 
getProcessWindowFunctionReturnType(windowFunction, aggResultType, null);
 
                return aggregate(aggFunction, windowFunction, accumulatorType, 
aggResultType, resultType);
        }
 
+       private static <IN, OUT, KEY> TypeInformation<OUT> 
getWindowFunctionReturnType(
+               WindowFunction<IN, OUT, KEY, ?> function,
+               TypeInformation<IN> inType) {
+               return TypeExtractor.getUnaryOperatorReturnType(
+                       function,
+                       WindowFunction.class,
+                       0,
+                       1,
+                       new int[]{2, 0},
+                       new int[]{3, 0},
+                       inType,
+                       null,
+                       false);
+       }
+
+       private static <IN, OUT, KEY> TypeInformation<OUT> 
getProcessWindowFunctionReturnType(
+                       ProcessWindowFunction<IN, OUT, KEY, ?> function,
+                       TypeInformation<IN> inType,
+                       String functionName) {
+               return TypeExtractor.getUnaryOperatorReturnType(
+                       function,
+                       ProcessWindowFunction.class,
+                       0,
+                       1,
+                       new int[]{2, 0},
+                       new int[]{3, 0},
+                       inType,
+                       functionName,
+                       false);
+       }
+
        /**
         * Applies the given window function to each window. The window 
function is called for each
         * evaluation of the window for each key individually. The output of 
the window function is
@@ -1094,8 +1119,7 @@ public class WindowedStream<T, K, W extends Window> {
         * @return The data stream that is the result of applying the window 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, 
W> function) {
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, WindowFunction.class, true, true, 
getInputType(), null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(function, getInputType());
 
                return apply(function, resultType);
        }
@@ -1131,8 +1155,7 @@ public class WindowedStream<T, K, W extends Window> {
         */
        @PublicEvolving
        public <R> SingleOutputStreamOperator<R> 
process(ProcessWindowFunction<T, R, K, W> function) {
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, ProcessWindowFunction.class, true, true, 
getInputType(), null, false);
+               TypeInformation<R> resultType = 
getProcessWindowFunctionReturnType(function, getInputType(), null);
 
                return process(function, resultType);
        }
@@ -1231,8 +1254,7 @@ public class WindowedStream<T, K, W extends Window> {
        @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function) {
                TypeInformation<T> inType = input.getType();
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, WindowFunction.class, true, true, 
inType, null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(function, inType);
 
                return apply(reduceFunction, function, resultType);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b2d6dc1d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 5071c37..843839d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -259,23 +259,28 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Integer> source = env.fromElements(1, 2);
 
-               DataStream<Tuple3<String, String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window1 = source
+                               .keyBy(new KeySelector<Integer, String>() {
+                                       @Override
+                                       public String getKey(Integer value) 
throws Exception {
+                                               return value.toString();
+                                       }
+                               })
                                
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                                .evictor(CountEvictor.of(5))
                                .process(new TestProcessWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple3<String, String, Integer>>) window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputTransformation<Integer, String> transform = 
(OneInputTransformation<Integer, String>) window1.getTransformation();
+               final OneInputStreamOperator<Integer, String> operator = 
transform.getOperator();
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               WindowOperator<String, Integer, ?, ?, ?> winOperator = 
(WindowOperator<String, Integer, ?, ?, ?>) operator;
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
EventTimeSessionWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
-               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO,  1);
        }
 
        // 
------------------------------------------------------------------------
@@ -603,28 +608,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<Integer> window1 = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .aggregate(new DummyAggregationFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
Integer> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, Integer>) window1.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
Integer> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                               (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -632,28 +639,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<Integer> window1 = source
+                               .keyBy(new Tuple3KeySelector())
                                
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
                                .aggregate(new DummyAggregationFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
Integer> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, Integer>) window1.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
Integer> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                               (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -661,30 +670,32 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
                DummyReducer reducer = new DummyReducer();
 
-               DataStream<Tuple3<String, String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .aggregate(new DummyAggregationFunction(), new 
TestWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                       (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -692,28 +703,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple3<String, String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window = source
+                               .keyBy(new Tuple3KeySelector())
                                
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                                .aggregate(new DummyAggregationFunction(), new 
TestWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                       (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -721,30 +734,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DummyReducer reducer = new DummyReducer();
-
-               DataStream<Tuple3<String, String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .aggregate(new DummyAggregationFunction(), new 
TestProcessWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                       (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -752,28 +765,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple3<String, String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window = source
+                               .keyBy(new Tuple3KeySelector())
                                
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                                .aggregate(new DummyAggregationFunction(), new 
TestProcessWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                       (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        // 
------------------------------------------------------------------------
@@ -1405,29 +1420,31 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<Integer> window1 = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .evictor(CountEvictor.of(100))
                                .aggregate(new DummyAggregationFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
Integer> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, Integer>) window1.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
Integer> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                               (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
                processElementAndEnsureOutput(
-                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -1435,42 +1452,33 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window1 = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .evictor(CountEvictor.of(100))
                                .aggregate(
                                                new DummyAggregationFunction(),
-                                               new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow>() {
-                                                       @Override
-                                                       public void process(
-                                                                       String 
s,
-                                                                       Context 
context,
-                                                                       
Iterable<Tuple2<String, Integer>> elements,
-                                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
-                                                               for 
(Tuple2<String, Integer> in : elements) {
-                                                                       
out.collect(in);
-                                                               }
-                                                       }
-                                               });
+                                               new 
TestProcessWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window1.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                               (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
                processElementAndEnsureOutput(
-                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
 
@@ -1683,7 +1691,7 @@ public class WindowTranslationTest {
        }
 
        private static class DummyAggregationFunction
-                       implements AggregateFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple2<String, Integer>> {
+                       implements AggregateFunction<Tuple3<String, String, 
Integer>, Tuple2<String, Integer>, Integer> {
 
                @Override
                public Tuple2<String, Integer> createAccumulator() {
@@ -1691,14 +1699,14 @@ public class WindowTranslationTest {
                }
 
                @Override
-               public void add(Tuple2<String, Integer> value, Tuple2<String, 
Integer> accumulator) {
+               public void add(Tuple3<String, String, Integer> value, 
Tuple2<String, Integer> accumulator) {
                        accumulator.f0 = value.f0;
-                       accumulator.f1 = value.f1;
+                       accumulator.f1 = value.f2;
                }
 
                @Override
-               public Tuple2<String, Integer> getResult(Tuple2<String, 
Integer> accumulator) {
-                       return accumulator;
+               public Integer getResult(Tuple2<String, Integer> accumulator) {
+                       return accumulator.f1;
                }
 
                @Override
@@ -1729,31 +1737,31 @@ public class WindowTranslationTest {
        }
 
        private static class TestWindowFunction
-                       implements WindowFunction<Tuple2<String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow> {
+                       implements WindowFunction<Integer, String, String, 
TimeWindow> {
 
                @Override
                public void apply(String key,
                                TimeWindow window,
-                               Iterable<Tuple2<String, Integer>> values,
-                               Collector<Tuple3<String, String, Integer>> out) 
throws Exception {
+                               Iterable<Integer> values,
+                               Collector<String> out) throws Exception {
 
-                       for (Tuple2<String, Integer> in : values) {
-                               out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+                       for (Integer in : values) {
+                               out.collect(in.toString());
                        }
                }
        }
 
        private static class TestProcessWindowFunction
-                       extends ProcessWindowFunction<Tuple2<String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow> {
+                       extends ProcessWindowFunction<Integer, String, String, 
TimeWindow> {
 
                @Override
                public void process(String key,
                                Context ctx,
-                               Iterable<Tuple2<String, Integer>> values,
-                               Collector<Tuple3<String, String, Integer>> out) 
throws Exception {
+                               Iterable<Integer> values,
+                               Collector<String> out) throws Exception {
 
-                       for (Tuple2<String, Integer> in : values) {
-                               out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+                       for (Integer in : values) {
+                               out.collect(in.toString());
                        }
                }
        }
@@ -1766,4 +1774,12 @@ public class WindowTranslationTest {
                        return value.f0;
                }
        }
+
+       private static class Tuple3KeySelector implements 
KeySelector<Tuple3<String, String, Integer>, String> {
+
+               @Override
+               public String getKey(Tuple3<String, String, Integer> value) 
throws Exception {
+                       return value.f0;
+               }
+       }
 }

Reply via email to