This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd8570f8551c78608b847109c0a61f8a46c6cb65 Author: Kostas Kloudas <[email protected]> AuthorDate: Wed Nov 13 15:09:03 2019 +0100 [FLINK-14745] Add dependencies of job as list of URLs in config --- .../src/main/java/org/apache/flink/client/ClientUtils.java | 2 +- .../src/main/java/org/apache/flink/client/cli/CliFrontend.java | 3 ++- .../org/apache/flink/client/cli/ExecutionConfigAccessor.java | 9 ++++++++- .../java/org/apache/flink/client/program/PackagedProgram.java | 4 ++-- .../org/apache/flink/client/program/PackagedProgramUtils.java | 2 +- .../java/org/apache/flink/client/cli/CliFrontendRunTest.java | 4 ++-- .../flink/table/client/gateway/local/ExecutionContext.java | 10 ++++------ 7 files changed, 20 insertions(+), 14 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index f756449..043b740 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -147,7 +147,7 @@ public enum ClientUtils { ContextEnvironmentFactory factory = new ContextEnvironmentFactory( client, - program.getAllLibraries(), + program.getJobJarAndDependencies(), program.getClasspaths(), program.getUserCodeClassLoader(), parallelism, diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 6e9b2f9..9ff6d53 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -211,7 +211,8 @@ public class CliFrontend { final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine); final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions); + final List<URL> jobJars = program.getJobJarAndDependencies(); + final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars); final Configuration executionConfig = executionParameters.getConfiguration(); try { diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java index ee32449..ec627ac 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java @@ -55,15 +55,18 @@ public class ExecutionConfigAccessor { /** * Creates an {@link ExecutionConfigAccessor} based on the provided {@link ProgramOptions} as provided by the user through the CLI. */ - public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options) { + public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options, final List<URL> jobJars) { checkNotNull(options); + checkNotNull(jobJars); final Configuration configuration = new Configuration(); + configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism()); configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode()); configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit()); ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString); + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jobJars, URL::toString); SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration); @@ -74,6 +77,10 @@ public class ExecutionConfigAccessor { return configuration; } + public List<URL> getJars() { + return decodeUrlList(configuration, PipelineOptions.JARS); + } + public List<URL> getClasspaths() { return decodeUrlList(configuration, PipelineOptions.CLASSPATHS); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 2b593ca..1d96646 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -140,7 +140,7 @@ public class PackagedProgram { // now that we have an entry point, we can extract the nested jar files (if any) this.extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl); this.classpaths = classpaths; - this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader()); + this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(getJobJarAndDependencies(), classpaths, getClass().getClassLoader()); // load the entry point class this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader); @@ -227,7 +227,7 @@ public class PackagedProgram { /** * Returns all provided libraries needed to run the program. */ - public List<URL> getAllLibraries() { + public List<URL> getJobJarAndDependencies() { List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1); if (jarFile != null) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 00ac231..edf3617 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -63,7 +63,7 @@ public class PackagedProgramUtils { if (jobID != null) { jobGraph.setJobID(jobID); } - jobGraph.addJars(packagedProgram.getAllLibraries()); + jobGraph.addJars(packagedProgram.getJobJarAndDependencies()); jobGraph.setClasspaths(packagedProgram.getClasspaths()); jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java index d2aff28..449e1b2 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -85,7 +85,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase { CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true); ProgramOptions programOptions = new ProgramOptions(commandLine); - ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions); + ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList()); SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings(); assertTrue(savepointSettings.restoreSavepoint()); @@ -99,7 +99,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase { CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true); ProgramOptions programOptions = new ProgramOptions(commandLine); - ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions); + ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList()); SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings(); assertTrue(savepointSettings.restoreSavepoint()); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index e37635f..99c43a8 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -113,7 +113,6 @@ public class ExecutionContext<ClusterID> { private final SessionContext sessionContext; private final Environment mergedEnv; - private final List<URL> dependencies; private final ClassLoader classLoader; private final Map<String, Module> modules; private final Map<String, Catalog> catalogs; @@ -136,7 +135,6 @@ public class ExecutionContext<ClusterID> { Configuration flinkConfig, ClusterClientServiceLoader clusterClientServiceLoader, Options commandLineOptions, List<CustomCommandLine> availableCommandLines) throws FlinkException { this.sessionContext = sessionContext.copy(); // create internal copy because session context is mutable this.mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getEnvironment()); - this.dependencies = dependencies; this.flinkConfig = flinkConfig; // create class loader @@ -184,7 +182,7 @@ public class ExecutionContext<ClusterID> { clusterClientFactory = serviceLoader.getClusterClientFactory(executorConfig); checkState(clusterClientFactory != null); - executionParameters = createExecutionParameterProvider(commandLine); + executionParameters = createExecutionParameterProvider(commandLine, dependencies); clusterId = clusterClientFactory.getClusterId(executorConfig); clusterSpec = clusterClientFactory.getClusterSpecification(executorConfig); } @@ -262,10 +260,10 @@ public class ExecutionContext<ClusterID> { throw new SqlExecutionException("Could not find a matching deployment."); } - private static ExecutionConfigAccessor createExecutionParameterProvider(CommandLine commandLine) { + private static ExecutionConfigAccessor createExecutionParameterProvider(CommandLine commandLine, List<URL> jobJars) { try { final ProgramOptions programOptions = new ProgramOptions(commandLine); - return ExecutionConfigAccessor.fromProgramOptions(programOptions); + return ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars); } catch (CliArgsException e) { throw new SqlExecutionException("Invalid deployment run options.", e); } @@ -487,7 +485,7 @@ public class ExecutionContext<ClusterID> { flinkConfig, parallelism); - jobGraph.addJars(dependencies); + jobGraph.addJars(executionParameters.getJars()); jobGraph.setClasspaths(executionParameters.getClasspaths()); jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());
