This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fe88e16faf588792cfc8c9f84f7992f5d88ace71 Author: Wencong Liu <[email protected]> AuthorDate: Tue Mar 12 12:32:15 2024 +0800 [FLINK-34543][datastream] Introduce the Reduce API on PartitionWindowedStream --- .../datastream/KeyedPartitionWindowedStream.java | 8 ++ .../NonKeyedPartitionWindowedStream.java | 16 +++ .../api/datastream/PartitionWindowedStream.java | 9 ++ .../api/operators/PartitionReduceOperator.java | 60 ++++++++++++ .../api/operators/PartitionReduceOperatorTest.java | 108 +++++++++++++++++++++ .../KeyedPartitionWindowedStreamITCase.java | 54 +++++++++++ .../NonKeyedPartitionWindowedStreamITCase.java | 29 ++++++ 7 files changed, 284 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java index 6f2364ec2a8..ccd9f02e197 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -72,4 +73,11 @@ public class KeyedPartitionWindowedStream<T, KEY> implements PartitionWindowedSt }, resultType); } + + @Override + public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction) { + checkNotNull(reduceFunction, "The reduce function must not be null."); + reduceFunction = environment.clean(reduceFunction); + return input.window(GlobalWindows.createWithEndOfStreamTrigger()).reduce(reduceFunction); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java index 33b36111c67..a1943a72f33 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java @@ -20,10 +20,14 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.MapPartitionOperator; +import org.apache.flink.streaming.api.operators.PartitionReduceOperator; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** * {@link NonKeyedPartitionWindowedStream} represents a data stream that collects all records of @@ -56,4 +60,16 @@ public class NonKeyedPartitionWindowedStream<T> implements PartitionWindowedStre return input.transform(opName, resultType, new MapPartitionOperator<>(mapPartitionFunction)) .setParallelism(input.getParallelism()); } + + @Override + public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction) { + checkNotNull(reduceFunction, "The reduce function must not be null."); + reduceFunction = environment.clean(reduceFunction); + String opName = "PartitionReduce"; + return input.transform( + opName, + input.getTransformation().getOutputType(), + new PartitionReduceOperator<>(reduceFunction)) + .setParallelism(input.getParallelism()); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java index 2169dd6d71f..a9f0bcba28a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.ReduceFunction; /** * {@link PartitionWindowedStream} represents a data stream that collects all records of each @@ -40,4 +41,12 @@ public interface PartitionWindowedStream<T> { * @return The data stream with map partition result. */ <R> SingleOutputStreamOperator<R> mapPartition(MapPartitionFunction<T, R> mapPartitionFunction); + + /** + * Applies a reduce transformation on the records of the window. + * + * @param reduceFunction The reduce function. + * @return The data stream with final reduced result. + */ + SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java new file mode 100644 index 00000000000..7d4107788a1 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * The {@link PartitionReduceOperator} is used to apply the reduce transformation on all records of + * each partition. Each partition contains all records of a subtask. + */ +@Internal +public class PartitionReduceOperator<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>> + implements OneInputStreamOperator<IN, IN>, BoundedOneInput { + + private final ReduceFunction<IN> reduceFunction; + + private IN currentRecord = null; + + public PartitionReduceOperator(ReduceFunction<IN> reduceFunction) { + super(reduceFunction); + this.reduceFunction = reduceFunction; + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + if (currentRecord == null) { + currentRecord = element.getValue(); + } else { + currentRecord = reduceFunction.reduce(currentRecord, element.getValue()); + } + } + + @Override + public void endInput() throws Exception { + output.collect(new StreamRecord<>(currentRecord)); + } + + @Override + public OperatorAttributes getOperatorAttributes() { + return new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build(); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/PartitionReduceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/PartitionReduceOperatorTest.java new file mode 100644 index 00000000000..5eb86d1310b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/PartitionReduceOperatorTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for {@link PartitionReduceOperator}. */ +class PartitionReduceOperatorTest { + + private static final int RECORD = 10; + + @Test + void testReduce() throws Exception { + PartitionReduceOperator<Integer> partitionReduceOperator = + new PartitionReduceOperator<>( + new Reduce(new CompletableFuture<>(), new CompletableFuture<>())); + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new OneInputStreamOperatorTestHarness<>(partitionReduceOperator); + Queue<Object> expectedOutput = new LinkedList<>(); + testHarness.open(); + testHarness.processElement(new StreamRecord<>(RECORD)); + testHarness.processElement(new StreamRecord<>(RECORD)); + testHarness.processElement(new StreamRecord<>(RECORD)); + testHarness.endInput(); + expectedOutput.add(new StreamRecord<>(RECORD * 3)); + TestHarnessUtil.assertOutputEquals( + "The reduce result is not correct.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + void testOpenClose() throws Exception { + CompletableFuture<Object> openIdentifier = new CompletableFuture<>(); + CompletableFuture<Object> closeIdentifier = new CompletableFuture<>(); + PartitionReduceOperator<Integer> partitionReduceOperator = + new PartitionReduceOperator<>(new Reduce(openIdentifier, closeIdentifier)); + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new OneInputStreamOperatorTestHarness<>(partitionReduceOperator); + testHarness.open(); + testHarness.processElement(new StreamRecord<>(RECORD)); + testHarness.endInput(); + testHarness.close(); + assertThat(openIdentifier).isCompleted(); + assertThat(closeIdentifier).isCompleted(); + assertThat(testHarness.getOutput()).isNotEmpty(); + } + + /** The test user implementation of {@link ReduceFunction}. */ + private static class Reduce extends RichReduceFunction<Integer> { + + private final CompletableFuture<Object> openIdentifier; + + private final CompletableFuture<Object> closeIdentifier; + + public Reduce( + CompletableFuture<Object> openIdentifier, + CompletableFuture<Object> closeIdentifier) { + this.openIdentifier = openIdentifier; + this.closeIdentifier = closeIdentifier; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + openIdentifier.complete(null); + } + + @Override + public Integer reduce(Integer value1, Integer value2) { + return value1 + value2; + } + + @Override + public void close() throws Exception { + super.close(); + closeIdentifier.complete(null); + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java index 0c731210a82..4c2951bda6f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -92,6 +93,59 @@ class KeyedPartitionWindowedStreamITCase { createExpectedString(3)); } + @Test + void testReduce() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Tuple2<String, Integer>> source = + env.fromData( + Tuple2.of("key1", 1), + Tuple2.of("key1", 999), + Tuple2.of("key2", 2), + Tuple2.of("key2", 998), + Tuple2.of("key3", 3), + Tuple2.of("key3", 997)); + CloseableIterator<String> resultIterator = + source.map( + new MapFunction< + Tuple2<String, Integer>, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> map( + Tuple2<String, Integer> value) throws Exception { + return value; + } + }) + .setParallelism(2) + .keyBy( + new KeySelector<Tuple2<String, Integer>, String>() { + @Override + public String getKey(Tuple2<String, Integer> value) + throws Exception { + return value.f0; + } + }) + .fullWindowPartition() + .reduce( + new ReduceFunction<Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> reduce( + Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) + throws Exception { + return Tuple2.of(value1.f0, value1.f1 + value2.f1); + } + }) + .map( + new MapFunction<Tuple2<String, Integer>, String>() { + @Override + public String map(Tuple2<String, Integer> value) + throws Exception { + return value.f0 + value.f1; + } + }) + .executeAndCollect(); + expectInAnyOrder(resultIterator, "key11000", "key21000", "key31000"); + } + private Collection<Tuple2<String, String>> createSource() { List<Tuple2<String, String>> source = new ArrayList<>(); for (int index = 0; index < EVENT_NUMBER; ++index) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java index 19e6698b0c6..3b8559e4711 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java @@ -18,7 +18,9 @@ package org.apache.flink.test.streaming.runtime; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.NonKeyedPartitionWindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -69,6 +71,33 @@ class NonKeyedPartitionWindowedStreamITCase { expectInAnyOrder(resultIterator, expectedResult, expectedResult); } + @Test + void testReduce() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Integer> source = env.fromData(1, 1, 1, 1, 998, 998); + CloseableIterator<String> resultIterator = + source.map(v -> v) + .setParallelism(2) + .fullWindowPartition() + .reduce( + new ReduceFunction<Integer>() { + @Override + public Integer reduce(Integer value1, Integer value2) + throws Exception { + return value1 + value2; + } + }) + .map( + new MapFunction<Integer, String>() { + @Override + public String map(Integer value) throws Exception { + return String.valueOf(value); + } + }) + .executeAndCollect(); + expectInAnyOrder(resultIterator, "1000", "1000"); + } + private void expectInAnyOrder(CloseableIterator<String> resultIterator, String... expected) { List<String> listExpected = Lists.newArrayList(expected); List<String> testResults = Lists.newArrayList(resultIterator);
