Repository: flink Updated Branches: refs/heads/master 1f9c0cf85 -> 698e53e47
[FLINK-3869] Replace WindowedStream.apply() by reduce()/fold() Before, there were these overloads for apply(): - apply(ReduceFunction, WindowFunction) - apply(T initial, FoldFunction, WindowFunction) These are now called reduce() and fold(). We keep the old methods and deprecate them for compatibility. This also fixes a problem with apply(T initial, FoldFunction, WindowFunction) being too restrictive. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/698e53e4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/698e53e4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/698e53e4 Branch: refs/heads/master Commit: 698e53e473fa0f78ed1e23716a50adc914563754 Parents: 1f9c0cf Author: Yassine Marzougui <[email protected]> Authored: Sun Nov 20 00:19:10 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Nov 22 17:41:07 2016 +0100 ---------------------------------------------------------------------- docs/dev/windows.md | 12 +- .../api/datastream/AllWindowedStream.java | 226 ++++++++++++++++++- .../api/datastream/WindowedStream.java | 219 +++++++++++++++++- .../windowing/FoldApplyAllWindowFunction.java | 25 +- .../windowing/FoldApplyWindowFunction.java | 22 +- .../operators/FoldApplyWindowFunctionTest.java | 5 +- .../streaming/api/scala/AllWindowedStream.scala | 135 ++++++++++- .../streaming/api/scala/WindowedStream.scala | 127 +++++++++++ .../OnWindowedStream.scala | 13 +- .../api/scala/AllWindowTranslationTest.scala | 4 +- .../streaming/api/scala/WindowFoldITCase.scala | 4 +- .../api/scala/WindowReduceITCase.scala | 4 +- .../api/scala/WindowTranslationTest.scala | 4 +- .../EventTimeAllWindowCheckpointingITCase.java | 6 +- .../EventTimeWindowCheckpointingITCase.java | 4 +- 15 files changed, 748 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/docs/dev/windows.md 
---------------------------------------------------------------------- diff --git a/docs/dev/windows.md b/docs/dev/windows.md index 4bce07b..d6189d4 100644 --- a/docs/dev/windows.md +++ b/docs/dev/windows.md @@ -498,10 +498,6 @@ The following example shows how an incremental `FoldFunction` can be combined wi a `WindowFunction` to extract the number of events in the window and return also the key and end time of the window. -Please note that the use of a `FoldFunction` in combination with `WindowFunction` is -restricted in that the types of the `Iterable` and `Collector` arguments in -`WindowFunction` must both correspond to the type of the accumulator in the `FoldFunction`. - <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} @@ -510,7 +506,7 @@ DataStream<SensorReading> input = ...; input .keyBy(<key selector>) .timeWindow(<window assigner>) - .apply(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction()) + .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction()) // Function definitions @@ -546,7 +542,7 @@ val input: DataStream[SensorReading] = ... input .keyBy(<key selector>) .timeWindow(<window assigner>) - .apply ( + .fold ( ("", 0L, 0), (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) }, ( key: String, @@ -577,7 +573,7 @@ DataStream<SensorReading> input = ...; input .keyBy(<key selector>) .timeWindow(<window assigner>) - .apply(new MyReduceFunction(), new MyWindowFunction()); + .reduce(new MyReduceFunction(), new MyWindowFunction()); // Function definitions @@ -610,7 +606,7 @@ val input: DataStream[SensorReading] = ... 
input .keyBy(<key selector>) .timeWindow(<window assigner>) - .apply( + .reduce( (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 }, ( key: String, window: TimeWindow, http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index e77b5c8..ae71ce5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -175,7 +175,7 @@ public class AllWindowedStream<T, W extends Window> { public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) { if (function instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " + - "Please use apply(ReduceFunction, WindowFunction) instead."); + "Please use reduce(ReduceFunction, WindowFunction) instead."); } //clean the closure @@ -184,7 +184,100 @@ public class AllWindowedStream<T, W extends Window> { String callLocation = Utils.getCallLocationName(); String udfName = "WindowedStream." + callLocation; - return apply(function, new PassThroughAllWindowFunction<W, T>()); + return reduce(function, new PassThroughAllWindowFunction<W, T>()); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given reducer. 
+ * + * @param reduceFunction The reduce function that is used for incremental aggregation. + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + + public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) { + TypeInformation<T> inType = input.getType(); + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, AllWindowFunction.class, true, true, inType, null, false); + + return reduce(reduceFunction, function, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given reducer. + * + * @param reduceFunction The reduce function that is used for incremental aggregation. + * @param function The window function. + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. + */ + public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction."); + } + + //clean the closures + function = input.getExecutionEnvironment().clean(function); + reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "WindowedStream." 
+ callLocation; + + String opName; + KeySelector<T, Byte> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = + new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)), + trigger, + evictor, + allowedLateness); + + } else { + ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", + reduceFunction, + input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = + new WindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueAllWindowFunction<>(function), + trigger, + allowedLateness); + } + + return input.transform(opName, resultType, operator).forceNonParallel(); } /** @@ -197,8 +290,8 @@ public class AllWindowedStream<T, W extends Window> { */ public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) { if (function instanceof RichFunction) { - throw new 
UnsupportedOperationException("FoldFunction can not be a RichFunction. " + - "Please use apply(FoldFunction, WindowFunction) instead."); + throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " + + "Please use fold(FoldFunction, WindowFunction) instead."); } TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(), @@ -217,11 +310,115 @@ public class AllWindowedStream<T, W extends Window> { */ public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) { if (function instanceof RichFunction) { - throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + - "Please use apply(FoldFunction, WindowFunction) instead."); + throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " + + "Please use fold(FoldFunction, WindowFunction) instead."); + } + + return fold(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) { + + TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), + Utils.getCallLocationName(), true); + + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, AllWindowFunction.class, true, true, foldAccumulatorType, null, false); + + return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. + * @param foldAccumulatorType Type information for the result type of the fold function + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. 
+ */ + public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, + FoldFunction<T, ACC> foldFunction, + AllWindowFunction<ACC, R, W> function, + TypeInformation<ACC> foldAccumulatorType, + TypeInformation<R> resultType) { + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction."); + } + if (windowAssigner instanceof MergingWindowAssigner) { + throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner."); + } + + //clean the closures + function = input.getExecutionEnvironment().clean(function); + foldFunction = input.getExecutionEnvironment().clean(foldFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "WindowedStream." + callLocation; + + String opName; + KeySelector<T, Byte> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = + new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)), + trigger, + evictor, + allowedLateness); + + } else { + FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents", + initialValue, foldFunction, 
foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = + new WindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueAllWindowFunction<>(function), + trigger, + allowedLateness); } - return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType); + return input.transform(opName, resultType, operator).forceNonParallel(); } /** @@ -321,8 +518,10 @@ public class AllWindowedStream<T, W extends Window> { * @param reduceFunction The reduce function that is used for incremental aggregation. * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction)} instead. */ - + @Deprecated public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) { TypeInformation<T> inType = input.getType(); TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( @@ -343,7 +542,10 @@ public class AllWindowedStream<T, W extends Window> { * @param function The window function. * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction, TypeInformation)} instead. 
*/ + @Deprecated public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { if (reduceFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction."); @@ -415,7 +617,10 @@ public class AllWindowedStream<T, W extends Window> { * @param foldFunction The fold function that is used for incremental aggregation. * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated Use {@link #fold(R, FoldFunction, AllWindowFunction)} instead. */ + @Deprecated public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) { TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), @@ -437,7 +642,10 @@ public class AllWindowedStream<T, W extends Window> { * @param function The window function. * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated Use {@link #fold(R, FoldFunction, AllWindowFunction, TypeInformation, TypeInformation)} instead. 
*/ + @Deprecated public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) { if (foldFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction."); @@ -474,7 +682,7 @@ public class AllWindowedStream<T, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function)), + new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, resultType)), trigger, evictor, allowedLateness); http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 15ec5f1..ad7f371 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -185,7 +185,7 @@ public class WindowedStream<T, K, W extends Window> { public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) { if (function instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. 
" + - "Please use apply(ReduceFunction, WindowFunction) instead."); + "Please use reduce(ReduceFunction, WindowFunction) instead."); } //clean the closure @@ -199,7 +199,99 @@ public class WindowedStream<T, K, W extends Window> { return result; } - return apply(function, new PassThroughWindowFunction<K, W, T>()); + return reduce(function, new PassThroughWindowFunction<K, W, T>()); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given reducer. + * + * @param reduceFunction The reduce function that is used for incremental aggregation. + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) { + TypeInformation<T> inType = input.getType(); + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, WindowFunction.class, true, true, inType, null, false); + + return reduce(reduceFunction, function, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given reducer. + * + * @param reduceFunction The reduce function that is used for incremental aggregation. + * @param function The window function. + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. 
+ */ + public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction."); + } + + //clean the closures + function = input.getExecutionEnvironment().clean(function); + reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "WindowedStream." + callLocation; + + String opName; + KeySelector<T, K> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = + new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)), + trigger, + evictor, + allowedLateness); + + } else { + ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", + reduceFunction, + input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = + new WindowOperator<>(windowAssigner, + 
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(function), + trigger, + allowedLateness); + } + + return input.transform(opName, resultType, operator); } /** @@ -213,7 +305,7 @@ public class WindowedStream<T, K, W extends Window> { public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) { if (function instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + - "Please use apply(FoldFunction, WindowFunction) instead."); + "Please use fold(FoldFunction, WindowFunction) instead."); } TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(), @@ -233,10 +325,112 @@ public class WindowedStream<T, K, W extends Window> { public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) { if (function instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + - "Please use apply(FoldFunction, WindowFunction) instead."); + "Please use fold(FoldFunction, WindowFunction) instead."); + } + + return fold(initialValue, function, new PassThroughWindowFunction<K, W, R>(), resultType, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. 
+ * @return The data stream that is the result of applying the window function to the window. + */ + public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function) { + + TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), + Utils.getCallLocationName(), true); + + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, WindowFunction.class, true, true, getInputType(), null, false); + + return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. + * @param foldAccumulatorType Type information for the result type of the fold function + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. 
+ */ + public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, + FoldFunction<T, ACC> foldFunction, + WindowFunction<ACC, R, K, W> function, + TypeInformation<ACC> foldAccumulatorType, + TypeInformation<R> resultType) { + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction."); + } + if (windowAssigner instanceof MergingWindowAssigner) { + throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner."); + } + + //clean the closures + function = input.getExecutionEnvironment().clean(function); + foldFunction = input.getExecutionEnvironment().clean(foldFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "WindowedStream." + callLocation; + + String opName; + KeySelector<T, K> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer<StreamRecord<T>> streamRecordSerializer = + (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor<StreamRecord<T>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)), + trigger, + evictor, + allowedLateness); + + } else { + FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents", + initialValue, foldFunction, 
foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig())); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = new WindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(function), + trigger, + allowedLateness); } - return apply(initialValue, function, new PassThroughWindowFunction<K, W, R>(), resultType); + return input.transform(opName, resultType, operator); } /** @@ -342,8 +536,10 @@ public class WindowedStream<T, K, W extends Window> { * @param reduceFunction The reduce function that is used for incremental aggregation. * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated Use {@link #reduce(ReduceFunction, WindowFunction)} instead. */ - + @Deprecated public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) { TypeInformation<T> inType = input.getType(); TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( @@ -364,7 +560,10 @@ public class WindowedStream<T, K, W extends Window> { * @param function The window function. * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated Use {@link #reduce(ReduceFunction, WindowFunction, TypeInformation)} instead. 
*/ + @Deprecated public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { if (reduceFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction."); @@ -436,7 +635,10 @@ public class WindowedStream<T, K, W extends Window> { * @param foldFunction The fold function that is used for incremental aggregation. * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated Use {@link #fold(R, FoldFunction, WindowFunction)} instead. */ + @Deprecated public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function) { TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), @@ -458,7 +660,10 @@ public class WindowedStream<T, K, W extends Window> { * @param function The window function. * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. + * + * @deprecated Use {@link #fold(R, FoldFunction, WindowFunction, TypeInformation, TypeInformation)} instead. 
*/ + @Deprecated public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType) { if (foldFunction instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction."); @@ -494,7 +699,7 @@ public class WindowedStream<T, K, W extends Window> { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)), + new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, resultType)), trigger, evictor, allowedLateness); http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java index a5bc0a1..0efffb9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java @@ -36,20 +36,25 @@ import java.io.IOException; import java.util.Collections; @Internal -public class FoldApplyAllWindowFunction<W extends Window, T, ACC> - extends WrappingFunction<AllWindowFunction<ACC, ACC, W>> - implements AllWindowFunction<T, ACC, W>, OutputTypeConfigurable<ACC> { +public class FoldApplyAllWindowFunction<W extends Window, T, ACC, R> + extends WrappingFunction<AllWindowFunction<ACC, R, W>> + implements AllWindowFunction<T, R, W>, 
OutputTypeConfigurable<R> { private static final long serialVersionUID = 1L; private final FoldFunction<T, ACC> foldFunction; private byte[] serializedInitialValue; + private transient TypeInformation<ACC> accTypeInformation; private TypeSerializer<ACC> accSerializer; private transient ACC initialValue; - public FoldApplyAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, ACC, W> windowFunction) { + public FoldApplyAllWindowFunction(ACC initialValue, + FoldFunction<T, ACC> foldFunction, + AllWindowFunction<ACC, R, W> windowFunction, + TypeInformation<ACC> accTypeInformation) { super(windowFunction); + this.accTypeInformation = accTypeInformation; this.foldFunction = foldFunction; this.initialValue = initialValue; } @@ -58,6 +63,11 @@ public class FoldApplyAllWindowFunction<W extends Window, T, ACC> public void open(Configuration configuration) throws Exception { super.open(configuration); + if (accSerializer == null) { + throw new RuntimeException("No serializer set for the fold accumulator type. " + + "Probably the setOutputType method was not called."); + } + if (serializedInitialValue == null) { throw new RuntimeException("No initial value was serialized for the fold " + "window function. 
Probably the setOutputType method was not called."); @@ -69,7 +79,7 @@ public class FoldApplyAllWindowFunction<W extends Window, T, ACC> } @Override - public void apply(W window, Iterable<T> values, Collector<ACC> out) throws Exception { + public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception { ACC result = accSerializer.copy(initialValue); for (T val: values) { @@ -80,8 +90,9 @@ public class FoldApplyAllWindowFunction<W extends Window, T, ACC> } @Override - public void setOutputType(TypeInformation<ACC> outTypeInfo, ExecutionConfig executionConfig) { - accSerializer = outTypeInfo.createSerializer(executionConfig); + public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) { + // out type is not used, just use this for the execution config + accSerializer = accTypeInformation.createSerializer(executionConfig); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java index 756a683..9e916f1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java @@ -36,20 +36,22 @@ import java.io.IOException; import java.util.Collections; @Internal -public class FoldApplyWindowFunction<K, W extends Window, T, ACC> - extends WrappingFunction<WindowFunction<ACC, ACC, K, W>> - 
implements WindowFunction<T, ACC, K, W>, OutputTypeConfigurable<ACC> { +public class FoldApplyWindowFunction<K, W extends Window, T, ACC, R> + extends WrappingFunction<WindowFunction<ACC, R, K, W>> + implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> { private static final long serialVersionUID = 1L; private final FoldFunction<T, ACC> foldFunction; private byte[] serializedInitialValue; + private transient TypeInformation<ACC> accTypeInformation; private TypeSerializer<ACC> accSerializer; private transient ACC initialValue; - public FoldApplyWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, ACC, K, W> windowFunction) { + public FoldApplyWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) { super(windowFunction); + this.accTypeInformation = accTypeInformation; this.foldFunction = foldFunction; this.initialValue = initialValue; } @@ -58,6 +60,11 @@ public class FoldApplyWindowFunction<K, W extends Window, T, ACC> public void open(Configuration configuration) throws Exception { super.open(configuration); + if (accSerializer == null) { + throw new RuntimeException("No serializer set for the fold accumulator type. " + + "Probably the setOutputType method was not called."); + } + if (serializedInitialValue == null) { throw new RuntimeException("No initial value was serialized for the fold " + "window function. 
Probably the setOutputType method was not called."); @@ -69,7 +76,7 @@ public class FoldApplyWindowFunction<K, W extends Window, T, ACC> } @Override - public void apply(K key, W window, Iterable<T> values, Collector<ACC> out) throws Exception { + public void apply(K key, W window, Iterable<T> values, Collector<R> out) throws Exception { ACC result = accSerializer.copy(initialValue); for (T val: values) { @@ -80,8 +87,9 @@ public class FoldApplyWindowFunction<K, W extends Window, T, ACC> } @Override - public void setOutputType(TypeInformation<ACC> outTypeInfo, ExecutionConfig executionConfig) { - accSerializer = outTypeInfo.createSerializer(executionConfig); + public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) { + // out type is not used, just use this for the execution config + accSerializer = accTypeInformation.createSerializer(executionConfig); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index 0b0ab9e..91ec427 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -56,7 +56,7 @@ public class FoldApplyWindowFunctionTest { int initValue = 1; - FoldApplyWindowFunction<Integer, TimeWindow, Integer, Integer> foldWindowFunction = new FoldApplyWindowFunction<>( + FoldApplyWindowFunction<Integer, 
TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyWindowFunction<>( initValue, new FoldFunction<Integer, Integer>() { private static final long serialVersionUID = -4849549768529720587L; @@ -76,7 +76,8 @@ public class FoldApplyWindowFunctionTest { out.collect(in); } } - } + }, + BasicTypeInfo.INT_TYPE_INFO ); AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 47c13c9..83104e8 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -140,6 +140,62 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { } /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + def reduce[R: TypeInformation]( + preAggregator: ReduceFunction[T], + windowFunction: AllWindowFunction[T, R, W]): DataStream[R] = { + + val cleanedReducer = clean(preAggregator) + val cleanedWindowFunction = clean(windowFunction) + + val applyFunction = new ScalaAllWindowFunctionWrapper[T, R, W](cleanedWindowFunction) + + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.reduce(cleanedReducer, applyFunction, returnType)) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def reduce[R: TypeInformation]( + preAggregator: (T, T) => T, + windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + + if (preAggregator == null) { + throw new NullPointerException("Reduce function must not be null.") + } + if (windowFunction == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanReducer = clean(preAggregator) + val cleanWindowFunction = clean(windowFunction) + + val reducer = new ScalaReduceFunction[T](cleanReducer) + val applyFunction = new ScalaAllWindowFunction[T, R, W](cleanWindowFunction) + + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.reduce(reducer, applyFunction, returnType)) + } + + /** * Applies the given fold function to each window. The window function is called for each * evaluation of the window for each key individually. 
The output of the reduce function is * interpreted as a regular non-windowed stream. @@ -178,6 +234,71 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { } /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation folder. + * + * @param initialValue Initial value of the fold + * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def fold[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + preAggregator: FoldFunction[T, ACC], + windowFunction: AllWindowFunction[ACC, R, W]): DataStream[R] = { + + val cleanFolder = clean(preAggregator) + val cleanWindowFunction = clean(windowFunction) + + val applyFunction = new ScalaAllWindowFunctionWrapper[ACC, R, W](cleanWindowFunction) + + asScalaStream(javaStream.fold( + initialValue, + cleanFolder, + applyFunction, + implicitly[TypeInformation[ACC]], + implicitly[TypeInformation[R]])) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation folder. + * + * @param initialValue Initial value of the fold + * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + def fold[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + preAggregator: (ACC, T) => ACC, + windowFunction: (W, Iterable[ACC], Collector[R]) => Unit): DataStream[R] = { + + if (preAggregator == null) { + throw new NullPointerException("Reduce function must not be null.") + } + if (windowFunction == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanFolder = clean(preAggregator) + val cleanWindowFunction = clean(windowFunction) + + val folder = new ScalaFoldFunction[T, ACC](cleanFolder) + val applyFunction = new ScalaAllWindowFunction[ACC, R, W](cleanWindowFunction) + + val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, returnType)) + } + + /** * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. The output of the window function is * interpreted as a regular non-windowed stream. @@ -227,7 +348,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param preAggregator The reduce function that is used for pre-aggregation * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. + * @deprecated Use [[reduce(ReduceFunction, AllWindowFunction)]] instead. */ + @deprecated def apply[R: TypeInformation]( preAggregator: ReduceFunction[T], windowFunction: AllWindowFunction[T, R, W]): DataStream[R] = { @@ -251,7 +374,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param preAggregator The reduce function that is used for pre-aggregation * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. 
+ * @deprecated Use [[reduce(ReduceFunction, AllWindowFunction)]] instead. */ + @deprecated def apply[R: TypeInformation]( preAggregator: (T, T) => T, windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { @@ -284,7 +409,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param preAggregator The reduce function that is used for pre-aggregation * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. + * @deprecated Use [[fold(R, FoldFunction, AllWindowFunction)]] instead. */ + @deprecated def apply[R: TypeInformation]( initialValue: R, preAggregator: FoldFunction[T, R], @@ -313,12 +440,14 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param preAggregator The reduce function that is used for pre-aggregation * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. + * @deprecated Use [[fold(R, FoldFunction, AllWindowFunction)]] instead. 
*/ + @deprecated def apply[R: TypeInformation]( initialValue: R, preAggregator: (R, T) => R, windowFunction: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { - + if (preAggregator == null) { throw new NullPointerException("Reduce function must not be null.") } @@ -328,10 +457,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { val cleanFolder = clean(preAggregator) val cleanWindowFunction = clean(windowFunction) - + val folder = new ScalaFoldFunction[T, R](cleanFolder) val applyFunction = new ScalaAllWindowFunction[R, R, W](cleanWindowFunction) - + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType)) } http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 6a10ff6..76d9cda 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -142,6 +142,61 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param function The window function. 
+ * @return The data stream that is the result of applying the window function to the window. + */ + def reduce[R: TypeInformation]( + preAggregator: ReduceFunction[T], + function: WindowFunction[T, R, K, W]): DataStream[R] = { + + val cleanedPreAggregator = clean(preAggregator) + val cleanedWindowFunction = clean(function) + + val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction) + + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.reduce(cleanedPreAggregator, applyFunction, resultType)) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def reduce[R: TypeInformation]( + preAggregator: (T, T) => T, + windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + + if (preAggregator == null) { + throw new NullPointerException("Reduce function must not be null.") + } + if (windowFunction == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanReducer = clean(preAggregator) + val cleanWindowFunction = clean(windowFunction) + + val reducer = new ScalaReduceFunction[T](cleanReducer) + val applyFunction = new ScalaWindowFunction[T, R, K, W](cleanWindowFunction) + + asScalaStream(javaStream.reduce(reducer, applyFunction, implicitly[TypeInformation[R]])) + } + + /** * Applies the given fold function to each window. The window function is called for each * evaluation of the window for each key individually. 
The output of the reduce function is * interpreted as a regular non-windowed stream. @@ -179,6 +234,70 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold + * @param foldFunction The fold function that is used for incremental aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def fold[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + foldFunction: FoldFunction[T, ACC], + function: WindowFunction[ACC, R, K, W]): DataStream[R] = { + + val cleanedFunction = clean(function) + val cleanedFoldFunction = clean(foldFunction) + + val applyFunction = new ScalaWindowFunctionWrapper[ACC, R, K, W](cleanedFunction) + + asScalaStream(javaStream.fold( + initialValue, + cleanedFoldFunction, + applyFunction, + implicitly[TypeInformation[ACC]], + implicitly[TypeInformation[R]])) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is incrementally aggregated using the given fold function. + * + * @param foldFunction The fold function that is used for incremental aggregation + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + def fold[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + foldFunction: (ACC, T) => ACC, + windowFunction: (K, W, Iterable[ACC], Collector[R]) => Unit): DataStream[R] = { + + if (foldFunction == null) { + throw new NullPointerException("Fold function must not be null.") + } + if (windowFunction == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanFolder = clean(foldFunction) + val cleanWindowFunction = clean(windowFunction) + + val folder = new ScalaFoldFunction[T, ACC](cleanFolder) + val applyFunction = new ScalaWindowFunction[ACC, R, K, W](cleanWindowFunction) + + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] + val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] + asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, resultType)) + } + + /** * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. The output of the window function is * interpreted as a regular non-windowed stream. @@ -230,7 +349,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param preAggregator The reduce function that is used for pre-aggregation * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * @deprecated Use [[reduce(ReduceFunction, WindowFunction)]] instead. */ + @deprecated def apply[R: TypeInformation]( preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R] = { @@ -254,7 +375,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param preAggregator The reduce function that is used for pre-aggregation * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. 
+ * @deprecated Use [[reduce(ReduceFunction, WindowFunction)]] instead. */ + @deprecated def apply[R: TypeInformation]( preAggregator: (T, T) => T, windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { @@ -286,7 +409,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param foldFunction The fold function that is used for incremental aggregation * @param function The window function. * @return The data stream that is the result of applying the window function to the window. + * @deprecated Use [[fold(R, FoldFunction, WindowFunction)]] instead. */ + @deprecated def apply[R: TypeInformation]( initialValue: R, foldFunction: FoldFunction[T, R], @@ -314,7 +439,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param foldFunction The fold function that is used for incremental aggregation * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. + * @deprecated Use [[fold(R, FoldFunction, WindowFunction)]] instead. 
*/ + @deprecated def apply[R: TypeInformation]( initialValue: R, foldFunction: (R, T) => R, http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala index f7a5923..27cf3db 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream} import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector /** * Wraps a joined data stream, allowing to use anonymous partial functions to @@ -77,13 +78,13 @@ class OnWindowedStream[T, K, W <: Window](stream: WindowedStream[T, K, W]) { * @return The data stream that is the result of applying the window function to the window. 
*/ @PublicEvolving - def applyWith[R: TypeInformation]( - initialValue: R)( - foldFunction: (R, T) => R, - windowFunction: (K, W, Stream[R]) => TraversableOnce[R]) + def applyWith[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC)( + foldFunction: (ACC, T) => ACC, + windowFunction: (K, W, Stream[ACC]) => TraversableOnce[R]) : DataStream[R] = - stream.apply(initialValue, foldFunction, { - (key, window, items, out) => + stream.fold(initialValue, foldFunction, { + (key: K, window: W, items: Iterable[ACC], out: Collector[R]) => windowFunction(key, window, items.toStream).foreach(out.collect) }) http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index 6d90239..6273e54 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -206,7 +206,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) - .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, @@ -231,7 +231,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .keyBy(0) .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(reducer, new WindowFunction[(String, Int), 
(String, Int), Tuple, TimeWindow]() { + .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala index 6a6a956..83697ce 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala @@ -127,7 +127,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { source1 .keyBy(0) .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) - .apply( + .fold( ("R:", 0), foldFunc, new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]()) @@ -230,7 +230,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { source1 .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) - .apply( + .fold( ("R:", 0), foldFunc, new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]()) http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala index 7c414fa..9666266 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala 
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala @@ -128,7 +128,7 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase { source1 .keyBy(0) .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) - .apply( + .reduce( reduceFunc, new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]()) .addSink(new SinkFunction[(String, Int)]() { @@ -230,7 +230,7 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase { source1 .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) - .apply( + .reduce( reduceFunc, new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]()) .addSink(new SinkFunction[(String, Int)]() { http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 60e61f0..c67c215 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -203,7 +203,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) - .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, @@ -228,7 +228,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .keyBy(0) 
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 4c873a3..5d17608 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -232,7 +232,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply( + .reduce( new ReduceFunction<Tuple2<Long, IntType>>() { @Override @@ -304,7 +304,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new Tuple4<>(0L, 0L, 0L, new IntType(0)), + .fold(new Tuple4<>(0L, 0L, 0L, new IntType(0)), new FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() { @Override public Tuple4<Long, Long, Long, IntType> fold(Tuple4<Long, Long, Long, IntType> accumulator, @@ -377,7 +377,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply( + .reduce( new 
ReduceFunction<Tuple2<Long, IntType>>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 4f28d8c..50079d1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -362,7 +362,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply( + .reduce( new ReduceFunction<Tuple2<Long, IntType>>() { @Override @@ -435,7 +435,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply( + .reduce( new ReduceFunction<Tuple2<Long, IntType>>() { @Override
