Repository: flink
Updated Branches:
  refs/heads/master 1cc1bb41e -> bcaf816dc


http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index ced27b6..8748ed4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -260,23 +260,28 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Integer> source = env.fromElements(1, 2);
 
-               DataStream<Tuple3<String, String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window1 = source
+                               .keyBy(new KeySelector<Integer, String>() {
+                                       @Override
+                                       public String getKey(Integer value) 
throws Exception {
+                                               return value.toString();
+                                       }
+                               })
                                
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                                .evictor(CountEvictor.of(5))
                                .process(new TestProcessWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple3<String, String, Integer>>) window1.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputTransformation<Integer, String> transform = 
(OneInputTransformation<Integer, String>) window1.getTransformation();
+               final OneInputStreamOperator<Integer, String> operator = 
transform.getOperator();
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) 
operator;
+               WindowOperator<String, Integer, ?, ?, ?> winOperator = 
(WindowOperator<String, Integer, ?, ?, ?>) operator;
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
EventTimeSessionWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
-               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
+               processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO,  1);
        }
 
        // 
------------------------------------------------------------------------
@@ -604,28 +609,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<Integer> window1 = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .aggregate(new DummyAggregationFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
Integer> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, Integer>) window1.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
Integer> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                               (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -633,28 +640,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<Integer> window1 = source
+                               .keyBy(new Tuple3KeySelector())
                                
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
                                .aggregate(new DummyAggregationFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
Integer> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, Integer>) window1.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
Integer> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                               (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -662,30 +671,32 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
                DummyReducer reducer = new DummyReducer();
 
-               DataStream<Tuple3<String, String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .aggregate(new DummyAggregationFunction(), new 
TestWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                       (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -693,28 +704,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple3<String, String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window = source
+                               .keyBy(new Tuple3KeySelector())
                                
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                                .aggregate(new DummyAggregationFunction(), new 
TestWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                       (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -722,30 +735,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DummyReducer reducer = new DummyReducer();
-
-               DataStream<Tuple3<String, String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(TumblingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .aggregate(new DummyAggregationFunction(), new 
TestProcessWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                       (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -753,28 +766,30 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple3<String, String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window = source
+                               .keyBy(new Tuple3KeySelector())
                                
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                                .aggregate(new DummyAggregationFunction(), new 
TestProcessWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, 
String, Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                       (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
ProcessingTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
AggregatingStateDescriptor);
 
                processElementAndEnsureOutput(
-                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               operator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        // 
------------------------------------------------------------------------
@@ -1406,29 +1421,31 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<Integer> window1 = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .evictor(CountEvictor.of(100))
                                .aggregate(new DummyAggregationFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
Integer> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, Integer>) window1.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
Integer> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                               (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
                processElementAndEnsureOutput(
-                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -1436,42 +1453,33 @@ public class WindowTranslationTest {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               DataStream<Tuple3<String, String, Integer>> source = 
env.fromElements(
+                       Tuple3.of("hello", "hallo", 1),
+                       Tuple3.of("hello", "hallo", 2));
 
-               DataStream<Tuple2<String, Integer>> window1 = source
-                               .keyBy(new TupleKeySelector())
+               DataStream<String> window1 = source
+                               .keyBy(new Tuple3KeySelector())
                                .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .evictor(CountEvictor.of(100))
                                .aggregate(
                                                new DummyAggregationFunction(),
-                                               new 
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow>() {
-                                                       @Override
-                                                       public void process(
-                                                                       String 
s,
-                                                                       Context 
context,
-                                                                       
Iterable<Tuple2<String, Integer>> elements,
-                                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
-                                                               for 
(Tuple2<String, Integer> in : elements) {
-                                                                       
out.collect(in);
-                                                               }
-                                                       }
-                                               });
+                                               new 
TestProcessWindowFunction());
 
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+               final OneInputTransformation<Tuple3<String, String, Integer>, 
String> transform =
+                       (OneInputTransformation<Tuple3<String, String, 
Integer>, String>) window1.getTransformation();
 
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               final OneInputStreamOperator<Tuple3<String, String, Integer>, 
String> operator = transform.getOperator();
 
                Assert.assertTrue(operator instanceof WindowOperator);
-               WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> 
winOperator =
-                               (WindowOperator<String, Tuple2<String, 
Integer>, ?, ?, ?>) operator;
+               WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, 
?> winOperator =
+                               (WindowOperator<String, Tuple3<String, String, 
Integer>, ?, ?, ?>) operator;
 
                Assert.assertTrue(winOperator.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator.getWindowAssigner() instanceof 
SlidingEventTimeWindows);
                Assert.assertTrue(winOperator.getStateDescriptor() instanceof 
ListStateDescriptor);
 
                processElementAndEnsureOutput(
-                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+                               winOperator, winOperator.getKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
        }
 
        @Test
@@ -1683,7 +1691,7 @@ public class WindowTranslationTest {
        }
 
        private static class DummyAggregationFunction
-                       implements AggregateFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple2<String, Integer>> {
+                       implements AggregateFunction<Tuple3<String, String, 
Integer>, Tuple2<String, Integer>, Integer> {
 
                @Override
                public Tuple2<String, Integer> createAccumulator() {
@@ -1691,14 +1699,14 @@ public class WindowTranslationTest {
                }
 
                @Override
-               public void add(Tuple2<String, Integer> value, Tuple2<String, 
Integer> accumulator) {
+               public void add(Tuple3<String, String, Integer> value, 
Tuple2<String, Integer> accumulator) {
                        accumulator.f0 = value.f0;
-                       accumulator.f1 = value.f1;
+                       accumulator.f1 = value.f2;
                }
 
                @Override
-               public Tuple2<String, Integer> getResult(Tuple2<String, 
Integer> accumulator) {
-                       return accumulator;
+               public Integer getResult(Tuple2<String, Integer> accumulator) {
+                       return accumulator.f1;
                }
 
                @Override
@@ -1729,31 +1737,31 @@ public class WindowTranslationTest {
        }
 
        private static class TestWindowFunction
-                       implements WindowFunction<Tuple2<String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow> {
+                       implements WindowFunction<Integer, String, String, 
TimeWindow> {
 
                @Override
                public void apply(String key,
                                TimeWindow window,
-                               Iterable<Tuple2<String, Integer>> values,
-                               Collector<Tuple3<String, String, Integer>> out) 
throws Exception {
+                               Iterable<Integer> values,
+                               Collector<String> out) throws Exception {
 
-                       for (Tuple2<String, Integer> in : values) {
-                               out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+                       for (Integer in : values) {
+                               out.collect(in.toString());
                        }
                }
        }
 
        private static class TestProcessWindowFunction
-                       extends ProcessWindowFunction<Tuple2<String, Integer>, 
Tuple3<String, String, Integer>, String, TimeWindow> {
+                       extends ProcessWindowFunction<Integer, String, String, 
TimeWindow> {
 
                @Override
                public void process(String key,
                                Context ctx,
-                               Iterable<Tuple2<String, Integer>> values,
-                               Collector<Tuple3<String, String, Integer>> out) 
throws Exception {
+                               Iterable<Integer> values,
+                               Collector<String> out) throws Exception {
 
-                       for (Tuple2<String, Integer> in : values) {
-                               out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+                       for (Integer in : values) {
+                               out.collect(in.toString());
                        }
                }
        }
@@ -1765,4 +1773,12 @@ public class WindowTranslationTest {
                        return value.f0;
                }
        }
+
+       private static class Tuple3KeySelector implements 
KeySelector<Tuple3<String, String, Integer>, String> {
+
+               @Override
+               public String getKey(Tuple3<String, String, Integer> value) 
throws Exception {
+                       return value.f0;
+               }
+       }
 }

Reply via email to