Repository: flink
Updated Branches:
  refs/heads/master fc730bb07 -> b410c393c


[FLINK-4913][yarn] include user jars in system class loader

When deploying a Yarn cluster for a single job, this change
pre-configures the cluster to include the user jar(s) in the system
class loader on all nodes. This eliminates the need to upload jar files
through the BlobClient. More importantly, the user classes are loaded
only once instead of on every instantiation of a Task. This also reduces
class loading on the JobManager upon recovery of a failed job.

This closes #2692.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b600d35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b600d35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b600d35

Branch: refs/heads/master
Commit: 2b600d355f5df9364c634282469acd608d7a2104
Parents: fc730bb
Author: Maximilian Michels <[email protected]>
Authored: Mon Oct 24 16:16:52 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Thu Oct 27 14:21:42 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 14 ++++--
 .../flink/client/cli/CustomCommandLine.java     | 14 +++++-
 .../org/apache/flink/client/cli/DefaultCLI.java |  5 +-
 .../flink/client/program/ClusterClient.java     | 40 +++++++++++-----
 .../flink/client/program/PackagedProgram.java   | 46 +++++++++++++------
 .../client/program/StandaloneClusterClient.java |  6 +++
 .../org/apache/flink/api/scala/FlinkShell.scala |  7 ++-
 ...CliFrontendYarnAddressConfigurationTest.java |  5 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |  5 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 48 +++++++++++++++++++-
 .../apache/flink/yarn/YarnClusterClient.java    |  6 +++
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 10 +++-
 12 files changed, 166 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 236ee94..5db4449 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -236,7 +236,7 @@ public class CliFrontend {
                ClusterClient client = null;
                try {
 
-                       client = createClient(options, 
program.getMainClassName());
+                       client = createClient(options, program);
                        
client.setPrintStatusDuringExecution(options.getStdoutLogging());
                        client.setDetached(options.getDetachedMode());
                        LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
@@ -928,12 +928,12 @@ public class CliFrontend {
        /**
         * Creates a {@link ClusterClient} object from the given command line 
options and other parameters.
         * @param options Command line options
-        * @param programName Program name
+        * @param program The program for which to create the client.
         * @throws Exception
         */
        protected ClusterClient createClient(
                        CommandLineOptions options,
-                       String programName) throws Exception {
+                       PackagedProgram program) throws Exception {
 
                // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
                CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
@@ -944,8 +944,12 @@ public class CliFrontend {
                        logAndSysout("Cluster configuration: " + 
client.getClusterIdentifier());
                } catch (UnsupportedOperationException e) {
                        try {
-                               String applicationName = "Flink Application: " 
+ programName;
-                               client = 
activeCommandLine.createCluster(applicationName, options.getCommandLine(), 
config);
+                               String applicationName = "Flink Application: " 
+ program.getMainClassName();
+                               client = activeCommandLine.createCluster(
+                                       applicationName,
+                                       options.getCommandLine(),
+                                       config,
+                                       program.getAllLibraries());
                                logAndSysout("Cluster started: " + 
client.getClusterIdentifier());
                        } catch (UnsupportedOperationException e2) {
                                throw new IllegalConfigurationException(

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index aecdc7c..c58c74c 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -22,6 +22,9 @@ import org.apache.commons.cli.Options;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 
+import java.net.URL;
+import java.util.List;
+
 
 /**
  * Custom command-line interface to load hooks for the command-line interface.
@@ -61,15 +64,22 @@ public interface CustomCommandLine<ClusterType extends 
ClusterClient> {
         * @return Client if a cluster could be retrieved
         * @throws UnsupportedOperationException if the operation is not 
supported
         */
-       ClusterType retrieveCluster(CommandLine commandLine, Configuration 
config) throws UnsupportedOperationException;
+       ClusterType retrieveCluster(
+                       CommandLine commandLine,
+                       Configuration config) throws 
UnsupportedOperationException;
 
        /**
         * Creates the client for the cluster
         * @param applicationName The application name to use
         * @param commandLine The command-line options parsed by the CliFrontend
         * @param config The Flink config to use
+        * @param userJarFiles User jar files to include in the classpath of 
the cluster.
         * @return The client to communicate with the cluster which the 
CustomCommandLine brought up.
         * @throws UnsupportedOperationException if the operation is not 
supported
         */
-       ClusterType createCluster(String applicationName, CommandLine 
commandLine, Configuration config) throws UnsupportedOperationException;
+       ClusterType createCluster(
+                       String applicationName,
+                       CommandLine commandLine,
+                       Configuration config,
+                       List<URL> userJarFiles) throws 
UnsupportedOperationException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 8f79403..41737d0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 
 import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
 
 import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
 
@@ -76,7 +78,8 @@ public class DefaultCLI implements 
CustomCommandLine<StandaloneClusterClient> {
        public StandaloneClusterClient createCluster(
                        String applicationName,
                        CommandLine commandLine,
-                       Configuration config) throws 
UnsupportedOperationException {
+                       Configuration config,
+                       List<URL> userJarFiles) throws 
UnsupportedOperationException {
 
                StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
                return descriptor.deploy();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index ff5701f..7ffe7a1 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -308,11 +308,27 @@ public abstract class ClusterClient {
        {
                
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
                if (prog.isUsingProgramEntryPoint()) {
-                       return run(prog.getPlanWithJars(), parallelism, 
prog.getSavepointPath());
+
+                       final JobWithJars jobWithJars;
+                       if (hasUserJarsInClassPath(prog.getAllLibraries())) {
+                               jobWithJars = prog.getPlanWithoutJars();
+                       } else {
+                               jobWithJars = prog.getPlanWithJars();
+                       }
+
+                       return run(jobWithJars, parallelism, 
prog.getSavepointPath());
                }
                else if (prog.isUsingInteractiveMode()) {
                        LOG.info("Starting program in interactive mode");
-                       ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, prog.getAllLibraries(),
+
+                       final List<URL> libraries;
+                       if (hasUserJarsInClassPath(prog.getAllLibraries())) {
+                               libraries = Collections.emptyList();
+                       } else {
+                               libraries = prog.getAllLibraries();
+                       }
+
+                       ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, libraries,
                                        prog.getClasspaths(), 
prog.getUserCodeClassLoader(), parallelism, isDetached(),
                                        prog.getSavepointPath());
                        ContextEnvironment.setAsContext(factory);
@@ -349,7 +365,7 @@ public abstract class ClusterClient {
         * Runs a program on the Flink cluster to which this client is 
connected. The call blocks until the
         * execution is complete, and returns afterwards.
         *
-        * @param program The program to be executed.
+        * @param jobWithJars The program to be executed.
         * @param parallelism The default parallelism to use when running the 
program. The default parallelism is used
         *                    when the program does not set a parallelism by 
itself.
         *
@@ -359,15 +375,15 @@ public abstract class ClusterClient {
         *                                    i.e. the job-manager is 
unreachable, or due to the fact that the
         *                                    parallel execution failed.
         */
-       public JobSubmissionResult run(JobWithJars program, int parallelism, 
String savepointPath)
+       public JobSubmissionResult run(JobWithJars jobWithJars, int 
parallelism, String savepointPath)
                        throws CompilerException, ProgramInvocationException {
-               ClassLoader classLoader = program.getUserCodeClassLoader();
+               ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
                if (classLoader == null) {
                        throw new IllegalArgumentException("The given 
JobWithJars does not provide a usercode class loader.");
                }
 
-               OptimizedPlan optPlan = getOptimizedPlan(compiler, program, 
parallelism);
-               return run(optPlan, program.getJarFiles(), 
program.getClasspaths(), classLoader, savepointPath);
+               OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, 
parallelism);
+               return run(optPlan, jobWithJars.getJarFiles(), 
jobWithJars.getClasspaths(), classLoader, savepointPath);
        }
 
        public JobSubmissionResult run(
@@ -631,10 +647,6 @@ public abstract class ClusterClient {
                return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
        }
 
-       public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) 
throws ProgramInvocationException {
-               return getJobGraph(optPlan, prog.getAllLibraries(), 
prog.getClasspaths(), null);
-       }
-
        public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, 
String savepointPath) throws ProgramInvocationException {
                return getJobGraph(optPlan, prog.getAllLibraries(), 
prog.getClasspaths(), savepointPath);
        }
@@ -761,6 +773,12 @@ public abstract class ClusterClient {
        public abstract int getMaxSlots();
 
        /**
+        * Returns true if the client already has the user jar and providing it 
again would
+        * result in duplicate uploading of the jar.
+        */
+       public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);
+
+       /**
         * Calls the subclasses' submitJob method. It may decide to simply call 
one of the run methods or it may perform
         * some custom job submission logic.
         * @param jobGraph The JobGraph to be submitted

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index aca873e..99f57bb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -282,23 +282,38 @@ public class PackagedProgram {
        }
 
        /**
+        * Returns the plan without the required jars when the files are 
already provided by the cluster.
+        *
+        * @return The plan without attached jar files.
+        * @throws ProgramInvocationException
+        */
+       public JobWithJars getPlanWithoutJars() throws 
ProgramInvocationException {
+               if (isUsingProgramEntryPoint()) {
+                       return new JobWithJars(getPlan(), 
Collections.<URL>emptyList(), classpaths, userCodeClassLoader);
+               } else {
+                       throw new ProgramInvocationException("Cannot create a " 
+ JobWithJars.class.getSimpleName() +
+                               " for a program that is using the interactive 
mode.");
+               }
+       }
+
+       /**
         * Returns the plan with all required jars.
-        * 
+        *
         * @return The plan with attached jar files.
-        * @throws ProgramInvocationException 
+        * @throws ProgramInvocationException
         */
        public JobWithJars getPlanWithJars() throws ProgramInvocationException {
                if (isUsingProgramEntryPoint()) {
                        return new JobWithJars(getPlan(), getAllLibraries(), 
classpaths, userCodeClassLoader);
                } else {
-                       throw new ProgramInvocationException("Cannot create a " 
+ JobWithJars.class.getSimpleName() + 
+                       throw new ProgramInvocationException("Cannot create a " 
+ JobWithJars.class.getSimpleName() +
                                        " for a program that is using the 
interactive mode.");
                }
        }
 
        /**
         * Returns the analyzed plan without any optimizations.
-        * 
+        *
         * @return
         *         the analyzed plan without any optimizations.
         * @throws ProgramInvocationException Thrown if an error occurred in the
@@ -308,7 +323,7 @@ public class PackagedProgram {
        public String getPreviewPlan() throws ProgramInvocationException {
                
Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
                List<DataSinkNode> previewPlan;
-               
+
                if (isUsingProgramEntryPoint()) {
                        previewPlan = 
Optimizer.createPreOptimizedPlan(getPlan());
                }
@@ -335,7 +350,7 @@ public class PackagedProgram {
                        finally {
                                env.unsetAsContext();
                        }
-                       
+
                        if (env.previewPlan != null) {
                                previewPlan =  env.previewPlan;
                        } else {
@@ -359,7 +374,7 @@ public class PackagedProgram {
        /**
         * Returns the description provided by the Program class. This
         * may contain a description of the plan itself and its arguments.
-        * 
+        *
         * @return The description of the PactProgram's input parameters.
         * @throws ProgramInvocationException
         *         This invocation is thrown if the Program can't be properly 
loaded. Causes
@@ -367,7 +382,7 @@ public class PackagedProgram {
         */
        public String getDescription() throws ProgramInvocationException {
                if (ProgramDescription.class.isAssignableFrom(this.mainClass)) {
-                       
+
                        ProgramDescription descr;
                        if (this.program != null) {
                                descr = (ProgramDescription) this.program;
@@ -379,22 +394,22 @@ public class PackagedProgram {
                                        return null;
                                }
                        }
-                       
+
                        try {
                                return descr.getDescription();
                        }
                        catch (Throwable t) {
-                               throw new ProgramInvocationException("Error 
while getting the program description" + 
+                               throw new ProgramInvocationException("Error 
while getting the program description" +
                                                (t.getMessage() == null ? "." : 
": " + t.getMessage()), t);
                        }
-                       
+
                } else {
                        return null;
                }
        }
-       
+
        /**
-        * 
+        *
         * This method assumes that the context environment is prepared, or the 
execution
         * will be a local execution by default.
         */
@@ -417,13 +432,16 @@ public class PackagedProgram {
 
        /**
         * Gets the {@link java.lang.ClassLoader} that must be used to load 
user code classes.
-        * 
+        *
         * @return The user code ClassLoader.
         */
        public ClassLoader getUserCodeClassLoader() {
                return this.userCodeClassLoader;
        }
 
+       /**
+        * Returns all provided libraries needed to run the program.
+        */
        public List<URL> getAllLibraries() {
                List<URL> libs = new 
ArrayList<URL>(this.extractedTempLibraries.size() + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 3343b69..296ddc9 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -28,6 +28,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
@@ -87,6 +88,11 @@ public class StandaloneClusterClient extends ClusterClient {
        }
 
        @Override
+       public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+               return false;
+       }
+
+       @Override
        protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader)
                        throws ProgramInvocationException {
                if (isDetached()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 2f5cc47..3499b9e 100644
--- 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.scala
 
 import java.io._
+import java.util.Collections
 
 import org.apache.commons.cli.CommandLine
 import org.apache.flink.client.cli.CliFrontendParser
@@ -252,7 +253,11 @@ object FlinkShell {
     val config = frontend.getConfiguration
     val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
 
-    val cluster = customCLI.createCluster("Flink Scala Shell", 
options.getCommandLine, config)
+    val cluster = customCLI.createCluster(
+      "Flink Scala Shell",
+      options.getCommandLine,
+      config,
+      Collections.emptyList())
 
     val address = cluster.getJobManagerAddress.getAddress.getHostAddress
     val port = cluster.getJobManagerAddress.getPort

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 4bcde16..6a8c266 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -324,8 +325,8 @@ public class CliFrontendYarnAddressConfigurationTest {
 
                @Override
                // make method public
-               public ClusterClient createClient(CommandLineOptions options, 
String programName) throws Exception {
-                       return super.createClient(options, programName);
+               public ClusterClient createClient(CommandLineOptions options, 
PackagedProgram program) throws Exception {
+                       return super.createClient(options, program);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index dba87de..dc7cca3 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -744,9 +745,9 @@ public abstract class YarnTestBase extends TestLogger {
                public TestingCLI() throws Exception {}
 
                @Override
-               protected ClusterClient createClient(CommandLineOptions 
options, String programName) throws Exception {
+               protected ClusterClient createClient(CommandLineOptions 
options, PackagedProgram program) throws Exception {
                        // mock the returned ClusterClient to disable shutdown 
and verify shutdown behavior later on
-                       originalClusterClient = super.createClient(options, 
programName);
+                       originalClusterClient = super.createClient(options, 
program);
                        spiedClusterClient = Mockito.spy(originalClusterClient);
                        Mockito.doNothing().when(spiedClusterClient).shutdown();
                        return spiedClusterClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 9481c24..55bc387 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -61,6 +61,8 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -128,6 +130,10 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
        private String zookeeperNamespace;
 
+       /** Optional Jar file to include in the system class loader of all 
application nodes
+        * (for per-job submission) */
+       private Set<File> userJarFiles;
+
        public AbstractYarnClusterDescriptor() {
                // for unit tests only
                if(System.getenv("IN_TESTS") != null) {
@@ -237,6 +243,41 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
        }
 
+       /**
+        * Returns true if the descriptor has the job jars to include in the 
classpath.
+        */
+       public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
+               if (userJarFiles == null || userJarFiles.size() != 
requiredJarFiles.size()) {
+                       return false;
+               }
+               try {
+                       for(URL jarFile : requiredJarFiles) {
+                               if (!userJarFiles.contains(new 
File(jarFile.toURI()))) {
+                                       return false;
+                               }
+                       }
+               } catch (URISyntaxException e) {
+                       return false;
+               }
+               return true;
+       }
+
+       /**
+        * Sets the user jar which is included in the system classloader of all 
nodes.
+        */
+       public void setProvidedUserJarFiles(List<URL> userJarFiles) {
+               Set<File> localUserJarFiles = new 
HashSet<>(userJarFiles.size());
+               for (URL jarFile : userJarFiles) {
+                       try {
+                               localUserJarFiles.add(new 
File(jarFile.toURI()));
+                       } catch (URISyntaxException e) {
+                               throw new IllegalArgumentException("Couldn't 
add local user jar: " + jarFile
+                                       + " Currently only file:/// URLs are 
supported.");
+                       }
+               }
+               this.userJarFiles = localUserJarFiles;
+       }
+
        public String getDynamicPropertiesEncoded() {
                return this.dynamicPropertiesEncoded;
        }
@@ -530,6 +571,11 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
                addLibFolderToShipFiles(effectiveShipFiles);
 
+               // add the user jar to the classpath of the to-be-created 
cluster
+               if (userJarFiles != null) {
+                       effectiveShipFiles.addAll(userJarFiles);
+               }
+
                // Set-up ApplicationSubmissionContext for the application
                ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
@@ -743,7 +789,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        try {
                                report = yarnClient.getApplicationReport(appId);
                        } catch (IOException e) {
-                               throw new YarnDeploymentException("Failed to 
deploy the cluster: " + e.getMessage());
+                               throw new YarnDeploymentException("Failed to 
deploy the cluster.", e);
                        }
                        YarnApplicationState appState = 
report.getYarnApplicationState();
                        LOG.debug("Application State: {}", appState);

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 8b6cd9a..e620f21 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -55,6 +55,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -194,6 +195,11 @@ public class YarnClusterClient extends ClusterClient {
        }
 
        @Override
+       public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+               return clusterDescriptor.hasUserJarFiles(userJarFiles);
+       }
+
+       @Override
        protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
                if (isDetached()) {
                        if (newlyCreatedCluster) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b600d35/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 7ce040b..1d10bd9 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.yarn.YarnClusterDescriptor;
@@ -51,6 +52,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -526,10 +528,16 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        }
 
        @Override
-       public YarnClusterClient createCluster(String applicationName, 
CommandLine cmdLine, Configuration config) {
+       public YarnClusterClient createCluster(
+                       String applicationName,
+                       CommandLine cmdLine,
+                       Configuration config,
+                       List<URL> userJarFiles) {
+               Preconditions.checkNotNull(userJarFiles, "User jar files should 
not be null.");
 
                AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createDescriptor(applicationName, cmdLine);
                yarnClusterDescriptor.setFlinkConfiguration(config);
+               yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
                try {
                        return yarnClusterDescriptor.deploy();

Reply via email to