This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 2949815de62 [FLINK-36423][api] Fix RichMapPartitionFunction does not invoke open method in KeyedPartitionWindowedStream 2949815de62 is described below commit 2949815de621efab4d362bb2ee2c30058c8744a2 Author: Xu Huang <130116763+codeno...@users.noreply.github.com> AuthorDate: Fri Oct 11 10:06:59 2024 +0800 [FLINK-36423][api] Fix RichMapPartitionFunction does not invoke open method in KeyedPartitionWindowedStream --- .../datastream/KeyedPartitionWindowedStream.java | 31 ++++++++++-------- .../KeyedPartitionWindowedStreamITCase.java | 38 ++++++++++++++++++++++ 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java index 6734fbec0db..9509d0e566a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.WrappingFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; @@ -64,21 +65,8 @@ public class KeyedPartitionWindowedStream<T, KEY> implements PartitionWindowedSt TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes( mapPartitionFunction, input.getType(), opName, true); - MapPartitionFunction<T, R> function = mapPartitionFunction; return input.window(GlobalWindows.createWithEndOfStreamTrigger()) - .apply( - new WindowFunction<T, R, KEY, GlobalWindow>() { - @Override - public void apply( - KEY key, - GlobalWindow window, - Iterable<T> input, - Collector<R> out) - throws Exception { - function.mapPartition(input, out); - } - }, - resultType); + .apply(new KeyedMapPartitionWindowFunction<>(mapPartitionFunction), resultType); } @Override @@ -172,4 +160,19 @@ public class KeyedPartitionWindowedStream<T, KEY> implements PartitionWindowedSt ManagedMemoryUseCase.OPERATOR, managedMemoryWeight); return result; } + + private static class KeyedMapPartitionWindowFunction<T, R, KEY> + extends WrappingFunction<MapPartitionFunction<T, R>> + implements WindowFunction<T, R, KEY, GlobalWindow> { + + public KeyedMapPartitionWindowFunction(MapPartitionFunction<T, R> function) { + super(function); + } + + @Override + public void apply(KEY key, GlobalWindow window, Iterable<T> input, Collector<R> out) + throws Exception { + wrappedFunction.mapPartition(input, out); + } + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java index ae25e6dbb78..1b4b9b4a0b3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java @@ -21,13 +21,16 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedPartitionWindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Collector; @@ -97,6 +100,41 @@ class KeyedPartitionWindowedStreamITCase { createExpectedString(3)); } + @Test + void testRichMapPartitionFunctionHasOpen() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Tuple2<String, String>> source = env.fromData(createSource()); + source.keyBy( + new KeySelector<Tuple2<String, String>, String>() { + @Override + public String getKey(Tuple2<String, String> value) throws Exception { + return value.f0; + } + }) + .fullWindowPartition() + .mapPartition( + new RichMapPartitionFunction<Tuple2<String, String>, String>() { + + private boolean isOpen = false; + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + isOpen = true; + } + + @Override + public void mapPartition( + Iterable<Tuple2<String, String>> values, Collector<String> out) + throws Exception { + assertThat(isOpen).isTrue(); + } + }) + .sinkTo(new DiscardingSink<>()); + + env.execute(); + } + @Test void testReduce() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();