This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2949815de62 [FLINK-36423][api] Fix RichMapPartitionFunction does not 
invoke open method in KeyedPartitionWindowedStream
2949815de62 is described below

commit 2949815de621efab4d362bb2ee2c30058c8744a2
Author: Xu Huang <130116763+codeno...@users.noreply.github.com>
AuthorDate: Fri Oct 11 10:06:59 2024 +0800

    [FLINK-36423][api] Fix RichMapPartitionFunction does not invoke open method 
in KeyedPartitionWindowedStream
---
 .../datastream/KeyedPartitionWindowedStream.java   | 31 ++++++++++--------
 .../KeyedPartitionWindowedStreamITCase.java        | 38 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 14 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
index 6734fbec0db..9509d0e566a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.WrappingFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -64,21 +65,8 @@ public class KeyedPartitionWindowedStream<T, KEY> implements 
PartitionWindowedSt
         TypeInformation<R> resultType =
                 TypeExtractor.getMapPartitionReturnTypes(
                         mapPartitionFunction, input.getType(), opName, true);
-        MapPartitionFunction<T, R> function = mapPartitionFunction;
         return input.window(GlobalWindows.createWithEndOfStreamTrigger())
-                .apply(
-                        new WindowFunction<T, R, KEY, GlobalWindow>() {
-                            @Override
-                            public void apply(
-                                    KEY key,
-                                    GlobalWindow window,
-                                    Iterable<T> input,
-                                    Collector<R> out)
-                                    throws Exception {
-                                function.mapPartition(input, out);
-                            }
-                        },
-                        resultType);
+                .apply(new 
KeyedMapPartitionWindowFunction<>(mapPartitionFunction), resultType);
     }
 
     @Override
@@ -172,4 +160,19 @@ public class KeyedPartitionWindowedStream<T, KEY> 
implements PartitionWindowedSt
                         ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
         return result;
     }
+
+    private static class KeyedMapPartitionWindowFunction<T, R, KEY>
+            extends WrappingFunction<MapPartitionFunction<T, R>>
+            implements WindowFunction<T, R, KEY, GlobalWindow> {
+
+        public KeyedMapPartitionWindowFunction(MapPartitionFunction<T, R> 
function) {
+            super(function);
+        }
+
+        @Override
+        public void apply(KEY key, GlobalWindow window, Iterable<T> input, 
Collector<R> out)
+                throws Exception {
+            wrappedFunction.mapPartition(input, out);
+        }
+    }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
index ae25e6dbb78..1b4b9b4a0b3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
@@ -21,13 +21,16 @@ package org.apache.flink.test.streaming.runtime;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedPartitionWindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
 
@@ -97,6 +100,41 @@ class KeyedPartitionWindowedStreamITCase {
                 createExpectedString(3));
     }
 
+    @Test
+    void testRichMapPartitionFunctionHasOpen() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Tuple2<String, String>> source = 
env.fromData(createSource());
+        source.keyBy(
+                        new KeySelector<Tuple2<String, String>, String>() {
+                            @Override
+                            public String getKey(Tuple2<String, String> value) 
throws Exception {
+                                return value.f0;
+                            }
+                        })
+                .fullWindowPartition()
+                .mapPartition(
+                        new RichMapPartitionFunction<Tuple2<String, String>, 
String>() {
+
+                            private boolean isOpen = false;
+
+                            @Override
+                            public void open(OpenContext openContext) throws 
Exception {
+                                super.open(openContext);
+                                isOpen = true;
+                            }
+
+                            @Override
+                            public void mapPartition(
+                                    Iterable<Tuple2<String, String>> values, 
Collector<String> out)
+                                    throws Exception {
+                                assertThat(isOpen).isTrue();
+                            }
+                        })
+                .sinkTo(new DiscardingSink<>());
+
+        env.execute();
+    }
+
     @Test
     void testReduce() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Reply via email to