This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe88e16faf588792cfc8c9f84f7992f5d88ace71
Author: Wencong Liu <[email protected]>
AuthorDate: Tue Mar 12 12:32:15 2024 +0800

    [FLINK-34543][datastream] Introduce the Reduce API on PartitionWindowedStream
---
 .../datastream/KeyedPartitionWindowedStream.java   |   8 ++
 .../NonKeyedPartitionWindowedStream.java           |  16 +++
 .../api/datastream/PartitionWindowedStream.java    |   9 ++
 .../api/operators/PartitionReduceOperator.java     |  60 ++++++++++++
 .../api/operators/PartitionReduceOperatorTest.java | 108 +++++++++++++++++++++
 .../KeyedPartitionWindowedStreamITCase.java        |  54 +++++++++++
 .../NonKeyedPartitionWindowedStreamITCase.java     |  29 ++++++
 7 files changed, 284 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
index 6f2364ec2a8..ccd9f02e197 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -72,4 +73,11 @@ public class KeyedPartitionWindowedStream<T, KEY> implements 
PartitionWindowedSt
                         },
                         resultType);
     }
+
+    /**
+     * Reduces the records of the keyed input by running the given function inside a {@code
+     * GlobalWindows} window that is created with the end-of-stream trigger.
+     *
+     * @param reduceFunction The reduce function; must not be null.
+     * @return The data stream with the reduced result.
+     */
+    @Override
+    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction) {
+        final ReduceFunction<T> cleanedFunction =
+                environment.clean(
+                        checkNotNull(reduceFunction, "The reduce function must not be null."));
+        return input.window(GlobalWindows.createWithEndOfStreamTrigger())
+                .reduce(cleanedFunction);
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
index 33b36111c67..a1943a72f33 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
@@ -20,10 +20,14 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.MapPartitionOperator;
+import org.apache.flink.streaming.api.operators.PartitionReduceOperator;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * {@link NonKeyedPartitionWindowedStream} represents a data stream that 
collects all records of
@@ -56,4 +60,16 @@ public class NonKeyedPartitionWindowedStream<T> implements 
PartitionWindowedStre
         return input.transform(opName, resultType, new 
MapPartitionOperator<>(mapPartitionFunction))
                 .setParallelism(input.getParallelism());
     }
+
+    /**
+     * Reduces the records of the input with a {@link PartitionReduceOperator}. The resulting
+     * operator keeps the parallelism and the output type of the input stream.
+     *
+     * @param reduceFunction The reduce function; must not be null.
+     * @return The data stream with the reduced result.
+     */
+    @Override
+    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction) {
+        checkNotNull(reduceFunction, "The reduce function must not be null.");
+        final ReduceFunction<T> cleanedFunction = environment.clean(reduceFunction);
+        final TypeInformation<T> outputType = input.getTransformation().getOutputType();
+        final PartitionReduceOperator<T> reduceOperator =
+                new PartitionReduceOperator<>(cleanedFunction);
+        return input.transform("PartitionReduce", outputType, reduceOperator)
+                .setParallelism(input.getParallelism());
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
index 2169dd6d71f..a9f0bcba28a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 
 /**
  * {@link PartitionWindowedStream} represents a data stream that collects all 
records of each
@@ -40,4 +41,12 @@ public interface PartitionWindowedStream<T> {
      * @return The data stream with map partition result.
      */
     <R> SingleOutputStreamOperator<R> mapPartition(MapPartitionFunction<T, R> 
mapPartitionFunction);
+
+    /**
+     * Applies a reduce transformation on the records of the window. The result has the same type
+     * as the input records.
+     *
+     * @param reduceFunction The reduce function used to combine two records into one; must not be
+     *     null.
+     * @return The data stream with final reduced result.
+     */
+    SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction);
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java
new file mode 100644
index 00000000000..7d4107788a1
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * The {@link PartitionReduceOperator} is used to apply the reduce transformation on all records of
+ * each partition. Each partition contains all records of a subtask.
+ *
+ * <p>Incoming records are folded into one running value as they arrive, and that value is emitted
+ * once when the input ends. A partition that received no record emits nothing.
+ *
+ * @param <IN> The type of the input records, which is also the type of the reduced result.
+ */
+@Internal
+public class PartitionReduceOperator<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
+        implements OneInputStreamOperator<IN, IN>, BoundedOneInput {
+
+    private final ReduceFunction<IN> reduceFunction;
+
+    /** The running reduce result; {@code null} until the first record has been received. */
+    private IN currentRecord = null;
+
+    /**
+     * Creates the operator.
+     *
+     * @param reduceFunction The function used to combine two records into one.
+     */
+    public PartitionReduceOperator(ReduceFunction<IN> reduceFunction) {
+        super(reduceFunction);
+        this.reduceFunction = reduceFunction;
+    }
+
+    /**
+     * Folds the incoming record into the running result. The first record is stored as is; every
+     * later record is combined with the running result through the reduce function.
+     */
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        if (currentRecord == null) {
+            currentRecord = element.getValue();
+        } else {
+            currentRecord = reduceFunction.reduce(currentRecord, element.getValue());
+        }
+    }
+
+    /**
+     * Emits the reduced result once the input has ended. Nothing is emitted when no record was
+     * received: there is no result to forward in that case, and a record wrapping {@code null}
+     * must not be handed to downstream operators.
+     */
+    @Override
+    public void endInput() throws Exception {
+        if (currentRecord != null) {
+            output.collect(new StreamRecord<>(currentRecord));
+        }
+    }
+
+    /** Builds operator attributes with the output-only-after-end-of-stream flag enabled. */
+    @Override
+    public OperatorAttributes getOperatorAttributes() {
+        return new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build();
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/PartitionReduceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/PartitionReduceOperatorTest.java
new file mode 100644
index 00000000000..5eb86d1310b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/PartitionReduceOperatorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PartitionReduceOperator}. */
+class PartitionReduceOperatorTest {
+
+    private static final int RECORD = 10;
+
+    /** Three equal records must be reduced into a single record that holds their sum. */
+    @Test
+    void testReduce() throws Exception {
+        PartitionReduceOperator<Integer> operator =
+                new PartitionReduceOperator<>(
+                        new Reduce(new CompletableFuture<>(), new CompletableFuture<>()));
+        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+        harness.open();
+        for (int i = 0; i < 3; i++) {
+            harness.processElement(new StreamRecord<>(RECORD));
+        }
+        harness.endInput();
+
+        Queue<Object> expected = new LinkedList<>();
+        expected.add(new StreamRecord<>(RECORD * 3));
+        TestHarnessUtil.assertOutputEquals(
+                "The reduce result is not correct.", expected, harness.getOutput());
+        harness.close();
+    }
+
+    /** The user function must see both open and close, and the operator must emit output. */
+    @Test
+    void testOpenClose() throws Exception {
+        CompletableFuture<Object> opened = new CompletableFuture<>();
+        CompletableFuture<Object> closed = new CompletableFuture<>();
+        PartitionReduceOperator<Integer> operator =
+                new PartitionReduceOperator<>(new Reduce(opened, closed));
+        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+
+        harness.open();
+        harness.processElement(new StreamRecord<>(RECORD));
+        harness.endInput();
+        harness.close();
+
+        assertThat(opened).isCompleted();
+        assertThat(closed).isCompleted();
+        assertThat(harness.getOutput()).isNotEmpty();
+    }
+
+    /**
+     * Summing {@link ReduceFunction} for the tests; it completes the given futures when it is
+     * opened and closed so that the lifecycle calls can be observed.
+     */
+    private static class Reduce extends RichReduceFunction<Integer> {
+
+        private final CompletableFuture<Object> openSignal;
+
+        private final CompletableFuture<Object> closeSignal;
+
+        public Reduce(
+                CompletableFuture<Object> openSignal, CompletableFuture<Object> closeSignal) {
+            this.openSignal = openSignal;
+            this.closeSignal = closeSignal;
+        }
+
+        @Override
+        public void open(OpenContext openContext) throws Exception {
+            super.open(openContext);
+            openSignal.complete(null);
+        }
+
+        @Override
+        public Integer reduce(Integer left, Integer right) {
+            return left + right;
+        }
+
+        @Override
+        public void close() throws Exception {
+            super.close();
+            closeSignal.complete(null);
+        }
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
index 0c731210a82..4c2951bda6f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -92,6 +93,59 @@ class KeyedPartitionWindowedStreamITCase {
                 createExpectedString(3));
     }
 
+    /**
+     * Checks that reduce on a keyed full-window partition sums the values of each key. Every key
+     * in the input carries two values that add up to 1000, so exactly one string made of the key
+     * followed by 1000 is expected per key.
+     */
+    @Test
+    void testReduce() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Tuple2<String, Integer>> source =
+                env.fromData(
+                        Tuple2.of("key1", 1),
+                        Tuple2.of("key1", 999),
+                        Tuple2.of("key2", 2),
+                        Tuple2.of("key2", 998),
+                        Tuple2.of("key3", 3),
+                        Tuple2.of("key3", 997));
+        CloseableIterator<String> resultIterator =
+                // Identity map; presumably only there so that the records pass through a stage
+                // with parallelism 2 before keyBy - TODO confirm.
+                source.map(
+                                new MapFunction<
+                                        Tuple2<String, Integer>, 
Tuple2<String, Integer>>() {
+                                    @Override
+                                    public Tuple2<String, Integer> map(
+                                            Tuple2<String, Integer> value) 
throws Exception {
+                                        return value;
+                                    }
+                                })
+                        .setParallelism(2)
+                        // Key the records by the string field of the tuple.
+                        .keyBy(
+                                new KeySelector<Tuple2<String, Integer>, 
String>() {
+                                    @Override
+                                    public String getKey(Tuple2<String, 
Integer> value)
+                                            throws Exception {
+                                        return value.f0;
+                                    }
+                                })
+                        .fullWindowPartition()
+                        // Keep the key of the first record and add up the integer fields.
+                        .reduce(
+                                new ReduceFunction<Tuple2<String, Integer>>() {
+                                    @Override
+                                    public Tuple2<String, Integer> reduce(
+                                            Tuple2<String, Integer> value1,
+                                            Tuple2<String, Integer> value2)
+                                            throws Exception {
+                                        return Tuple2.of(value1.f0, value1.f1 
+ value2.f1);
+                                    }
+                                })
+                        // Render each result as key immediately followed by the sum.
+                        .map(
+                                new MapFunction<Tuple2<String, Integer>, 
String>() {
+                                    @Override
+                                    public String map(Tuple2<String, Integer> 
value)
+                                            throws Exception {
+                                        return value.f0 + value.f1;
+                                    }
+                                })
+                        .executeAndCollect();
+        expectInAnyOrder(resultIterator, "key11000", "key21000", "key31000");
+    }
+
     private Collection<Tuple2<String, String>> createSource() {
         List<Tuple2<String, String>> source = new ArrayList<>();
         for (int index = 0; index < EVENT_NUMBER; ++index) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
index 19e6698b0c6..3b8559e4711 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import 
org.apache.flink.streaming.api.datastream.NonKeyedPartitionWindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -69,6 +71,33 @@ class NonKeyedPartitionWindowedStreamITCase {
         expectInAnyOrder(resultIterator, expectedResult, expectedResult);
     }
 
+    /**
+     * Checks that reduce on a non-keyed full-window partition sums the records of each partition.
+     * Two results of 1000 are expected, one per subtask of the parallelism-2 map stage.
+     */
+    @Test
+    void testReduce() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // NOTE(review): the expected result implies each of the two subtasks receives the values
+        // 1, 1 and 998; this relies on how the source distributes records - confirm.
+        DataStreamSource<Integer> source = env.fromData(1, 1, 1, 1, 998, 998);
+        CloseableIterator<String> resultIterator =
+                // Identity map used to set the parallelism of the stage feeding the partition.
+                source.map(v -> v)
+                        .setParallelism(2)
+                        .fullWindowPartition()
+                        // Add up all integers of the partition.
+                        .reduce(
+                                new ReduceFunction<Integer>() {
+                                    @Override
+                                    public Integer reduce(Integer value1, 
Integer value2)
+                                            throws Exception {
+                                        return value1 + value2;
+                                    }
+                                })
+                        // Convert each sum to its string form for comparison.
+                        .map(
+                                new MapFunction<Integer, String>() {
+                                    @Override
+                                    public String map(Integer value) throws 
Exception {
+                                        return String.valueOf(value);
+                                    }
+                                })
+                        .executeAndCollect();
+        expectInAnyOrder(resultIterator, "1000", "1000");
+    }
+
     private void expectInAnyOrder(CloseableIterator<String> resultIterator, 
String... expected) {
         List<String> listExpected = Lists.newArrayList(expected);
         List<String> testResults = Lists.newArrayList(resultIterator);

Reply via email to