Repository: flink Updated Branches: refs/heads/release-1.3 f72eff7fc -> b2d6dc1d4
http://git-wip-us.apache.org/repos/asf/flink/blob/b2d6dc1d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 7913e95..9737d2f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -295,8 +295,7 @@ public class WindowedStream<T, K, W extends Window> { LegacyWindowOperatorType legacyWindowOpType) { TypeInformation<T> inType = input.getType(); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, inType, null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType); return reduce(reduceFunction, function, resultType, legacyWindowOpType); } @@ -396,8 +395,7 @@ public class WindowedStream<T, K, W extends Window> { */ @PublicEvolving public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) { - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessWindowFunction.class, true, true, input.getType(), null, false); + TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, input.getType(), null); return reduce(reduceFunction, function, resultType); } @@ -544,8 +542,7 @@ public class WindowedStream<T, K, W extends Window> { TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, foldAccumulatorType, null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(function, foldAccumulatorType); return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); } @@ -663,8 +660,7 @@ public class WindowedStream<T, K, W extends Window> { TypeInformation<ACC> foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation<R> windowResultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false); + TypeInformation<R> windowResultType = getProcessWindowFunctionReturnType(windowFunction, foldResultType, Utils.getCallLocationName()); return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType); } @@ -852,8 +848,7 @@ public class WindowedStream<T, K, W extends Window> { TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, WindowFunction.class, true, true, aggResultType, null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(windowFunction, aggResultType); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } @@ -981,12 +976,42 @@ public class WindowedStream<T, K, W extends Window> { TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false); + TypeInformation<R> resultType = getProcessWindowFunctionReturnType(windowFunction, aggResultType, null); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } + private static <IN, OUT, KEY> TypeInformation<OUT> getWindowFunctionReturnType( + WindowFunction<IN, OUT, KEY, ?> function, + TypeInformation<IN> inType) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + WindowFunction.class, + 0, + 1, + new int[]{2, 0}, + new int[]{3, 0}, + inType, + null, + false); + } + + private static <IN, OUT, KEY> TypeInformation<OUT> getProcessWindowFunctionReturnType( + ProcessWindowFunction<IN, OUT, KEY, ?> function, + TypeInformation<IN> inType, + String functionName) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + ProcessWindowFunction.class, + 0, + 1, + new int[]{2, 0}, + new int[]{3, 0}, + inType, + functionName, + false); + } + /** * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. The output of the window function is @@ -1094,8 +1119,7 @@ public class WindowedStream<T, K, W extends Window> { * @return The data stream that is the result of applying the window function to the window. */ public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) { - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, getInputType(), null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(function, getInputType()); return apply(function, resultType); } @@ -1131,8 +1155,7 @@ public class WindowedStream<T, K, W extends Window> { */ @PublicEvolving public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) { - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessWindowFunction.class, true, true, getInputType(), null, false); + TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, getInputType(), null); return process(function, resultType); } @@ -1231,8 +1254,7 @@ public class WindowedStream<T, K, W extends Window> { @Deprecated public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) { TypeInformation<T> inType = input.getType(); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, inType, null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType); return apply(reduceFunction, function, resultType); } http://git-wip-us.apache.org/repos/asf/flink/blob/b2d6dc1d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 5071c37..843839d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -259,23 +259,28 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Integer> source = env.fromElements(1, 2); - DataStream<Tuple3<String, String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<String> window1 = source + .keyBy(new KeySelector<Integer, String>() { + @Override + public String getKey(Integer value) throws Exception { + return value.toString(); + } + }) .window(EventTimeSessionWindows.withGap(Time.seconds(5))) .evictor(CountEvictor.of(5)) .process(new TestProcessWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputTransformation<Integer, String> transform = (OneInputTransformation<Integer, String>) window1.getTransformation(); + final OneInputStreamOperator<Integer, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Integer, ?, ?, ?> winOperator = (WindowOperator<String, Integer, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof EventTimeSessionWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); - processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, 1); } // ------------------------------------------------------------------------ @@ -603,28 +608,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<Integer> window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .aggregate(new DummyAggregationFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -632,28 +639,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<Integer> window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .aggregate(new DummyAggregationFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -661,30 +670,32 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); DummyReducer reducer = new DummyReducer(); - DataStream<Tuple3<String, String, Integer>> window = source - .keyBy(new TupleKeySelector()) + DataStream<String> window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -692,28 +703,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple3<String, String, Integer>> window = source - .keyBy(new TupleKeySelector()) + DataStream<String> window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -721,30 +734,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DummyReducer reducer = new DummyReducer(); - - DataStream<Tuple3<String, String, Integer>> window = source - .keyBy(new TupleKeySelector()) + DataStream<String> window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -752,28 +765,30 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple3<String, String, Integer>> window = source - .keyBy(new TupleKeySelector()) + DataStream<String> window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } // ------------------------------------------------------------------------ @@ -1405,29 +1420,31 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<Integer> window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .aggregate(new DummyAggregationFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -1435,42 +1452,33 @@ public class WindowTranslationTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream<Tuple3<String, String, Integer>> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(new TupleKeySelector()) + DataStream<String> window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .aggregate( new DummyAggregationFunction(), - new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { - @Override - public void process( - String s, - Context context, - Iterable<Tuple2<String, Integer>> elements, - Collector<Tuple2<String, Integer>> out) throws Exception { - for (Tuple2<String, Integer> in : elements) { - out.collect(in); - } - } - }); + new TestProcessWindowFunction()); - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + final OneInputTransformation<Tuple3<String, String, Integer>, String> transform = + (OneInputTransformation<Tuple3<String, String, Integer>, String>) window1.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); + final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = - (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator; + WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator = + (WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @@ -1683,7 +1691,7 @@ public class WindowTranslationTest { } private static class DummyAggregationFunction - implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> { + implements AggregateFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, Integer> { @Override public Tuple2<String, Integer> createAccumulator() { @@ -1691,14 +1699,14 @@ public class WindowTranslationTest { } @Override - public void add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) { + public void add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) { accumulator.f0 = value.f0; - accumulator.f1 = value.f1; + accumulator.f1 = value.f2; } @Override - public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) { - return accumulator; + public Integer getResult(Tuple2<String, Integer> accumulator) { + return accumulator.f1; } @Override @@ -1729,31 +1737,31 @@ public class WindowTranslationTest { } private static class TestWindowFunction - implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> { + implements WindowFunction<Integer, String, String, TimeWindow> { @Override public void apply(String key, TimeWindow window, - Iterable<Tuple2<String, Integer>> values, - Collector<Tuple3<String, String, Integer>> out) throws Exception { + Iterable<Integer> values, + Collector<String> out) throws Exception { - for (Tuple2<String, Integer> in : values) { - out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + for (Integer in : values) { + out.collect(in.toString()); } } } private static class TestProcessWindowFunction - extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> { + extends ProcessWindowFunction<Integer, String, String, TimeWindow> { @Override public void process(String key, Context ctx, - Iterable<Tuple2<String, Integer>> values, - Collector<Tuple3<String, String, Integer>> out) throws Exception { + Iterable<Integer> values, + Collector<String> out) throws Exception { - for (Tuple2<String, Integer> in : values) { - out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + for (Integer in : values) { + out.collect(in.toString()); } } } @@ -1766,4 +1774,12 @@ public class WindowTranslationTest { return value.f0; } } + + private static class Tuple3KeySelector implements KeySelector<Tuple3<String, String, Integer>, String> { + + @Override + public String getKey(Tuple3<String, String, Integer> value) throws Exception { + return value.f0; + } + } }
