This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd8570f8551c78608b847109c0a61f8a46c6cb65
Author: Kostas Kloudas <[email protected]>
AuthorDate: Wed Nov 13 15:09:03 2019 +0100

    [FLINK-14745] Add dependencies of job as list of URLs in config
---
 .../src/main/java/org/apache/flink/client/ClientUtils.java     |  2 +-
 .../src/main/java/org/apache/flink/client/cli/CliFrontend.java |  3 ++-
 .../org/apache/flink/client/cli/ExecutionConfigAccessor.java   |  9 ++++++++-
 .../java/org/apache/flink/client/program/PackagedProgram.java  |  4 ++--
 .../org/apache/flink/client/program/PackagedProgramUtils.java  |  2 +-
 .../java/org/apache/flink/client/cli/CliFrontendRunTest.java   |  4 ++--
 .../flink/table/client/gateway/local/ExecutionContext.java     | 10 ++++------
 7 files changed, 20 insertions(+), 14 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index f756449..043b740 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -147,7 +147,7 @@ public enum ClientUtils {
 
                        ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(
                                client,
-                               program.getAllLibraries(),
+                               program.getJobJarAndDependencies(),
                                program.getClasspaths(),
                                program.getUserCodeClassLoader(),
                                parallelism,
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 6e9b2f9..9ff6d53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -211,7 +211,8 @@ public class CliFrontend {
                final CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(commandLine);
                final Configuration executorConfig = 
customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
 
-               final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromProgramOptions(programOptions);
+               final List<URL> jobJars = program.getJobJarAndDependencies();
+               final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
                final Configuration executionConfig = 
executionParameters.getConfiguration();
 
                try {
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index ee32449..ec627ac 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -55,15 +55,18 @@ public class ExecutionConfigAccessor {
        /**
         * Creates an {@link ExecutionConfigAccessor} based on the provided 
{@link ProgramOptions} as provided by the user through the CLI.
         */
-       public static ExecutionConfigAccessor fromProgramOptions(final 
ProgramOptions options) {
+       public static ExecutionConfigAccessor fromProgramOptions(final 
ProgramOptions options, final List<URL> jobJars) {
                checkNotNull(options);
+               checkNotNull(jobJars);
 
                final Configuration configuration = new Configuration();
+
                configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 
options.getParallelism());
                configuration.setBoolean(DeploymentOptions.ATTACHED, 
!options.getDetachedMode());
                
configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, 
options.isShutdownOnAttachedExit());
 
                ConfigUtils.encodeCollectionToConfig(configuration, 
PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
+               ConfigUtils.encodeCollectionToConfig(configuration, 
PipelineOptions.JARS, jobJars, URL::toString);
 
                
SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), 
configuration);
 
@@ -74,6 +77,10 @@ public class ExecutionConfigAccessor {
                return configuration;
        }
 
+       public List<URL> getJars() {
+               return decodeUrlList(configuration, PipelineOptions.JARS);
+       }
+
        public List<URL> getClasspaths() {
                return decodeUrlList(configuration, PipelineOptions.CLASSPATHS);
        }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 2b593ca..1d96646 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -140,7 +140,7 @@ public class PackagedProgram {
                // now that we have an entry point, we can extract the nested 
jar files (if any)
                this.extractedTempLibraries = jarFileUrl == null ? 
Collections.emptyList() : extractContainedLibraries(jarFileUrl);
                this.classpaths = classpaths;
-               this.userCodeClassLoader = 
ClientUtils.buildUserCodeClassLoader(getAllLibraries(), classpaths, 
getClass().getClassLoader());
+               this.userCodeClassLoader = 
ClientUtils.buildUserCodeClassLoader(getJobJarAndDependencies(), classpaths, 
getClass().getClassLoader());
 
                // load the entry point class
                this.mainClass = loadMainClass(entryPointClassName, 
userCodeClassLoader);
@@ -227,7 +227,7 @@ public class PackagedProgram {
        /**
         * Returns all provided libraries needed to run the program.
         */
-       public List<URL> getAllLibraries() {
+       public List<URL> getJobJarAndDependencies() {
                List<URL> libs = new 
ArrayList<URL>(this.extractedTempLibraries.size() + 1);
 
                if (jarFile != null) {
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 00ac231..edf3617 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -63,7 +63,7 @@ public class PackagedProgramUtils {
                if (jobID != null) {
                        jobGraph.setJobID(jobID);
                }
-               jobGraph.addJars(packagedProgram.getAllLibraries());
+               jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
                jobGraph.setClasspaths(packagedProgram.getClasspaths());
                
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
 
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index d2aff28..449e1b2 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -85,7 +85,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 
                        CommandLine commandLine = 
CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
                        ProgramOptions programOptions = new 
ProgramOptions(commandLine);
-                       ExecutionConfigAccessor executionOptions = 
ExecutionConfigAccessor.fromProgramOptions(programOptions);
+                       ExecutionConfigAccessor executionOptions = 
ExecutionConfigAccessor.fromProgramOptions(programOptions, 
Collections.emptyList());
 
                        SavepointRestoreSettings savepointSettings = 
executionOptions.getSavepointRestoreSettings();
                        assertTrue(savepointSettings.restoreSavepoint());
@@ -99,7 +99,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 
                        CommandLine commandLine = 
CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
                        ProgramOptions programOptions = new 
ProgramOptions(commandLine);
-                       ExecutionConfigAccessor executionOptions = 
ExecutionConfigAccessor.fromProgramOptions(programOptions);
+                       ExecutionConfigAccessor executionOptions = 
ExecutionConfigAccessor.fromProgramOptions(programOptions, 
Collections.emptyList());
 
                        SavepointRestoreSettings savepointSettings = 
executionOptions.getSavepointRestoreSettings();
                        assertTrue(savepointSettings.restoreSavepoint());
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index e37635f..99c43a8 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -113,7 +113,6 @@ public class ExecutionContext<ClusterID> {
 
        private final SessionContext sessionContext;
        private final Environment mergedEnv;
-       private final List<URL> dependencies;
        private final ClassLoader classLoader;
        private final Map<String, Module> modules;
        private final Map<String, Catalog> catalogs;
@@ -136,7 +135,6 @@ public class ExecutionContext<ClusterID> {
                        Configuration flinkConfig, ClusterClientServiceLoader 
clusterClientServiceLoader, Options commandLineOptions, List<CustomCommandLine> 
availableCommandLines) throws FlinkException {
                this.sessionContext = sessionContext.copy(); // create internal 
copy because session context is mutable
                this.mergedEnv = Environment.merge(defaultEnvironment, 
sessionContext.getEnvironment());
-               this.dependencies = dependencies;
                this.flinkConfig = flinkConfig;
 
                // create class loader
@@ -184,7 +182,7 @@ public class ExecutionContext<ClusterID> {
                clusterClientFactory = 
serviceLoader.getClusterClientFactory(executorConfig);
                checkState(clusterClientFactory != null);
 
-               executionParameters = 
createExecutionParameterProvider(commandLine);
+               executionParameters = 
createExecutionParameterProvider(commandLine, dependencies);
                clusterId = clusterClientFactory.getClusterId(executorConfig);
                clusterSpec = 
clusterClientFactory.getClusterSpecification(executorConfig);
        }
@@ -262,10 +260,10 @@ public class ExecutionContext<ClusterID> {
                throw new SqlExecutionException("Could not find a matching 
deployment.");
        }
 
-       private static ExecutionConfigAccessor 
createExecutionParameterProvider(CommandLine commandLine) {
+       private static ExecutionConfigAccessor 
createExecutionParameterProvider(CommandLine commandLine, List<URL> jobJars) {
                try {
                        final ProgramOptions programOptions = new 
ProgramOptions(commandLine);
-                       return 
ExecutionConfigAccessor.fromProgramOptions(programOptions);
+                       return 
ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
                } catch (CliArgsException e) {
                        throw new SqlExecutionException("Invalid deployment run 
options.", e);
                }
@@ -487,7 +485,7 @@ public class ExecutionContext<ClusterID> {
                                        flinkConfig,
                                        parallelism);
 
-                       jobGraph.addJars(dependencies);
+                       jobGraph.addJars(executionParameters.getJars());
                        
jobGraph.setClasspaths(executionParameters.getClasspaths());
                        
jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());
 

Reply via email to