This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb182525e8962faa483b678d061046062f64924d
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Nov 18 15:48:35 2019 +0100

    [hotfix] CliFrontend.run() merges configurations into one
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 41 ++++++++++++++--------
 .../flink/client/cli/ExecutionConfigAccessor.java  |  5 +--
 2 files changed, 29 insertions(+), 17 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 38243fc..d82b377 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -208,33 +208,44 @@ public class CliFrontend {
                        throw new CliArgsException("Could not build the program 
from JAR file.", e);
                }
 
-               final CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(commandLine);
-               final Configuration executorConfig = 
customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
-
                final List<URL> jobJars = program.getJobJarAndDependencies();
-               final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
-               final Configuration executionConfig = 
executionParameters.getConfiguration();
+               final Configuration effectiveConfiguration =
+                               getEffectiveConfiguration(commandLine, 
programOptions, jobJars);
 
                try {
-                       runProgram(executorConfig, executionConfig, program);
+                       runProgram(effectiveConfiguration, program);
                } finally {
                        program.deleteExtractedLibraries();
                }
        }
 
+       private Configuration getEffectiveConfiguration(
+                       final CommandLine commandLine,
+                       final ProgramOptions programOptions,
+                       final List<URL> jobJars) throws FlinkException {
+
+               final CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(checkNotNull(commandLine));
+               final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromProgramOptions(
+                               checkNotNull(programOptions),
+                               checkNotNull(jobJars));
+
+               final Configuration executorConfig = 
customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+               final Configuration effectiveConfiguration = new 
Configuration(executorConfig);
+               return 
executionParameters.applyToConfiguration(effectiveConfiguration);
+       }
+
        private <ClusterID> void runProgram(
-                       Configuration executorConfig,
-                       Configuration executionConfig,
+                       Configuration configuration,
                        PackagedProgram program) throws 
ProgramInvocationException, FlinkException {
 
-               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(executorConfig);
+               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(configuration);
                checkNotNull(clusterClientFactory);
 
-               final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(executorConfig);
+               final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration);
 
                try {
-                       final ClusterID clusterId = 
clusterClientFactory.getClusterId(executorConfig);
-                       final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromConfiguration(executionConfig);
+                       final ClusterID clusterId = 
clusterClientFactory.getClusterId(configuration);
+                       final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromConfiguration(configuration);
                        final ClusterClient<ClusterID> client;
 
                        // directly deploy the job if the cluster is started in 
job mode and detached
@@ -243,7 +254,7 @@ public class CliFrontend {
 
                                final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
 
-                               final ClusterSpecification clusterSpecification 
= clusterClientFactory.getClusterSpecification(executorConfig);
+                               final ClusterSpecification clusterSpecification 
= clusterClientFactory.getClusterSpecification(configuration);
                                client = clusterDescriptor.deployJobCluster(
                                        clusterSpecification,
                                        jobGraph,
@@ -264,7 +275,7 @@ public class CliFrontend {
                                } else {
                                        // also in job mode we have to deploy a 
session cluster because the job
                                        // might consist of multiple parts 
(e.g. when using collect)
-                                       final ClusterSpecification 
clusterSpecification = 
clusterClientFactory.getClusterSpecification(executorConfig);
+                                       final ClusterSpecification 
clusterSpecification = 
clusterClientFactory.getClusterSpecification(configuration);
                                        client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
                                        // if not running in detached mode, add 
a shutdown hook to shut down cluster if client exits
                                        // there's a race-condition here if cli 
is killed before shutdown hook is installed
@@ -279,7 +290,7 @@ public class CliFrontend {
                                        int userParallelism = 
executionParameters.getParallelism();
                                        LOG.debug("User parallelism is set to 
{}", userParallelism);
 
-                                       executeProgram(executionConfig, 
program, client);
+                                       executeProgram(configuration, program, 
client);
                                } finally {
                                        if (clusterId == null && 
!executionParameters.getDetachedMode()) {
                                                // terminate the cluster only 
if we have started it before and if it's not detached
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index 9e570e1..f55560b 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -77,8 +77,9 @@ public class ExecutionConfigAccessor {
                return new ExecutionConfigAccessor(configuration);
        }
 
-       public Configuration getConfiguration() {
-               return configuration;
+       Configuration applyToConfiguration(final Configuration 
baseConfiguration) {
+               baseConfiguration.addAll(configuration);
+               return baseConfiguration;
        }
 
        public List<URL> getJars() {

Reply via email to