This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2d7a908ff23da1811f0bdaeada69ec308d814a2
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Tue Sep 24 15:48:25 2024 +0800

    [FLINK-36355][runtime] Remove deprecated WindowedStream#apply
---
 .../api/datastream/AllWindowedStream.java          | 145 ---------------------
 .../streaming/api/datastream/WindowedStream.java   |  51 --------
 .../windowing/AllWindowTranslationTest.java        |   2 +-
 .../operators/windowing/WindowTranslationTest.java |   4 +-
 4 files changed, 3 insertions(+), 199 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0a21a4451db..c1bb29928d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -1116,151 +1116,6 @@ public class AllWindowedStream<T, W extends Window> {
         return input.transform(opName, resultType, operator).forceNonParallel();
     }
 
-    /**
-     * Applies the given window function to each window. The window function 
is called for each
-     * evaluation of the window for each key individually. The output of the 
window function is
-     * interpreted as a regular non-windowed stream.
-     *
-     * <p>Arriving data is incrementally aggregated using the given reducer.
-     *
-     * @param reduceFunction The reduce function that is used for incremental 
aggregation.
-     * @param function The window function.
-     * @return The data stream that is the result of applying the window 
function to the window.
-     * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction)} 
instead.
-     */
-    @Deprecated
-    public <R> SingleOutputStreamOperator<R> apply(
-            ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> 
function) {
-        TypeInformation<T> inType = input.getType();
-        TypeInformation<R> resultType = 
getAllWindowFunctionReturnType(function, inType);
-
-        return apply(reduceFunction, function, resultType);
-    }
-
-    /**
-     * Applies the given window function to each window. The window function 
is called for each
-     * evaluation of the window for each key individually. The output of the 
window function is
-     * interpreted as a regular non-windowed stream.
-     *
-     * <p>Arriving data is incrementally aggregated using the given reducer.
-     *
-     * @param reduceFunction The reduce function that is used for incremental 
aggregation.
-     * @param function The window function.
-     * @param resultType Type information for the result type of the window 
function
-     * @return The data stream that is the result of applying the window 
function to the window.
-     * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction, 
TypeInformation)} instead.
-     */
-    @Deprecated
-    public <R> SingleOutputStreamOperator<R> apply(
-            ReduceFunction<T> reduceFunction,
-            AllWindowFunction<T, R, W> function,
-            TypeInformation<R> resultType) {
-        if (reduceFunction instanceof RichFunction) {
-            throw new UnsupportedOperationException(
-                    "ReduceFunction of apply can not be a RichFunction.");
-        }
-
-        // clean the closures
-        function = input.getExecutionEnvironment().clean(function);
-        reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
-
-        String callLocation = Utils.getCallLocationName();
-        String udfName = "AllWindowedStream." + callLocation;
-
-        String opName;
-        KeySelector<T, Byte> keySel = input.getKeySelector();
-
-        OneInputStreamOperatorFactory<T, R> operator;
-
-        if (evictor != null) {
-            @SuppressWarnings({"unchecked", "rawtypes"})
-            TypeSerializer<StreamRecord<T>> streamRecordSerializer =
-                    (TypeSerializer<StreamRecord<T>>)
-                            new StreamElementSerializer(
-                                    input.getType()
-                                            .createSerializer(
-                                                    getExecutionEnvironment()
-                                                            .getConfig()
-                                                            
.getSerializerConfig()));
-
-            ListStateDescriptor<StreamRecord<T>> stateDesc =
-                    new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
-
-            opName =
-                    "TriggerWindow("
-                            + windowAssigner
-                            + ", "
-                            + stateDesc
-                            + ", "
-                            + trigger
-                            + ", "
-                            + evictor
-                            + ", "
-                            + udfName
-                            + ")";
-
-            operator =
-                    new EvictingWindowOperatorFactory<>(
-                            windowAssigner,
-                            windowAssigner.getWindowSerializer(
-                                    getExecutionEnvironment().getConfig()),
-                            keySel,
-                            input.getKeyType()
-                                    .createSerializer(
-                                            getExecutionEnvironment()
-                                                    .getConfig()
-                                                    .getSerializerConfig()),
-                            stateDesc,
-                            new InternalIterableAllWindowFunction<>(
-                                    new 
ReduceApplyAllWindowFunction<>(reduceFunction, function)),
-                            trigger,
-                            evictor,
-                            allowedLateness,
-                            lateDataOutputTag);
-
-        } else {
-            ReducingStateDescriptor<T> stateDesc =
-                    new ReducingStateDescriptor<>(
-                            "window-contents",
-                            reduceFunction,
-                            input.getType()
-                                    .createSerializer(
-                                            getExecutionEnvironment()
-                                                    .getConfig()
-                                                    .getSerializerConfig()));
-
-            opName =
-                    "TriggerWindow("
-                            + windowAssigner
-                            + ", "
-                            + stateDesc
-                            + ", "
-                            + trigger
-                            + ", "
-                            + udfName
-                            + ")";
-
-            operator =
-                    new WindowOperatorFactory<>(
-                            windowAssigner,
-                            windowAssigner.getWindowSerializer(
-                                    getExecutionEnvironment().getConfig()),
-                            keySel,
-                            input.getKeyType()
-                                    .createSerializer(
-                                            getExecutionEnvironment()
-                                                    .getConfig()
-                                                    .getSerializerConfig()),
-                            stateDesc,
-                            new 
InternalSingleValueAllWindowFunction<>(function),
-                            trigger,
-                            allowedLateness,
-                            lateDataOutputTag);
-        }
-
-        return input.transform(opName, resultType, 
operator).forceNonParallel();
-    }
-
     // ------------------------------------------------------------------------
     //  Aggregations on the all windows
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index c35451b128a..eca4c046e5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -617,57 +617,6 @@ public class WindowedStream<T, K, W extends Window> {
         return input.transform(opName, resultType, operator).setDescription(opDesc);
     }
 
-    /**
-     * Applies the given window function to each window. The window function 
is called for each
-     * evaluation of the window for each key individually. The output of the 
window function is
-     * interpreted as a regular non-windowed stream.
-     *
-     * <p>Arriving data is incrementally aggregated using the given reducer.
-     *
-     * @param reduceFunction The reduce function that is used for incremental 
aggregation.
-     * @param function The window function.
-     * @return The data stream that is the result of applying the window 
function to the window.
-     * @deprecated Use {@link #reduce(ReduceFunction, WindowFunction)} instead.
-     */
-    @Deprecated
-    public <R> SingleOutputStreamOperator<R> apply(
-            ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> 
function) {
-        TypeInformation<T> inType = input.getType();
-        TypeInformation<R> resultType = getWindowFunctionReturnType(function, 
inType);
-
-        return apply(reduceFunction, function, resultType);
-    }
-
-    /**
-     * Applies the given window function to each window. The window function 
is called for each
-     * evaluation of the window for each key individually. The output of the 
window function is
-     * interpreted as a regular non-windowed stream.
-     *
-     * <p>Arriving data is incrementally aggregated using the given reducer.
-     *
-     * @param reduceFunction The reduce function that is used for incremental 
aggregation.
-     * @param function The window function.
-     * @param resultType Type information for the result type of the window 
function
-     * @return The data stream that is the result of applying the window 
function to the window.
-     * @deprecated Use {@link #reduce(ReduceFunction, WindowFunction, 
TypeInformation)} instead.
-     */
-    @Deprecated
-    public <R> SingleOutputStreamOperator<R> apply(
-            ReduceFunction<T> reduceFunction,
-            WindowFunction<T, R, K, W> function,
-            TypeInformation<R> resultType) {
-        // clean the closures
-        function = input.getExecutionEnvironment().clean(function);
-        reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
-
-        final String opName = builder.generateOperatorName();
-        final String opDesc = 
builder.generateOperatorDescription(reduceFunction, function);
-
-        OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, 
function);
-
-        return input.transform(opName, resultType, 
operator).setDescription(opDesc);
-    }
-
     // ------------------------------------------------------------------------
     //  Pre-defined aggregations on the keyed windows
     // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 653822d5627..8706488228b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -561,7 +561,7 @@ class AllWindowTranslationTest {
 
         DataStream<Tuple3<String, String, Integer>> window =
                 source.windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
-                        .apply(
+                        .reduce(
                                 reducer,
                                 new AllWindowFunction<
                                         Tuple2<String, Integer>,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 71d53a795ba..28cc3b570af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -535,7 +535,7 @@ class WindowTranslationTest {
         DataStream<Tuple3<String, String, Integer>> window =
                 source.keyBy(new TupleKeySelector())
                         .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
-                        .apply(
+                        .reduce(
                                 reducer,
                                 new WindowFunction<
                                         Tuple2<String, Integer>,
@@ -591,7 +591,7 @@ class WindowTranslationTest {
                 source.keyBy(new TupleKeySelector())
                         .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
                         .evictor(CountEvictor.of(100))
-                        .apply(
+                        .reduce(
                                 reducer,
                                 new WindowFunction<
                                         Tuple2<String, Integer>,

Reply via email to