ruanhang1993 commented on code in PR #3438:
URL: https://github.com/apache/flink-cdc/pull/3438#discussion_r1687392535
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -93,6 +93,7 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking
@Override
public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism =
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
+ int sinkParallelism =
pipelineDef.getConfig().get(PipelineOptions.SINK_PARALLELISM);
Review Comment:
What if users do not set the SINK_PARALLELISM setting?
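To illustrate the concern: an option declared with `noDefaultValue()` presumably comes back as null when the user leaves it unset, and assigning that to an `int` would fail on unboxing. Below is a minimal sketch of one possible fallback, assuming the sink should inherit the pipeline parallelism in that case. It uses a plain `Map` as a stand-in for the pipeline configuration; the class `SinkParallelismSketch` and the method `resolveSinkParallelism` are hypothetical names and not part of the PR or of Flink CDC.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SinkParallelismSketch {

    // Hypothetical helper (not part of Flink CDC): returns the configured sink
    // parallelism, or the pipeline parallelism when the option is unset.
    public static int resolveSinkParallelism(
            Map<String, Integer> config, int pipelineParallelism) {
        // The Map stands in for the pipeline configuration. A missing key yields
        // null, which would throw a NullPointerException if unboxed to int directly.
        return Optional.ofNullable(config.get("sink.parallelism"))
                .orElse(pipelineParallelism);
    }

    public static void main(String[] args) {
        Map<String, Integer> config = new HashMap<>();

        // Option not set: falls back to the pipeline parallelism.
        System.out.println(resolveSinkParallelism(config, 4)); // prints 4

        // Option set: the explicit value wins.
        config.put("sink.parallelism", 2);
        System.out.println(resolveSinkParallelism(config, 4)); // prints 2
    }
}
```

Whether falling back to the pipeline parallelism is the right default is a design choice for the PR author; the point is only that the unset case needs explicit handling.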
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -45,6 +45,15 @@ public class PipelineOptions {
.noDefaultValue()
.withDescription("Parallelism of the pipeline");
+
+
+ public static final ConfigOption<Integer> SINK_PARALLELISM =
+ ConfigOptions.key("sink.parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Parallelism of the sink in the pipeline");
Review Comment:
Please add some tests for the setting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.