JunRuiLee commented on code in PR #23944:
URL: https://github.com/apache/flink/pull/23944#discussion_r1429569157
##########
flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java:
##########
@@ -40,8 +43,16 @@ public static JobGraph getJobGraph(
FlinkPipelineTranslator pipelineTranslator =
getPipelineTranslator(userClassloader, pipeline);
- return pipelineTranslator.translateToJobGraph(
- pipeline, optimizerConfiguration, defaultParallelism);
+ JobGraph jobGraph =
+ pipelineTranslator.translateToJobGraph(
+ pipeline, optimizerConfiguration, defaultParallelism);
+
+ Map<String, String> parallelismOverrides =
+
optimizerConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES);
+ jobGraph.getJobConfiguration()
+ .set(PipelineOptions.PARALLELISM_OVERRIDES,
parallelismOverrides);
+
Review Comment:
I suggest use `getOptional` instead of `get` for fetching
`PARALLELISM_OVERRIDES` to ensure that the 'jobConfiguration' only contains
which is explicitly set by users like below.
`optimizerConfiguration.getOptional(PipelineOptions.PARALLELISM_OVERRIDES).ifPresent(map
-> jobGraph.getJobConfiguration().set(PipelineOptions.PARALLELISM_OVERRIDES,
map));`
Although the current jobConfiguration does not yet include all job-level
configurations, in the long term, this jobConfiguration field should contain
and only contain the job-level configuration items explicitly configured by the
user.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]