Repository: flink Updated Branches: refs/heads/master 1cc1bb41e -> bcaf816dc
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index ced27b6..8748ed4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -260,23 +260,28 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Integer> source = env.fromElements(1, 2); - DataStream<Tuple3<String, String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<String> window1 = source + .keyBy(new KeySelector<Integer, String>() { + @Override + public String getKey(Integer value) throws Exception { + return value.toString(); + } + }) .window(EventTimeSessionWindows.withGap(Time.seconds(5))) .evictor(CountEvictor.of(5)) .process(new TestProcessWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputTransformation<Integer, String> transform = (OneInputTransformation<Integer, String>) window1.getTransformation(); + final OneInputStreamOperator<Integer, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Integer, ?, ?, ?> winOperator = (WindowOperator<String, Integer, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof EventTimeSessionWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); - processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, 1); } // ------------------------------------------------------------------------ @@ -604,28 +609,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<Integer> window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .aggregate(new DummyAggregationFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -633,28 +640,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<Integer> window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .aggregate(new DummyAggregationFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -662,30 +671,32 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); DummyReducer reducer = new DummyReducer(); - DataStream<Tuple3<String, String, Integer>> window = source - .keyBy(new TupleKeySelector()) + DataStream<String> window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -693,28 +704,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple3<String, String, Integer>> window = source - .keyBy(new TupleKeySelector()) + DataStream<String> window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -722,30 +735,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DummyReducer reducer = new DummyReducer(); - - DataStream<Tuple3<String, String, Integer>> window = source - .keyBy(new TupleKeySelector()) + DataStream<String> window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -753,28 +766,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple3<String, String, Integer>> window = source - .keyBy(new TupleKeySelector()) + DataStream<String> window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } // ------------------------------------------------------------------------ @@ -1406,29 +1421,31 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<Integer> window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .aggregate(new DummyAggregationFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -1436,42 +1453,33 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<String> window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .aggregate( new DummyAggregationFunction(), - new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { - @Override - public void process( - String s, - Context context, - Iterable<Tuple2<String, Integer>> elements, - Collector<Tuple2<String, Integer>> out) throws Exception { - for (Tuple2<String, Integer> in : elements) { - out.collect(in); - } - } - }); + new TestProcessWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -1683,7 +1691,7 @@ public class WindowTranslationTest { } private static class DummyAggregationFunction - implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> { + implements AggregateFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, Integer> { @Override public Tuple2<String, Integer> createAccumulator() { @@ -1691,14 +1699,14 @@ public class WindowTranslationTest { } @Override - public void add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) { + public void add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) { accumulator.f0 = value.f0; - accumulator.f1 = value.f1; + accumulator.f1 = value.f2; } @Override - public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) { - return accumulator; + public Integer getResult(Tuple2<String, Integer> accumulator) { + return accumulator.f1; } @Override @@ -1729,31 +1737,31 @@ public class WindowTranslationTest { } private static class TestWindowFunction - implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> { + implements WindowFunction<Integer, String, String, TimeWindow> { @Override public void apply(String key, TimeWindow window, - Iterable<Tuple2<String, Integer>> values, - Collector<Tuple3<String, String, Integer>> out) throws Exception { + Iterable<Integer> values, + Collector<String> out) throws Exception { - for (Tuple2<String, Integer> in : values) { - out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + for (Integer in : values) { + out.collect(in.toString()); } } } private static class TestProcessWindowFunction - extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> { + extends ProcessWindowFunction<Integer, String, String, TimeWindow> { @Override public void process(String key, Context ctx, - Iterable<Tuple2<String, Integer>> values, - Collector<Tuple3<String, String, Integer>> out) throws Exception { + Iterable<Integer> values, + Collector<String> out) throws Exception { - for (Tuple2<String, Integer> in : values) { - out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + for (Integer in : values) { + out.collect(in.toString()); } } } @@ -1765,4 +1773,12 @@ public class WindowTranslationTest { return value.f0; } } + + private static class Tuple3KeySelector implements KeySelector<Tuple3<String, String, Integer>, String> { + + @Override + public String getKey(Tuple3<String, String, Integer> value) throws Exception { + return value.f0; + } + } }
