This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b2d7a908ff23da1811f0bdaeada69ec308d814a2 Author: yunfengzhou-hub <[email protected]> AuthorDate: Tue Sep 24 15:48:25 2024 +0800 [FLINK-36355][runtime] Remove deprecated WindowedStream#apply --- .../api/datastream/AllWindowedStream.java | 145 --------------------- .../streaming/api/datastream/WindowedStream.java | 51 -------- .../windowing/AllWindowTranslationTest.java | 2 +- .../operators/windowing/WindowTranslationTest.java | 4 +- 4 files changed, 3 insertions(+), 199 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 0a21a4451db..c1bb29928d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -1116,151 +1116,6 @@ public class AllWindowedStream<T, W extends Window> { return input.transform(opName, resultType, operator).forceNonParallel(); } - /** - * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. The output of the window function is - * interpreted as a regular non-windowed stream. - * - * <p>Arriving data is incrementally aggregated using the given reducer. - * - * @param reduceFunction The reduce function that is used for incremental aggregation. - * @param function The window function. - * @return The data stream that is the result of applying the window function to the window. - * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction)} instead. - */ - @Deprecated - public <R> SingleOutputStreamOperator<R> apply( - ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) { - TypeInformation<T> inType = input.getType(); - TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType); - - return apply(reduceFunction, function, resultType); - } - - /** - * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. The output of the window function is - * interpreted as a regular non-windowed stream. - * - * <p>Arriving data is incrementally aggregated using the given reducer. - * - * @param reduceFunction The reduce function that is used for incremental aggregation. - * @param function The window function. - * @param resultType Type information for the result type of the window function - * @return The data stream that is the result of applying the window function to the window. - * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction, TypeInformation)} instead. - */ - @Deprecated - public <R> SingleOutputStreamOperator<R> apply( - ReduceFunction<T> reduceFunction, - AllWindowFunction<T, R, W> function, - TypeInformation<R> resultType) { - if (reduceFunction instanceof RichFunction) { - throw new UnsupportedOperationException( - "ReduceFunction of apply can not be a RichFunction."); - } - - // clean the closures - function = input.getExecutionEnvironment().clean(function); - reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); - - String callLocation = Utils.getCallLocationName(); - String udfName = "AllWindowedStream." + callLocation; - - String opName; - KeySelector<T, Byte> keySel = input.getKeySelector(); - - OneInputStreamOperatorFactory<T, R> operator; - - if (evictor != null) { - @SuppressWarnings({"unchecked", "rawtypes"}) - TypeSerializer<StreamRecord<T>> streamRecordSerializer = - (TypeSerializer<StreamRecord<T>>) - new StreamElementSerializer( - input.getType() - .createSerializer( - getExecutionEnvironment() - .getConfig() - .getSerializerConfig())); - - ListStateDescriptor<StreamRecord<T>> stateDesc = - new ListStateDescriptor<>("window-contents", streamRecordSerializer); - - opName = - "TriggerWindow(" - + windowAssigner - + ", " - + stateDesc - + ", " - + trigger - + ", " - + evictor - + ", " - + udfName - + ")"; - - operator = - new EvictingWindowOperatorFactory<>( - windowAssigner, - windowAssigner.getWindowSerializer( - getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType() - .createSerializer( - getExecutionEnvironment() - .getConfig() - .getSerializerConfig()), - stateDesc, - new InternalIterableAllWindowFunction<>( - new ReduceApplyAllWindowFunction<>(reduceFunction, function)), - trigger, - evictor, - allowedLateness, - lateDataOutputTag); - - } else { - ReducingStateDescriptor<T> stateDesc = - new ReducingStateDescriptor<>( - "window-contents", - reduceFunction, - input.getType() - .createSerializer( - getExecutionEnvironment() - .getConfig() - .getSerializerConfig())); - - opName = - "TriggerWindow(" - + windowAssigner - + ", " - + stateDesc - + ", " - + trigger - + ", " - + udfName - + ")"; - - operator = - new WindowOperatorFactory<>( - windowAssigner, - windowAssigner.getWindowSerializer( - getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType() - .createSerializer( - getExecutionEnvironment() - .getConfig() - .getSerializerConfig()), - stateDesc, - new InternalSingleValueAllWindowFunction<>(function), - trigger, - allowedLateness, - lateDataOutputTag); - } - - return input.transform(opName, resultType, operator).forceNonParallel(); - } - // ------------------------------------------------------------------------ // Aggregations on the all windows // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index c35451b128a..eca4c046e5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -617,57 +617,6 @@ public class WindowedStream<T, K, W extends Window> { return input.transform(opName, resultType, operator).setDescription(opDesc); } - /** - * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. The output of the window function is - * interpreted as a regular non-windowed stream. - * - * <p>Arriving data is incrementally aggregated using the given reducer. - * - * @param reduceFunction The reduce function that is used for incremental aggregation. - * @param function The window function. - * @return The data stream that is the result of applying the window function to the window. - * @deprecated Use {@link #reduce(ReduceFunction, WindowFunction)} instead. - */ - @Deprecated - public <R> SingleOutputStreamOperator<R> apply( - ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) { - TypeInformation<T> inType = input.getType(); - TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType); - - return apply(reduceFunction, function, resultType); - } - - /** - * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. The output of the window function is - * interpreted as a regular non-windowed stream. - * - * <p>Arriving data is incrementally aggregated using the given reducer. - * - * @param reduceFunction The reduce function that is used for incremental aggregation. - * @param function The window function. - * @param resultType Type information for the result type of the window function - * @return The data stream that is the result of applying the window function to the window. - * @deprecated Use {@link #reduce(ReduceFunction, WindowFunction, TypeInformation)} instead. - */ - @Deprecated - public <R> SingleOutputStreamOperator<R> apply( - ReduceFunction<T> reduceFunction, - WindowFunction<T, R, K, W> function, - TypeInformation<R> resultType) { - // clean the closures - function = input.getExecutionEnvironment().clean(function); - reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); - - final String opName = builder.generateOperatorName(); - final String opDesc = builder.generateOperatorDescription(reduceFunction, function); - - OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function); - - return input.transform(opName, resultType, operator).setDescription(opDesc); - } - // ------------------------------------------------------------------------ // Pre-defined aggregations on the keyed windows // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 653822d5627..8706488228b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -561,7 +561,7 @@ class AllWindowTranslationTest { DataStream<Tuple3<String, String, Integer>> window = source.windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) - .apply( + .reduce( reducer, new AllWindowFunction< Tuple2<String, Integer>, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 71d53a795ba..28cc3b570af 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -535,7 +535,7 @@ class WindowTranslationTest { DataStream<Tuple3<String, String, Integer>> window = source.keyBy(new TupleKeySelector()) .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) - .apply( + .reduce( reducer, new WindowFunction< Tuple2<String, Integer>, @@ -591,7 +591,7 @@ class WindowTranslationTest { source.keyBy(new TupleKeySelector()) .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) .evictor(CountEvictor.of(100)) - .apply( + .reduce( reducer, new WindowFunction< Tuple2<String, Integer>,
