Repository: flink
Updated Branches:
  refs/heads/master e28380152 -> 4a314a80e


[FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath

This closes #3931


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a314a80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a314a80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a314a80

Branch: refs/heads/master
Commit: 4a314a80e64ea2af624131e5c02e1f167a22a354
Parents: e283801
Author: zentol <ches...@apache.org>
Authored: Wed May 17 18:15:27 2017 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Thu May 18 14:01:53 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 docs/setup/yarn_setup.md                        |  12 ++
 .../yarn/AbstractYarnClusterDescriptor.java     | 152 ++++++++++++-------
 .../yarn/configuration/YarnConfigOptions.java   |  18 +++
 4 files changed, 129 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index c4a7354..8a6f67d 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -457,6 +457,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` 
variable in the String
 
 - `yarn.tags` A comma-separated list of tags to apply to the Flink YARN 
application.
 
+- `yarn.per-job-cluster.include-user-jar` (Default: ORDER) Control whether and 
how the user-jar is included in the system class path for per-job clusters. 
Setting this parameter to `DISABLED` causes the jar to be included in the user 
class path instead. Setting this parameter to one of `FIRST`, `LAST` or `ORDER` 
causes the jar to be included in the system class path, with the jar either 
being placed at the beginning of the class path (`FIRST`), at the end (`LAST`), 
or based on the lexicographic order (`ORDER`).
+
 ### Mesos
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index 1ce45ad..190a796 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -245,6 +245,18 @@ Note: You can use a different configuration directory per 
job by setting the env
 
 Note: It is possible to combine `-m yarn-cluster` with a detached YARN 
submission (`-yd`) to "fire and forget" a Flink job to the YARN cluster. In 
this case, your application will not get any accumulator results or exceptions 
from the ExecutionEnvironment.execute() call!
 
+### User jars & Classpath
+
+By default Flink will include the user jars into the system classpath when 
running a single job. This behavior can be controlled with the 
`yarn.per-job-cluster.include-user-jar` parameter.
+
+When setting this to `DISABLED` Flink will include the jar in the user 
classpath instead.
+
+The user jar's position in the class path can be controlled by setting the 
parameter to one of the following:
+
+- `ORDER`: (default) Adds the jar to the system class path based on the 
lexicographic order.
+- `FIRST`: Adds the jar to the beginning of the system class path.
+- `LAST`: Adds the jar to the end of the system class path.
+
 ## Recovery behavior of Flink on YARN
 
 Flink's YARN client has the following configuration parameters to control how 
to behave in case of container failures. These parameters can be set either 
from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` 
parameters.

http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 65525f2..3110a5b 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -77,6 +78,7 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -144,7 +146,9 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
        /** Optional Jar file to include in the system class loader of all 
application nodes
         * (for per-job submission) */
-       private Set<File> userJarFiles;
+       private final Set<File> userJarFiles = new HashSet<>();
+
+       private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
        public AbstractYarnClusterDescriptor() {
                // for unit tests only
@@ -172,6 +176,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        jobManagerMemoryMb = 
flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
                        taskManagerMemoryMb = 
flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
 
+                       userJarInclusion = 
getUserJarInclusionMode(flinkConfiguration);
                } catch (Exception e) {
                        LOG.debug("Config couldn't be loaded from environment 
variable.", e);
                }
@@ -200,6 +205,8 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
        public void 
setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
                this.flinkConfiguration = conf;
+
+               userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
        }
 
        public org.apache.flink.configuration.Configuration 
getFlinkConfiguration() {
@@ -265,7 +272,10 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
         * Returns true if the descriptor has the job jars to include in the 
classpath.
         */
        public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
-               if (userJarFiles == null || userJarFiles.size() != 
requiredJarFiles.size()) {
+               if (userJarInclusion == 
YarnConfigOptions.UserJarInclusion.DISABLED) {
+                       return false;
+               }
+               if (userJarFiles.size() != requiredJarFiles.size()) {
                        return false;
                }
                try {
@@ -284,16 +294,14 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
         * Sets the user jar which is included in the system classloader of all 
nodes.
         */
        public void setProvidedUserJarFiles(List<URL> userJarFiles) {
-               Set<File> localUserJarFiles = new 
HashSet<>(userJarFiles.size());
                for (URL jarFile : userJarFiles) {
                        try {
-                               localUserJarFiles.add(new 
File(jarFile.toURI()));
+                               this.userJarFiles.add(new 
File(jarFile.toURI()));
                        } catch (URISyntaxException e) {
                                throw new IllegalArgumentException("Couldn't 
add local user jar: " + jarFile
                                        + " Currently only file:/// URLs are 
supported.");
                        }
                }
-               this.userJarFiles = localUserJarFiles;
        }
 
        public String getDynamicPropertiesEncoded() {
@@ -603,22 +611,22 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
 
                ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
-               Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
+               Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
                for (File file : shipFiles) {
-                       effectiveShipFiles.add(file.getAbsoluteFile());
+                       systemShipFiles.add(file.getAbsoluteFile());
                }
 
                //check if there is a logback or log4j file
                File logbackFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_LOGBACK_NAME);
                final boolean hasLogback = logbackFile.exists();
                if (hasLogback) {
-                       effectiveShipFiles.add(logbackFile);
+                       systemShipFiles.add(logbackFile);
                }
 
                File log4jFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_LOG4J_NAME);
                final boolean hasLog4j = log4jFile.exists();
                if (hasLog4j) {
-                       effectiveShipFiles.add(log4jFile);
+                       systemShipFiles.add(log4jFile);
                        if (hasLogback) {
                                // this means there is already a logback 
configuration file --> fail
                                LOG.warn("The configuration directory ('" + 
configurationDirectory + "') contains both LOG4J and " +
@@ -626,13 +634,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
-               addLibFolderToShipFiles(effectiveShipFiles);
-
-               // add the user jar to the classpath of the to-be-created 
cluster
-               if (userJarFiles != null) {
-                       effectiveShipFiles.addAll(userJarFiles);
-               }
-
+               addLibFolderToShipFiles(systemShipFiles);
 
                // Set-up ApplicationSubmissionContext for the application
 
@@ -666,57 +668,39 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
 
                // local resource map for Yarn
-               final Map<String, LocalResource> localResources = new 
HashMap<>(2 + effectiveShipFiles.size());
+               final Map<String, LocalResource> localResources = new 
HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
                // list of remote paths (after upload)
-               final List<Path> paths = new ArrayList<>(2 + 
effectiveShipFiles.size());
+               final List<Path> paths = new ArrayList<>(2 + 
systemShipFiles.size() + userJarFiles.size());
                // ship list that enables reuse of resources for task manager 
containers
                StringBuilder envShipFileList = new StringBuilder();
 
-               // upload and register ship files
-               final List<String> classPaths = new ArrayList<>();
-               for (File shipFile : effectiveShipFiles) {
-                       LocalResource shipResources = 
Records.newRecord(LocalResource.class);
-
-                       Path shipLocalPath = new Path("file://" + 
shipFile.getAbsolutePath());
-                       Path remotePath =
-                               Utils.setupLocalResource(fs, appId.toString(), 
shipLocalPath, shipResources, fs.getHomeDirectory());
-
-                       paths.add(remotePath);
-
-                       localResources.put(shipFile.getName(), shipResources);
-
-                       if (shipFile.isDirectory()) {
-                               // add directories to the classpath
-                               java.nio.file.Path shipPath = shipFile.toPath();
-                               final java.nio.file.Path parentPath = 
shipPath.getParent();
+               // upload and register ship files       
+               List<String> systemClassPaths = 
uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, 
localResources, envShipFileList);
+               List<String> userClassPaths = 
uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, 
localResources, envShipFileList);
 
-                               Files.walkFileTree(shipPath, new 
SimpleFileVisitor<java.nio.file.Path>() {
-                                       @Override
-                                       public FileVisitResult 
visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
-                                                       throws IOException {
-                                               java.nio.file.Path relativePath 
= parentPath.relativize(file);
-
-                                               
classPaths.add(relativePath.toString());
-
-                                               return FileVisitResult.CONTINUE;
-                                       }
-                               });
-                       } else {
-                               // add files to the classpath
-                               classPaths.add(shipFile.getName());
-                       }
-
-                       envShipFileList.append(remotePath).append(",");
+               if (userJarInclusion == 
YarnConfigOptions.UserJarInclusion.ORDER) {
+                       systemClassPaths.addAll(userClassPaths);
                }
 
                // normalize classpath by sorting
-               Collections.sort(classPaths);
+               Collections.sort(systemClassPaths);
+               Collections.sort(userClassPaths);
 
                // classpath assembler
                StringBuilder classPathBuilder = new StringBuilder();
-               for (String classPath : classPaths) {
+               if (userJarInclusion == 
YarnConfigOptions.UserJarInclusion.FIRST) {
+                       for (String userClassPath : userClassPaths) {
+                               
classPathBuilder.append(userClassPath).append(File.pathSeparator);
+                       }
+               }
+               for (String classPath : systemClassPaths) {
                        
classPathBuilder.append(classPath).append(File.pathSeparator);
                }
+               if (userJarInclusion == 
YarnConfigOptions.UserJarInclusion.LAST) {
+                       for (String userClassPath : userClassPaths) {
+                               
classPathBuilder.append(userClassPath).append(File.pathSeparator);
+                       }
+               }
 
                // Setup jar for ApplicationMaster
                LocalResource appMasterJar = 
Records.newRecord(LocalResource.class);
@@ -936,6 +920,51 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
                return report;
        }
+       /**
+        * Registers every entry of {@code shipFiles} as a YARN local resource and collects the class path
+        * entries that belong to it.
+        *
+        * <p>For each ship file a {@link LocalResource} record is created and handed to
+        * {@code Utils.setupLocalResource} together with the local path of the file and the home directory
+        * of {@code fs}. The remote path returned by that call is added to {@code remotePaths} and appended,
+        * followed by a comma, to {@code envShipFileList}. The resource record is stored in
+        * {@code localResources} under the name of the file.
+        *
+        * @param shipFiles local files or directories to ship
+        * @param fs file system passed to {@code Utils.setupLocalResource}
+        * @param appId application id passed to {@code Utils.setupLocalResource}
+        * @param remotePaths receives one remote path per ship file
+        * @param localResources receives one entry per ship file, keyed by {@code File#getName()}
+        * @param envShipFileList receives the comma-terminated remote paths
+        * @return class path entries in iteration order (not sorted): the file name for a plain file, or,
+        *         for a directory, every file below it relative to the parent of that directory
+        * @throws IOException propagated from the directory walk (and presumably from the upload helper)
+        */
+       private static List<String> uploadAndRegisterFiles(
+                       Collection<File> shipFiles,
+                       FileSystem fs,
+                       String appId,
+                       List<Path> remotePaths,
+                       Map<String, LocalResource> localResources,
+                       StringBuilder envShipFileList) throws IOException {
+               final List<String> classPaths = new ArrayList<>(2 + 
shipFiles.size());
+               for (File shipFile : shipFiles) {
+                       LocalResource shipResources = 
Records.newRecord(LocalResource.class);
+
+                       Path shipLocalPath = new Path("file://" + 
shipFile.getAbsolutePath());
+                       Path remotePath =
+                               Utils.setupLocalResource(fs, appId, 
shipLocalPath, shipResources, fs.getHomeDirectory());
+
+                       remotePaths.add(remotePath);
+
+                       localResources.put(shipFile.getName(), shipResources); // keyed by bare file name: ship files with the same name share one key
+
+                       if (shipFile.isDirectory()) {
+                               // directories contribute every file below them, expressed relative to the parent of the directory
+                               java.nio.file.Path shipPath = shipFile.toPath();
+                               final java.nio.file.Path parentPath = 
shipPath.getParent();
+
+                               Files.walkFileTree(shipPath, new 
SimpleFileVisitor<java.nio.file.Path>() {
+                                       @Override
+                                       public FileVisitResult 
visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+                                               throws IOException {
+                                               java.nio.file.Path relativePath 
= parentPath.relativize(file);
+
+                                               
classPaths.add(relativePath.toString());
+
+                                               return FileVisitResult.CONTINUE;
+                                       }
+                               });
+                       } else {
+                               // plain files are added to the classpath by file name only
+                               classPaths.add(shipFile.getName());
+                       }
+
+                       envShipFileList.append(remotePath).append(","); // every entry, including the last one, ends with a comma
+               }
+               return classPaths;
+       }
 
        /**
         * Kills YARN application and stops YARN client.
@@ -1220,7 +1249,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
        }
 
-       protected void addLibFolderToShipFiles(Set<File> effectiveShipFiles) {
+       protected void addLibFolderToShipFiles(Collection<File> 
effectiveShipFiles) {
                // Add lib folder to the ship files if the environment variable 
is set.
                // This is for convenience when running from the command-line.
                // (for other files users explicitly set the ship files)
@@ -1298,6 +1327,19 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                return amContainer;
        }
 
+       private static YarnConfigOptions.UserJarInclusion 
getUserJarInclusionMode(org.apache.flink.configuration.Configuration config) { /*
+        * Reads the CLASSPATH_INCLUDE_USER_JAR option from the given configuration, upper-cases the
+        * value and maps it to the UserJarInclusion constant of that name. A value that matches no
+        * constant is logged at WARN level and replaced by the default value of the option.
+        *
+        * NOTE(review): toUpperCase() without a Locale argument follows the JVM default locale, so a
+        * valid lower-case value may fail to match under locales with special casing rules (for
+        * example the Turkish dotted and dotless i). Consider toUpperCase(Locale.ROOT).
+        */
+               String configuredUserJarInclusion = 
config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
+               try {
+                       return 
YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase());
+               } catch (IllegalArgumentException e) { // thrown by valueOf when the name is not an enum constant
+                       LOG.warn("Configuration parameter {} was configured 
with an invalid value {}. Falling back to default ({}).",
+                               
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
+                               configuredUserJarInclusion,
+                               
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
+                       return 
YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
+               }
+       }
+
        /**
         * Creates a YarnClusterClient; may be overridden in tests
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 071bb7d..8839c1e 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -42,8 +42,26 @@ public class YarnConfigOptions {
                        key("yarn.appmaster.rpc.port")
                        .defaultValue(-1);
 
+       /**
+        * Defines whether user-jars are included in the system class path for 
per-job-clusters as well as their positioning
+        * in the path. They can be positioned at the beginning ("FIRST"), at 
the end ("LAST"), or be positioned based on
+        * their name ("ORDER"). The remaining value, {@code DISABLED}, keeps them out of the system class path.
+        *
+        * <p>The accepted values are the names of the {@link UserJarInclusion} constants; the default is {@code ORDER}.
+        */
+       public static final ConfigOption<String> CLASSPATH_INCLUDE_USER_JAR =
+               key("yarn.per-job-cluster.include-user-jar")
+                       .defaultValue("ORDER");
+       
+
        // 
------------------------------------------------------------------------
 
        /** This class is not meant to be instantiated */
        private YarnConfigOptions() {}
+
+       /**
+        * Modes for placing user jars on the class path of a per-job cluster.
+        *
+        * @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR
+        */
+       public enum UserJarInclusion {
+               DISABLED, // user jars are not added to the system class path
+               FIRST, // user jars precede all other system class path entries
+               LAST, // user jars follow all other system class path entries
+               ORDER // user jars are sorted together with the other system class path entries
+       }
 }

Reply via email to