[
https://issues.apache.org/jira/browse/FLINK-14861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kostas Kloudas reassigned FLINK-14861:
--------------------------------------
Assignee: Kostas Kloudas
> parallelism.default in flink-conf.yaml do not work which is a bug imported
> by[FLINK-14745]
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-14861
> URL: https://issues.apache.org/jira/browse/FLINK-14861
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.10.0
> Reporter: Leonard Xu
> Assignee: Kostas Kloudas
> Priority: Critical
> Fix For: 1.10.0
>
>
> I set the parameter "parallelism.default" in flink-conf.yaml, but it no longer
> takes effect after I rebased my branch onto master. I debugged and found that
> this is a bug introduced by FLINK-14745
> (https://issues.apache.org/jira/browse/FLINK-14745).
> Details:
> {code:java}
> // ExecutionConfigAccessor#fromProgramOptions
> public static ExecutionConfigAccessor fromProgramOptions(
>         final ProgramOptions options, final List<URL> jobJars) {
>     checkNotNull(options);
>     checkNotNull(jobJars);
>     final Configuration configuration = new Configuration();
>     if (options.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
>         configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism());
>     }
>     configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode());
>     configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit());
>     ConfigUtils.encodeCollectionToConfig(
>             configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
>     ConfigUtils.encodeCollectionToConfig(
>             configuration, PipelineOptions.JARS, jobJars, URL::toString);
>     SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);
>     return new ExecutionConfigAccessor(configuration);
> }{code}
>
> [1]. executionConfigAccessor.getParallelism() returns 1 rather than -1 when
> options.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT, because the
> key is never written to the configuration in that case and getParallelism()
> falls back to the default value of CoreOptions.DEFAULT_PARALLELISM.
>
> {code:java}
> // ExecutionConfigAccessor.java
> public int getParallelism() {
>     return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
> }
>
> // Configuration.java
> public int getInteger(ConfigOption<Integer> configOption) {
>     return getOptional(configOption)
>             .orElseGet(configOption::defaultValue);
> }{code}
>
> executionConfigAccessor.getParallelism() also returns 1 when
> options.getParallelism() == 1, so the two cases cannot be told apart.
> As a result, the "== -1" branch in the following code in CliFrontend.java is
> never taken when the user does not set the parallelism on the flink run
> command line.
> {code:java}
> // CliFrontend.java
> int parallelism = executionParameters.getParallelism() == -1
>         ? defaultParallelism
>         : executionParameters.getParallelism();{code}
> [2]. In another place, I think we should keep the three lines that were
> deleted in FLINK-14745.
> {code:java}
> //
> int userParallelism = executionParameters.getParallelism();
> LOG.debug("User parallelism is set to {}", userParallelism);
> //if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
> //userParallelism = defaultParallelism;
> // }
> executeProgram(program, client, userParallelism,
> executionParameters.getDetachedMode());
>
> {code}
>
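
The failure mode described in [1] can be reproduced without Flink. The following is a minimal sketch, not the actual Flink code: the class ParallelismFallbackSketch, its method names, and the plain Map used in place of Configuration are all hypothetical stand-ins, and the value 4 for the flink-conf.yaml setting is an assumed example. The constants -1 and 1 are the values named in the report. It shows that a getter which falls back to the option's default hides the "not set" case, so the "== -1" check never fires, while a getter that keeps the sentinel lets the flink-conf.yaml value through.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ParallelismFallbackSketch {
    // Sentinel from the report: ExecutionConfig.PARALLELISM_DEFAULT is -1.
    static final int PARALLELISM_DEFAULT = -1;
    // Default value of CoreOptions.DEFAULT_PARALLELISM as described in the report.
    static final int OPTION_DEFAULT = 1;
    static final String KEY = "parallelism.default";

    // Hypothetical stand-in for fromProgramOptions: the key is written
    // only when the command line carried an explicit parallelism.
    static Map<String, Integer> fromCommandLine(int cliParallelism) {
        Map<String, Integer> conf = new HashMap<>();
        if (cliParallelism != PARALLELISM_DEFAULT) {
            conf.put(KEY, cliParallelism);
        }
        return conf;
    }

    // Behaviour described in the report: an unset key yields the option default (1).
    static int getParallelism(Map<String, Integer> conf) {
        return Optional.ofNullable(conf.get(KEY)).orElse(OPTION_DEFAULT);
    }

    // One possible alternative (an assumption, not the committed fix):
    // an unset key yields the sentinel, so callers can still detect it.
    static int getParallelismOrSentinel(Map<String, Integer> conf) {
        return conf.getOrDefault(KEY, PARALLELISM_DEFAULT);
    }

    // Same shape as the check quoted from CliFrontend.java.
    static int resolve(int accessorValue, int defaultParallelism) {
        return accessorValue == PARALLELISM_DEFAULT ? defaultParallelism : accessorValue;
    }

    public static void main(String[] args) {
        int flinkConfParallelism = 4; // assumed value of parallelism.default in flink-conf.yaml
        Map<String, Integer> conf = fromCommandLine(PARALLELISM_DEFAULT);

        // Fallback to the option default masks the sentinel: prints 1.
        System.out.println(resolve(getParallelism(conf), flinkConfParallelism));
        // Keeping the sentinel lets the configured value win: prints 4.
        System.out.println(resolve(getParallelismOrSentinel(conf), flinkConfParallelism));
    }
}
```

With an explicit command-line parallelism both getters agree, so only the unset case changes behaviour.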
--
This message was sent by Atlassian Jira
(v8.3.4#803005)