Repository: flink Updated Branches: refs/heads/release-1.1 b9e6dcc3c -> 3b5d3c6f3
Revert "[FLINK-4913][yarn] include user jars in system class loader" This reverts commit ea41b9c56fdc0af3c97d6dd48d04218db6176ec8. This closes #2795 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5d3c6f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5d3c6f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5d3c6f Branch: refs/heads/release-1.1 Commit: 3b5d3c6f359dbbdfebcf0b7c034264a3ed9ad12c Parents: b9e6dcc Author: Ufuk Celebi <[email protected]> Authored: Sat Nov 12 20:49:17 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Fri Nov 25 16:14:52 2016 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 14 ++---- .../flink/client/cli/CustomCommandLine.java | 14 +----- .../org/apache/flink/client/cli/DefaultCLI.java | 5 +- .../flink/client/program/ClusterClient.java | 40 +++++----------- .../flink/client/program/PackagedProgram.java | 46 ++++++------------- .../client/program/StandaloneClusterClient.java | 6 --- .../org/apache/flink/api/scala/FlinkShell.scala | 7 +-- ...CliFrontendYarnAddressConfigurationTest.java | 5 +- .../org/apache/flink/yarn/YarnTestBase.java | 5 +- .../yarn/AbstractYarnClusterDescriptor.java | 48 +------------------- .../apache/flink/yarn/YarnClusterClient.java | 6 --- .../flink/yarn/cli/FlinkYarnSessionCli.java | 10 +--- 12 files changed, 40 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 3a322dc..69963fe 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -234,7 +234,7 @@ public class CliFrontend { ClusterClient client = null; try { - client = createClient(options, program); + client = createClient(options, program.getMainClassName()); client.setPrintStatusDuringExecution(options.getStdoutLogging()); client.setDetached(options.getDetachedMode()); LOG.debug("Client slots is set to {}", client.getMaxSlots()); @@ -871,12 +871,12 @@ public class CliFrontend { /** * Creates a {@link ClusterClient} object from the given command line options and other parameters. * @param options Command line options - * @param program The program for which to create the client. + * @param programName Program name * @throws Exception */ protected ClusterClient createClient( CommandLineOptions options, - PackagedProgram program) throws Exception { + String programName) throws Exception { // Get the custom command-line (e.g. Standalone/Yarn/Mesos) CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); @@ -887,12 +887,8 @@ public class CliFrontend { logAndSysout("Cluster configuration: " + client.getClusterIdentifier()); } catch (UnsupportedOperationException e) { try { - String applicationName = "Flink Application: " + program.getMainClassName(); - client = activeCommandLine.createCluster( - applicationName, - options.getCommandLine(), - config, - program.getAllLibraries()); + String applicationName = "Flink Application: " + programName; + client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config); logAndSysout("Cluster started: " + client.getClusterIdentifier()); } catch (UnsupportedOperationException e2) { throw new IllegalConfigurationException( http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java index c58c74c..aecdc7c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java @@ -22,9 +22,6 @@ import org.apache.commons.cli.Options; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; -import java.net.URL; -import java.util.List; - /** * Custom command-line interface to load hooks for the command-line interface. @@ -64,22 +61,15 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> { * @return Client if a cluster could be retrieved * @throws UnsupportedOperationException if the operation is not supported */ - ClusterType retrieveCluster( - CommandLine commandLine, - Configuration config) throws UnsupportedOperationException; + ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException; /** * Creates the client for the cluster * @param applicationName The application name to use * @param commandLine The command-line options parsed by the CliFrontend * @param config The Flink config to use - * @param userJarFiles User jar files to include in the classpath of the cluster. * @return The client to communicate with the cluster which the CustomCommandLine brought up. * @throws UnsupportedOperationException if the operation is not supported */ - ClusterType createCluster( - String applicationName, - CommandLine commandLine, - Configuration config, - List<URL> userJarFiles) throws UnsupportedOperationException; + ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index 598c612..5f83c3d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -26,8 +26,6 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import java.net.InetSocketAddress; -import java.net.URL; -import java.util.List; import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig; @@ -77,8 +75,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> { public StandaloneClusterClient createCluster( String applicationName, CommandLine commandLine, - Configuration config, - List<URL> userJarFiles) throws UnsupportedOperationException { + Configuration config) throws UnsupportedOperationException { StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); return descriptor.deploy(); http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 5e88af6..2d743fa 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -307,27 +307,11 @@ public abstract class ClusterClient { { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { - - final JobWithJars jobWithJars; - if (hasUserJarsInClassPath(prog.getAllLibraries())) { - jobWithJars = prog.getPlanWithoutJars(); - } else { - jobWithJars = prog.getPlanWithJars(); - } - - return run(jobWithJars, parallelism, prog.getSavepointSettings()); + return run(prog.getPlanWithJars(), parallelism, prog.getSavepointSettings()); } else if (prog.isUsingInteractiveMode()) { LOG.info("Starting program in interactive mode"); - - final List<URL> libraries; - if (hasUserJarsInClassPath(prog.getAllLibraries())) { - libraries = Collections.emptyList(); - } else { - libraries = prog.getAllLibraries(); - } - - ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, + ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(), prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), prog.getSavepointSettings()); ContextEnvironment.setAsContext(factory); @@ -365,7 +349,7 @@ public abstract class ClusterClient { * Runs a program on the Flink cluster to which this client is connected. The call blocks until the * execution is complete, and returns afterwards. * - * @param jobWithJars The program to be executed. + * @param program The program to be executed. * @param parallelism The default parallelism to use when running the program. The default parallelism is used * when the program does not set a parallelism by itself. * @@ -375,15 +359,15 @@ public abstract class ClusterClient { * i.e. the job-manager is unreachable, or due to the fact that the * parallel execution failed. */ - public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings) + public JobSubmissionResult run(JobWithJars program, int parallelism, SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException { - ClassLoader classLoader = jobWithJars.getUserCodeClassLoader(); + ClassLoader classLoader = program.getUserCodeClassLoader(); if (classLoader == null) { throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); } - OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism); - return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings); + OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism); + return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointSettings); } public JobSubmissionResult run( @@ -614,6 +598,10 @@ public abstract class ClusterClient { return getOptimizedPlan(compiler, prog.getPlan(), parallelism); } + public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException { + return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null); + } + public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException { return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings); } @@ -740,12 +728,6 @@ public abstract class ClusterClient { public abstract int getMaxSlots(); /** - * Returns true if the client already has the user jar and providing it again would - * result in duplicate uploading of the jar. - */ - public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles); - - /** * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform * some custom job submission logic. * @param jobGraph The JobGraph to be submitted http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 8931a3e..daa5737 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -283,38 +283,23 @@ public class PackagedProgram { } /** - * Returns the plan without the required jars when the files are already provided by the cluster. - * - * @return The plan without attached jar files. - * @throws ProgramInvocationException - */ - public JobWithJars getPlanWithoutJars() throws ProgramInvocationException { - if (isUsingProgramEntryPoint()) { - return new JobWithJars(getPlan(), Collections.<URL>emptyList(), classpaths, userCodeClassLoader); - } else { - throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + - " for a program that is using the interactive mode."); - } - } - - /** * Returns the plan with all required jars. - * + * * @return The plan with attached jar files. - * @throws ProgramInvocationException + * @throws ProgramInvocationException */ public JobWithJars getPlanWithJars() throws ProgramInvocationException { if (isUsingProgramEntryPoint()) { return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader); } else { - throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + + throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + " for a program that is using the interactive mode."); } } /** * Returns the analyzed plan without any optimizations. - * + * * @return * the analyzed plan without any optimizations. * @throws ProgramInvocationException Thrown if an error occurred in the @@ -324,7 +309,7 @@ public class PackagedProgram { public String getPreviewPlan() throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader()); List<DataSinkNode> previewPlan; - + if (isUsingProgramEntryPoint()) { previewPlan = Optimizer.createPreOptimizedPlan(getPlan()); } @@ -351,7 +336,7 @@ public class PackagedProgram { finally { env.unsetAsContext(); } - + if (env.previewPlan != null) { previewPlan = env.previewPlan; } else { @@ -375,7 +360,7 @@ public class PackagedProgram { /** * Returns the description provided by the Program class. This * may contain a description of the plan itself and its arguments. - * + * * @return The description of the PactProgram's input parameters. * @throws ProgramInvocationException * This invocation is thrown if the Program can't be properly loaded. Causes @@ -383,7 +368,7 @@ public class PackagedProgram { */ public String getDescription() throws ProgramInvocationException { if (ProgramDescription.class.isAssignableFrom(this.mainClass)) { - + ProgramDescription descr; if (this.program != null) { descr = (ProgramDescription) this.program; @@ -395,22 +380,22 @@ public class PackagedProgram { return null; } } - + try { return descr.getDescription(); } catch (Throwable t) { - throw new ProgramInvocationException("Error while getting the program description" + + throw new ProgramInvocationException("Error while getting the program description" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t); } - + } else { return null; } } - + /** - * + * * This method assumes that the context environment is prepared, or the execution * will be a local execution by default. */ @@ -433,16 +418,13 @@ public class PackagedProgram { /** * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes. - * + * * @return The user code ClassLoader. */ public ClassLoader getUserCodeClassLoader() { return this.userCodeClassLoader; } - /** - * Returns all provided libraries needed to run the program. - */ public List<URL> getAllLibraries() { List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1); http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index 296ddc9..3343b69 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -28,7 +28,6 @@ import scala.concurrent.Await; import scala.concurrent.Future; import java.io.IOException; -import java.net.URL; import java.util.Collections; import java.util.List; @@ -88,11 +87,6 @@ public class StandaloneClusterClient extends ClusterClient { } @Override - public boolean hasUserJarsInClassPath(List<URL> userJarFiles) { - return false; - } - - @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { if (isDetached()) { http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index f00013e..fb70280 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -19,7 +19,6 @@ package org.apache.flink.api.scala import java.io._ -import java.util.Collections import org.apache.commons.cli.CommandLine import org.apache.flink.client.cli.CliFrontendParser @@ -253,11 +252,7 @@ object FlinkShell { val config = frontend.getConfiguration val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine) - val cluster = customCLI.createCluster( - "Flink Scala Shell", - options.getCommandLine, - config, - Collections.emptyList()) + val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config) val address = cluster.getJobManagerAddress.getAddress.getHostAddress val port = cluster.getJobManagerAddress.getPort http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 77d3149..8ba786f 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -25,7 +25,6 @@ import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -332,8 +331,8 @@ public class CliFrontendYarnAddressConfigurationTest { @Override // make method public - public ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception { - return super.createClient(options, program); + public ClusterClient createClient(CommandLineOptions options, String programName) throws Exception { + return super.createClient(options, programName); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 78e16ed..7e612c4 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -23,7 +23,6 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.client.CliFrontend; import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; @@ -668,9 +667,9 @@ public abstract class YarnTestBase extends TestLogger { public TestingCLI() throws Exception {} @Override - protected ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception { + protected ClusterClient createClient(CommandLineOptions options, String programName) throws Exception { // mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on - originalClusterClient = super.createClient(options, program); + originalClusterClient = super.createClient(options, programName); spiedClusterClient = Mockito.spy(originalClusterClient); Mockito.doNothing().when(spiedClusterClient).shutdown(); return spiedClusterClient; http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 000b2c1..ab1fbc1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -61,8 +61,6 @@ import java.io.PrintStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.security.PrivilegedExceptionAction; -import java.net.URISyntaxException; -import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -132,10 +130,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor private String zookeeperNamespace; - /** Optional Jar file to include in the system class loader of all application nodes - * (for per-job submission) */ - private Set<File> userJarFiles; - public AbstractYarnClusterDescriptor() { // for unit tests only if(System.getenv("IN_TESTS") != null) { @@ -246,41 +240,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor this.dynamicPropertiesEncoded = dynamicPropertiesEncoded; } - /** - * Returns true if the descriptor has the job jars to include in the classpath. - */ - public boolean hasUserJarFiles(List<URL> requiredJarFiles) { - if (userJarFiles == null || userJarFiles.size() != requiredJarFiles.size()) { - return false; - } - try { - for(URL jarFile : requiredJarFiles) { - if (!userJarFiles.contains(new File(jarFile.toURI()))) { - return false; - } - } - } catch (URISyntaxException e) { - return false; - } - return true; - } - - /** - * Sets the user jar which is included in the system classloader of all nodes. - */ - public void setProvidedUserJarFiles(List<URL> userJarFiles) { - Set<File> localUserJarFiles = new HashSet<>(userJarFiles.size()); - for (URL jarFile : userJarFiles) { - try { - localUserJarFiles.add(new File(jarFile.toURI())); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Couldn't add local user jar: " + jarFile - + " Currently only file:/// URLs are supported."); - } - } - this.userJarFiles = localUserJarFiles; - } - public String getDynamicPropertiesEncoded() { return this.dynamicPropertiesEncoded; } @@ -596,11 +555,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j); - // add the user jar to the classpath of the to-be-created cluster - if (userJarFiles != null) { - effectiveShipFiles.addAll(userJarFiles); - } - // Set-up ApplicationSubmissionContext for the application ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); @@ -755,7 +709,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor try { report = yarnClient.getApplicationReport(appId); } catch (IOException e) { - throw new YarnDeploymentException("Failed to deploy the cluster.", e); + throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage()); } YarnApplicationState appState = report.getYarnApplicationState(); switch(appState) { http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index cd447d7..79501b1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -55,7 +55,6 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.IOException; -import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -195,11 +194,6 @@ public class YarnClusterClient extends ClusterClient { } @Override - public boolean hasUserJarsInClassPath(List<URL> userJarFiles) { - return clusterDescriptor.hasUserJarFiles(userJarFiles); - } - - @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { if (isDetached()) { if (newlyCreatedCluster) { http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 4823d35..28d8fb8 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -29,7 +29,6 @@ import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; @@ -49,7 +48,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.UnsupportedEncodingException; -import java.net.URL; import java.net.URLDecoder; import java.nio.charset.Charset; import java.util.ArrayList; @@ -517,16 +515,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> } @Override - public YarnClusterClient createCluster( - String applicationName, - CommandLine cmdLine, - Configuration config, - List<URL> userJarFiles) { - Preconditions.checkNotNull(userJarFiles, "User jar files should not be null."); + public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) { AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine); yarnClusterDescriptor.setFlinkConfiguration(config); - yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles); try { return yarnClusterDescriptor.deploy();
