This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a1c2b5750d9576f09023cf2810ed965072b699e
Author: Eric Xiao <[email protected]>
AuthorDate: Thu Apr 20 16:03:31 2023 -0400

    [FLINK-31873][API / DataStream] Add method setMaxParallelism to 
DataStreamSink.
---
 .../streaming/api/datastream/DataStreamSink.java   | 16 ++++++++++++++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 25 ++++++++++++++++++++--
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 0900ebd8225..7b8d241e6ec 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -180,6 +181,21 @@ public class DataStreamSink<T> {
         return this;
     }
 
+    /**
+     * Sets the max parallelism for this sink.
+     *
+     * <p>The maximum parallelism specifies the upper bound for dynamic 
scaling. The degree must be
+     * higher than zero and less than the upper bound.
+     *
+     * @param maxParallelism The max parallelism for this sink.
+     * @return The sink with set parallelism.
+     */
+    public DataStreamSink<T> setMaxParallelism(int maxParallelism) {
+        OperatorValidationUtils.validateMaxParallelism(maxParallelism, true);
+        transformation.setMaxParallelism(maxParallelism);
+        return this;
+    }
+
     /**
      * Sets the description for this sink.
      *
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index cf09d70e236..a58d83c152e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -259,8 +259,8 @@ class StreamingJobGraphGeneratorTest {
     @Test
     void testTransformationSetParallelism() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        /* The default parallelism of the environment (that is inherited by 
the source)
-        and the parallelism of the map operator needs to be different for this 
test */
+        // The default parallelism of the environment (that is inherited by 
the source)
+        // and the parallelism of the map operator needs to be different for 
this test
         env.setParallelism(4);
         env.fromSequence(1L, 3L).map(i -> 
i).setParallelism(10).print().setParallelism(20);
         StreamGraph streamGraph = env.getStreamGraph();
@@ -283,6 +283,27 @@ class StreamingJobGraphGeneratorTest {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    void testTransformationSetMaxParallelism() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // The max parallelism of the environment (that is inherited by the 
source)
+        // and the parallelism of the map operator needs to be different for 
this test
+        env.setMaxParallelism(4);
+
+        DataStreamSource<Long> source =
+                env.fromSequence(1L, 3L); // no explicit max parallelism set, 
grab from environment.
+        SingleOutputStreamOperator<Long> map = source.map(i -> 
i).setMaxParallelism(10);
+        DataStreamSink<Long> sink = map.print().setMaxParallelism(20);
+
+        StreamGraph streamGraph = env.getStreamGraph();
+
+        // check the streamGraph max parallelism is configured correctly
+        
assertThat(streamGraph.getStreamNode(source.getId()).getMaxParallelism()).isEqualTo(4);
+        
assertThat(streamGraph.getStreamNode(map.getId()).getMaxParallelism()).isEqualTo(10);
+        
assertThat(streamGraph.getStreamNode(sink.getTransformation().getId()).getMaxParallelism())
+                .isEqualTo(20);
+    }
+
     @Test
     void testChainNodeSetParallelism() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Reply via email to