This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7a1c2b5750d9576f09023cf2810ed965072b699e Author: Eric Xiao <[email protected]> AuthorDate: Thu Apr 20 16:03:31 2023 -0400 [FLINK-31873][API / DataStream] Add method setMaxParallelism to DataStreamSink. --- .../streaming/api/datastream/DataStreamSink.java | 16 ++++++++++++++ .../api/graph/StreamingJobGraphGeneratorTest.java | 25 ++++++++++++++++++++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 0900ebd8225..7b8d241e6ec 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.SlotSharingGroup; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -180,6 +181,21 @@ public class DataStreamSink<T> { return this; } + /** + * Sets the max parallelism for this sink. + * + * <p>The maximum parallelism specifies the upper bound for dynamic scaling. The value must be + * greater than zero and within the allowed upper bound. + * + * @param maxParallelism The max parallelism for this sink. + * @return The sink with set max parallelism. + */ + public DataStreamSink<T> setMaxParallelism(int maxParallelism) { + OperatorValidationUtils.validateMaxParallelism(maxParallelism, true); + transformation.setMaxParallelism(maxParallelism); + return this; + } + /** * Sets the description for this sink.
* diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index cf09d70e236..a58d83c152e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -259,8 +259,8 @@ class StreamingJobGraphGeneratorTest { @Test void testTransformationSetParallelism() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - /* The default parallelism of the environment (that is inherited by the source) - and the parallelism of the map operator needs to be different for this test */ + // The default parallelism of the environment (that is inherited by the source) + // and the parallelism of the map operator need to be different for this test env.setParallelism(4); env.fromSequence(1L, 3L).map(i -> i).setParallelism(10).print().setParallelism(20); StreamGraph streamGraph = env.getStreamGraph(); @@ -283,6 +283,27 @@ class StreamingJobGraphGeneratorTest { assertThat(vertices.get(2).isParallelismConfigured()).isTrue(); } + @Test + void testTransformationSetMaxParallelism() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // The max parallelism of the environment (which is inherited by the source), of the map + // operator, and of the sink must all differ for this test + env.setMaxParallelism(4); + + DataStreamSource<Long> source = + env.fromSequence(1L, 3L); // no explicit max parallelism set, inherited from the environment.
+ SingleOutputStreamOperator<Long> map = source.map(i -> i).setMaxParallelism(10); + DataStreamSink<Long> sink = map.print().setMaxParallelism(20); + + StreamGraph streamGraph = env.getStreamGraph(); + + // check the streamGraph max parallelism is configured correctly + assertThat(streamGraph.getStreamNode(source.getId()).getMaxParallelism()).isEqualTo(4); + assertThat(streamGraph.getStreamNode(map.getId()).getMaxParallelism()).isEqualTo(10); + assertThat(streamGraph.getStreamNode(sink.getTransformation().getId()).getMaxParallelism()) + .isEqualTo(20); + } + @Test void testChainNodeSetParallelism() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
