Repository: flink
Updated Branches:
  refs/heads/master fc730bb07 -> b410c393c

[FLINK-4913][yarn] include user jars in system class loader

When deploying a Yarn cluster for a single job, this change pre-configures the cluster to include the user jar(s) on all nodes. This eliminates the need to upload jar files through the BlobClient. More importantly, it loads the user classes only once and not on every instantiation of a Task. This also reduces the JobManager class loading upon recovery of a failed job.

This closes #2692.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b600d35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b600d35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b600d35

Branch: refs/heads/master
Commit: 2b600d355f5df9364c634282469acd608d7a2104
Parents: fc730bb
Author: Maximilian Michels <[email protected]>
Authored: Mon Oct 24 16:16:52 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Thu Oct 27 14:21:42 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java | 14 ++++--
 .../flink/client/cli/CustomCommandLine.java | 14 +++++-
 .../org/apache/flink/client/cli/DefaultCLI.java | 5 +-
 .../flink/client/program/ClusterClient.java | 40 +++++++++++-----
 .../flink/client/program/PackagedProgram.java | 46 +++++++++++++------
 .../client/program/StandaloneClusterClient.java | 6 +++
 .../org/apache/flink/api/scala/FlinkShell.scala | 7 ++-
 ...CliFrontendYarnAddressConfigurationTest.java | 5 +-
 .../org/apache/flink/yarn/YarnTestBase.java | 5 +-
 .../yarn/AbstractYarnClusterDescriptor.java | 48 +++++++++++++++++++-
 .../apache/flink/yarn/YarnClusterClient.java | 6 +++
 .../flink/yarn/cli/FlinkYarnSessionCli.java | 10 +++-
 12 files changed, 166 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 236ee94..5db4449 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -236,7 +236,7 @@ public class CliFrontend { ClusterClient client = null; try { - client = createClient(options, program.getMainClassName()); + client = createClient(options, program); client.setPrintStatusDuringExecution(options.getStdoutLogging()); client.setDetached(options.getDetachedMode()); LOG.debug("Client slots is set to {}", client.getMaxSlots()); @@ -928,12 +928,12 @@ public class CliFrontend { /** * Creates a {@link ClusterClient} object from the given command line options and other parameters. * @param options Command line options - * @param programName Program name + * @param program The program for which to create the client. * @throws Exception */ protected ClusterClient createClient( CommandLineOptions options, - String programName) throws Exception { + PackagedProgram program) throws Exception { // Get the custom command-line (e.g. 
Standalone/Yarn/Mesos) CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); @@ -944,8 +944,12 @@ public class CliFrontend { logAndSysout("Cluster configuration: " + client.getClusterIdentifier()); } catch (UnsupportedOperationException e) { try { - String applicationName = "Flink Application: " + programName; - client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config); + String applicationName = "Flink Application: " + program.getMainClassName(); + client = activeCommandLine.createCluster( + applicationName, + options.getCommandLine(), + config, + program.getAllLibraries()); logAndSysout("Cluster started: " + client.getClusterIdentifier()); } catch (UnsupportedOperationException e2) { throw new IllegalConfigurationException( http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java index aecdc7c..c58c74c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java @@ -22,6 +22,9 @@ import org.apache.commons.cli.Options; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; +import java.net.URL; +import java.util.List; + /** * Custom command-line interface to load hooks for the command-line interface. 
@@ -61,15 +64,22 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> { * @return Client if a cluster could be retrieved * @throws UnsupportedOperationException if the operation is not supported */ - ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException; + ClusterType retrieveCluster( + CommandLine commandLine, + Configuration config) throws UnsupportedOperationException; /** * Creates the client for the cluster * @param applicationName The application name to use * @param commandLine The command-line options parsed by the CliFrontend * @param config The Flink config to use + * @param userJarFiles User jar files to include in the classpath of the cluster. * @return The client to communicate with the cluster which the CustomCommandLine brought up. * @throws UnsupportedOperationException if the operation is not supported */ - ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException; + ClusterType createCluster( + String applicationName, + CommandLine commandLine, + Configuration config, + List<URL> userJarFiles) throws UnsupportedOperationException; } http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index 8f79403..41737d0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -27,6 +27,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import java.net.InetSocketAddress; +import java.net.URL; +import java.util.List; import static 
org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig; @@ -76,7 +78,8 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> { public StandaloneClusterClient createCluster( String applicationName, CommandLine commandLine, - Configuration config) throws UnsupportedOperationException { + Configuration config, + List<URL> userJarFiles) throws UnsupportedOperationException { StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); return descriptor.deploy(); http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index ff5701f..7ffe7a1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -308,11 +308,27 @@ public abstract class ClusterClient { { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { - return run(prog.getPlanWithJars(), parallelism, prog.getSavepointPath()); + + final JobWithJars jobWithJars; + if (hasUserJarsInClassPath(prog.getAllLibraries())) { + jobWithJars = prog.getPlanWithoutJars(); + } else { + jobWithJars = prog.getPlanWithJars(); + } + + return run(jobWithJars, parallelism, prog.getSavepointPath()); } else if (prog.isUsingInteractiveMode()) { LOG.info("Starting program in interactive mode"); - ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(), + + final List<URL> libraries; + if (hasUserJarsInClassPath(prog.getAllLibraries())) { + libraries = Collections.emptyList(); + } else { + libraries = prog.getAllLibraries(); + } + + 
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), prog.getSavepointPath()); ContextEnvironment.setAsContext(factory); @@ -349,7 +365,7 @@ public abstract class ClusterClient { * Runs a program on the Flink cluster to which this client is connected. The call blocks until the * execution is complete, and returns afterwards. * - * @param program The program to be executed. + * @param jobWithJars The program to be executed. * @param parallelism The default parallelism to use when running the program. The default parallelism is used * when the program does not set a parallelism by itself. * @@ -359,15 +375,15 @@ public abstract class ClusterClient { * i.e. the job-manager is unreachable, or due to the fact that the * parallel execution failed. */ - public JobSubmissionResult run(JobWithJars program, int parallelism, String savepointPath) + public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, String savepointPath) throws CompilerException, ProgramInvocationException { - ClassLoader classLoader = program.getUserCodeClassLoader(); + ClassLoader classLoader = jobWithJars.getUserCodeClassLoader(); if (classLoader == null) { throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); } - OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism); - return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath); + OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism); + return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointPath); } public JobSubmissionResult run( @@ -631,10 +647,6 @@ public abstract class ClusterClient { return getOptimizedPlan(compiler, prog.getPlan(), parallelism); } - public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException { - 
return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null); - } - public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException { return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath); } @@ -761,6 +773,12 @@ public abstract class ClusterClient { public abstract int getMaxSlots(); /** + * Returns true if the client already has the user jar and providing it again would + * result in duplicate uploading of the jar. + */ + public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles); + + /** * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform * some custom job submission logic. * @param jobGraph The JobGraph to be submitted http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index aca873e..99f57bb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -282,23 +282,38 @@ public class PackagedProgram { } /** + * Returns the plan without the required jars when the files are already provided by the cluster. + * + * @return The plan without attached jar files. 
+ * @throws ProgramInvocationException + */ + public JobWithJars getPlanWithoutJars() throws ProgramInvocationException { + if (isUsingProgramEntryPoint()) { + return new JobWithJars(getPlan(), Collections.<URL>emptyList(), classpaths, userCodeClassLoader); + } else { + throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + + " for a program that is using the interactive mode."); + } + } + + /** * Returns the plan with all required jars. - * + * * @return The plan with attached jar files. - * @throws ProgramInvocationException + * @throws ProgramInvocationException */ public JobWithJars getPlanWithJars() throws ProgramInvocationException { if (isUsingProgramEntryPoint()) { return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader); } else { - throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + + throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + " for a program that is using the interactive mode."); } } /** * Returns the analyzed plan without any optimizations. - * + * * @return * the analyzed plan without any optimizations. * @throws ProgramInvocationException Thrown if an error occurred in the @@ -308,7 +323,7 @@ public class PackagedProgram { public String getPreviewPlan() throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader()); List<DataSinkNode> previewPlan; - + if (isUsingProgramEntryPoint()) { previewPlan = Optimizer.createPreOptimizedPlan(getPlan()); } @@ -335,7 +350,7 @@ public class PackagedProgram { finally { env.unsetAsContext(); } - + if (env.previewPlan != null) { previewPlan = env.previewPlan; } else { @@ -359,7 +374,7 @@ public class PackagedProgram { /** * Returns the description provided by the Program class. This * may contain a description of the plan itself and its arguments. 
- * + * * @return The description of the PactProgram's input parameters. * @throws ProgramInvocationException * This invocation is thrown if the Program can't be properly loaded. Causes @@ -367,7 +382,7 @@ public class PackagedProgram { */ public String getDescription() throws ProgramInvocationException { if (ProgramDescription.class.isAssignableFrom(this.mainClass)) { - + ProgramDescription descr; if (this.program != null) { descr = (ProgramDescription) this.program; @@ -379,22 +394,22 @@ public class PackagedProgram { return null; } } - + try { return descr.getDescription(); } catch (Throwable t) { - throw new ProgramInvocationException("Error while getting the program description" + + throw new ProgramInvocationException("Error while getting the program description" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t); } - + } else { return null; } } - + /** - * + * * This method assumes that the context environment is prepared, or the execution * will be a local execution by default. */ @@ -417,13 +432,16 @@ public class PackagedProgram { /** * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes. - * + * * @return The user code ClassLoader. */ public ClassLoader getUserCodeClassLoader() { return this.userCodeClassLoader; } + /** + * Returns all provided libraries needed to run the program. 
+ */ public List<URL> getAllLibraries() { List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1); http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index 3343b69..296ddc9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -28,6 +28,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import java.io.IOException; +import java.net.URL; import java.util.Collections; import java.util.List; @@ -87,6 +88,11 @@ public class StandaloneClusterClient extends ClusterClient { } @Override + public boolean hasUserJarsInClassPath(List<URL> userJarFiles) { + return false; + } + + @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { if (isDetached()) { http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 2f5cc47..3499b9e 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.scala import java.io._ +import java.util.Collections import org.apache.commons.cli.CommandLine import 
org.apache.flink.client.cli.CliFrontendParser @@ -252,7 +253,11 @@ object FlinkShell { val config = frontend.getConfiguration val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) - val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config) + val cluster = customCLI.createCluster( + "Flink Scala Shell", + options.getCommandLine, + config, + Collections.emptyList()) val address = cluster.getJobManagerAddress.getAddress.getHostAddress val port = cluster.getJobManagerAddress.getPort http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 4bcde16..6a8c266 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -26,6 +26,7 @@ import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -324,8 +325,8 @@ public class CliFrontendYarnAddressConfigurationTest { @Override // make method public - public ClusterClient createClient(CommandLineOptions options, String programName) throws Exception { - return super.createClient(options, programName); + public ClusterClient createClient(CommandLineOptions options, 
PackagedProgram program) throws Exception { + return super.createClient(options, program); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index dba87de..dc7cca3 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.client.CliFrontend; import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; @@ -744,9 +745,9 @@ public abstract class YarnTestBase extends TestLogger { public TestingCLI() throws Exception {} @Override - protected ClusterClient createClient(CommandLineOptions options, String programName) throws Exception { + protected ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception { // mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on - originalClusterClient = super.createClient(options, programName); + originalClusterClient = super.createClient(options, program); spiedClusterClient = Mockito.spy(originalClusterClient); Mockito.doNothing().when(spiedClusterClient).shutdown(); return spiedClusterClient; http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java 
---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 9481c24..55bc387 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -61,6 +61,8 @@ import java.io.IOException; import java.io.PrintStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.net.URISyntaxException; +import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -128,6 +130,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor private String zookeeperNamespace; + /** Optional Jar file to include in the system class loader of all application nodes + * (for per-job submission) */ + private Set<File> userJarFiles; + public AbstractYarnClusterDescriptor() { // for unit tests only if(System.getenv("IN_TESTS") != null) { @@ -237,6 +243,41 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor this.dynamicPropertiesEncoded = dynamicPropertiesEncoded; } + /** + * Returns true if the descriptor has the job jars to include in the classpath. + */ + public boolean hasUserJarFiles(List<URL> requiredJarFiles) { + if (userJarFiles == null || userJarFiles.size() != requiredJarFiles.size()) { + return false; + } + try { + for(URL jarFile : requiredJarFiles) { + if (!userJarFiles.contains(new File(jarFile.toURI()))) { + return false; + } + } + } catch (URISyntaxException e) { + return false; + } + return true; + } + + /** + * Sets the user jar which is included in the system classloader of all nodes. 
+ */ + public void setProvidedUserJarFiles(List<URL> userJarFiles) { + Set<File> localUserJarFiles = new HashSet<>(userJarFiles.size()); + for (URL jarFile : userJarFiles) { + try { + localUserJarFiles.add(new File(jarFile.toURI())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Couldn't add local user jar: " + jarFile + + " Currently only file:/// URLs are supported."); + } + } + this.userJarFiles = localUserJarFiles; + } + public String getDynamicPropertiesEncoded() { return this.dynamicPropertiesEncoded; } @@ -530,6 +571,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor addLibFolderToShipFiles(effectiveShipFiles); + // add the user jar to the classpath of the to-be-created cluster + if (userJarFiles != null) { + effectiveShipFiles.addAll(userJarFiles); + } + // Set-up ApplicationSubmissionContext for the application ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); @@ -743,7 +789,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor try { report = yarnClient.getApplicationReport(appId); } catch (IOException e) { - throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage()); + throw new YarnDeploymentException("Failed to deploy the cluster.", e); } YarnApplicationState appState = report.getYarnApplicationState(); LOG.debug("Application State: {}", appState); http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index 8b6cd9a..e620f21 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -55,6 +55,7 @@ 
import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.IOException; +import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -194,6 +195,11 @@ public class YarnClusterClient extends ClusterClient { } @Override + public boolean hasUserJarsInClassPath(List<URL> userJarFiles) { + return clusterDescriptor.hasUserJarFiles(userJarFiles); + } + + @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { if (isDetached()) { if (newlyCreatedCluster) { http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 7ce040b..1d10bd9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -34,6 +34,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.yarn.YarnClusterDescriptor; @@ -51,6 +52,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.net.URL; import java.net.URLDecoder; import java.nio.charset.Charset; import java.util.ArrayList; @@ -526,10 +528,16 @@ public class 
FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> } @Override - public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) { + public YarnClusterClient createCluster( + String applicationName, + CommandLine cmdLine, + Configuration config, + List<URL> userJarFiles) { + Preconditions.checkNotNull(userJarFiles, "User jar files should not be null."); AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine); yarnClusterDescriptor.setFlinkConfiguration(config); + yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles); try { return yarnClusterDescriptor.deploy();
