[ 
https://issues.apache.org/jira/browse/FLINK-14861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-14861:
-------------------------------
    Description: 
I set the parameter "parallelism.default" in flink-conf.yaml, but it no longer 
takes effect after I rebased my branch onto master. I debugged and found that 
this is a bug introduced by FLINK-14745 
(https://issues.apache.org/jira/browse/FLINK-14745).

Detail: 
{code:java}
// ExecutionConfigAccessor#fromProgramOptions
public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options, final List<URL> jobJars) {
   checkNotNull(options);
   checkNotNull(jobJars);

   final Configuration configuration = new Configuration();

   if (options.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
      configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism());
   }

   configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode());
   configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit());

   ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
   ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jobJars, URL::toString);

   SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);

   return new ExecutionConfigAccessor(configuration);
}{code}
 
 [1] executionConfigAccessor.getParallelism() returns 1 rather than -1 when 
options.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT. In that case 
the key is never written to the configuration, so getParallelism() falls back 
to the default value of CoreOptions.DEFAULT_PARALLELISM.

 
{code:java}
// ExecutionConfigAccessor.java
public int getParallelism() {
 return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
} 
// Configuration.java
public int getInteger(ConfigOption<Integer> configOption) {
 return getOptional(configOption)
 .orElseGet(configOption::defaultValue);
}{code}
 

executionConfigAccessor.getParallelism() also returns 1 when 
options.getParallelism() == 1, so the two cases cannot be told apart.

As a result, the defaultParallelism branch of the following code in 
CliFrontend.java is never reached when the user does not set the parallelism 
on the flink run command line.
{code:java}
// CliFrontend.java
int parallelism = executionParameters.getParallelism() == -1
      ? defaultParallelism
      : executionParameters.getParallelism();{code}
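The chain described in [1] can be reproduced without Flink. The following is only a minimal sketch: the class ParallelismFallbackSketch, its method names, and the plain Map standing in for Configuration are hypothetical, and the option default of 1 is taken from the behaviour reported above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the Flink classes quoted above; this is not Flink code.
final class ParallelismFallbackSketch {
    static final int PARALLELISM_DEFAULT = -1;   // sentinel: not set on the command line
    static final int OPTION_DEFAULT_VALUE = 1;   // assumed default value of the config option
    static final String KEY = "parallelism.default";

    // Mirrors fromProgramOptions: the key is written only for a non-sentinel value.
    static Map<String, Integer> fromProgramOptions(int cliParallelism) {
        Map<String, Integer> configuration = new HashMap<>();
        if (cliParallelism != PARALLELISM_DEFAULT) {
            configuration.put(KEY, cliParallelism);
        }
        return configuration;
    }

    // Mirrors getInteger: a missing key falls back to the option's default value.
    static int getParallelism(Map<String, Integer> configuration) {
        return Optional.ofNullable(configuration.get(KEY)).orElse(OPTION_DEFAULT_VALUE);
    }

    // Mirrors the ternary in CliFrontend: the -1 check can no longer match.
    static int effectiveParallelism(int cliParallelism, int defaultParallelism) {
        int fromAccessor = getParallelism(fromProgramOptions(cliParallelism));
        return fromAccessor == PARALLELISM_DEFAULT ? defaultParallelism : fromAccessor;
    }

    public static void main(String[] args) {
        // flink-conf.yaml asks for 4, nothing is passed on the command line.
        System.out.println(effectiveParallelism(PARALLELISM_DEFAULT, 4));
        // An explicit value of 1 on the command line gives the same result.
        System.out.println(effectiveParallelism(1, 4));
    }
}
```

In this sketch both calls yield 1, so the value 4 standing in for flink-conf.yaml is ignored whenever no parallelism is passed on the command line.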
[2] In another place, I think we should restore the three lines that were 
deleted in FLINK-14745 (shown commented out below).
{code:java}
// 
int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
//if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
//userParallelism = defaultParallelism;
// }
executeProgram(program, client, userParallelism, executionParameters.getDetachedMode());
 
{code}
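The fallback proposed in [2] amounts to the check sketched below. This is a hypothetical standalone helper (UserParallelismSketch and resolve are not Flink names), written only to illustrate the three lines. Note that it helps only if the accessor actually reports -1 for an unset value, which [1] says is currently not the case.

```java
// Hypothetical helper illustrating the fallback from [2]; not part of Flink.
final class UserParallelismSketch {
    static final int PARALLELISM_DEFAULT = -1;   // sentinel: not set on the command line

    // Use the configured default only when the user did not set a parallelism.
    static int resolve(int userParallelism, int defaultParallelism) {
        if (PARALLELISM_DEFAULT == userParallelism) {
            return defaultParallelism;
        }
        return userParallelism;
    }

    public static void main(String[] args) {
        System.out.println(resolve(PARALLELISM_DEFAULT, 4)); // unset: falls back to the default
        System.out.println(resolve(2, 4));                   // explicit value is kept
    }
}
```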
 

  was:
I set the parameter "parallelism.default" in flink-conf.yaml, but it no longer 
takes effect after I rebased my branch onto master. I debugged and found that 
this is a bug introduced by 
[FLINK-14745](https://issues.apache.org/jira/browse/FLINK-14745).

Detail: 
{code:java}
// ExecutionConfigAccessor#fromProgramOptions
public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions 
options, final List<URL> jobJars) {
   checkNotNull(options);
   checkNotNull(jobJars);

   final Configuration configuration = new Configuration();

   if (options.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
      configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 
options.getParallelism());
   }

   configuration.setBoolean(DeploymentOptions.ATTACHED, 
!options.getDetachedMode());
   configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, 
options.isShutdownOnAttachedExit());

   ConfigUtils.encodeCollectionToConfig(configuration, 
PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
   ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, 
jobJars, URL::toString);

   
SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), 
configuration);

   return new ExecutionConfigAccessor(configuration);
}{code}
 
[1] executionConfigAccessor.getParallelism() returns 1 rather than -1 when 
options.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT. In that case 
the key is never written to the configuration, so getParallelism() falls back 
to the default value of CoreOptions.DEFAULT_PARALLELISM.

 
{code:java}
// ExecutionConfigAccessor.java
public int getParallelism() {
 return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
} 
// Configuration.java
public int getInteger(ConfigOption<Integer> configOption) {
 return getOptional(configOption)
 .orElseGet(configOption::defaultValue);
}{code}
 

executionConfigAccessor.getParallelism() also returns 1 when 
options.getParallelism() == 1, so the two cases cannot be told apart.

As a result, the defaultParallelism branch of the following code in 
CliFrontend.java is never reached when the user does not set the parallelism 
on the flink run command line.
{code:java}
// CliFrontend.java
int parallelism = executionParameters.getParallelism() == -1 ? 
defaultParallelism : executionParameters.getParallelism();{code}
[2] In another place, I think we should restore the two lines that were 
deleted in FLINK-14745.
{code:java}
// 
int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);

{code}
*if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) \{
   userParallelism = defaultParallelism;
}*
{code:java}
executeProgram(program, client, userParallelism, 
executionParameters.getDetachedMode());
{code}
 


> parallelism.default in flink-conf.yaml do not work which is a bug imported 
> by[FLINK-14745] 
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14861
>                 URL: https://issues.apache.org/jira/browse/FLINK-14861
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.10.0
>            Reporter: Leonard Xu
>            Priority: Critical
>             Fix For: 1.10.0
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
