Repository: flink Updated Branches: refs/heads/master e28380152 -> 4a314a80e
[FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath This closes #3931 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a314a80 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a314a80 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a314a80 Branch: refs/heads/master Commit: 4a314a80e64ea2af624131e5c02e1f167a22a354 Parents: e283801 Author: zentol <ches...@apache.org> Authored: Wed May 17 18:15:27 2017 +0200 Committer: Robert Metzger <rmetz...@apache.org> Committed: Thu May 18 14:01:53 2017 +0200 ---------------------------------------------------------------------- docs/setup/config.md | 2 + docs/setup/yarn_setup.md | 12 ++ .../yarn/AbstractYarnClusterDescriptor.java | 152 ++++++++++++------- .../yarn/configuration/YarnConfigOptions.java | 18 +++ 4 files changed, 129 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index c4a7354..8a6f67d 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -457,6 +457,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String - `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application. +- `yarn.per-job-cluster.include-user-jar` (Default: ORDER) Control whether and how the user-jar is included in the system class path for per-job clusters. Setting this parameter to `DISABLED` causes the jar to be included in the user class path instead. Setting this parameter to one of `FIRST`, `LAST` or `ORDER` causes the jar to be included in the system class path, with the jar either being placed at the beginning of the class path (`FIRST`), at the end (`LAST`), or based on the lexicographic order (`ORDER`). 
+ ### Mesos http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/docs/setup/yarn_setup.md ---------------------------------------------------------------------- diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md index 1ce45ad..190a796 100644 --- a/docs/setup/yarn_setup.md +++ b/docs/setup/yarn_setup.md @@ -245,6 +245,18 @@ Note: You can use a different configuration directory per job by setting the env Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to "fire and forget" a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call! +### User jars & Classpath + +By default Flink will include the user jars in the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-user-jar` parameter. + +When setting this to `DISABLED` Flink will include the jar in the user classpath instead. + +The user jar's position in the class path can be controlled by setting the parameter to one of the following: + +- `ORDER`: (default) Adds the jar to the system class path based on the lexicographic order. +- `FIRST`: Adds the jar to the beginning of the system class path. +- `LAST`: Adds the jar to the end of the system class path. + ## Recovery behavior of Flink on YARN Flink's YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters. 
http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 65525f2..3110a5b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -77,6 +78,7 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -144,7 +146,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor /** Optional Jar file to include in the system class loader of all application nodes * (for per-job submission) */ - private Set<File> userJarFiles; + private final Set<File> userJarFiles = new HashSet<>(); + + private YarnConfigOptions.UserJarInclusion userJarInclusion; public AbstractYarnClusterDescriptor() { // for unit tests only @@ -172,6 +176,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); taskManagerMemoryMb = 
flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); + userJarInclusion = getUserJarInclusionMode(flinkConfiguration); } catch (Exception e) { LOG.debug("Config couldn't be loaded from environment variable.", e); } @@ -200,6 +205,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) { this.flinkConfiguration = conf; + + userJarInclusion = getUserJarInclusionMode(flinkConfiguration); } public org.apache.flink.configuration.Configuration getFlinkConfiguration() { @@ -265,7 +272,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor * Returns true if the descriptor has the job jars to include in the classpath. */ public boolean hasUserJarFiles(List<URL> requiredJarFiles) { - if (userJarFiles == null || userJarFiles.size() != requiredJarFiles.size()) { + if (userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED) { + return false; + } + if (userJarFiles.size() != requiredJarFiles.size()) { return false; } try { @@ -284,16 +294,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor * Sets the user jar which is included in the system classloader of all nodes. 
*/ public void setProvidedUserJarFiles(List<URL> userJarFiles) { - Set<File> localUserJarFiles = new HashSet<>(userJarFiles.size()); for (URL jarFile : userJarFiles) { try { - localUserJarFiles.add(new File(jarFile.toURI())); + this.userJarFiles.add(new File(jarFile.toURI())); } catch (URISyntaxException e) { throw new IllegalArgumentException("Couldn't add local user jar: " + jarFile + " Currently only file:/// URLs are supported."); } } - this.userJarFiles = localUserJarFiles; } public String getDynamicPropertiesEncoded() { @@ -603,22 +611,22 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); - Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size()); + Set<File> systemShipFiles = new HashSet<>(shipFiles.size()); for (File file : shipFiles) { - effectiveShipFiles.add(file.getAbsoluteFile()); + systemShipFiles.add(file.getAbsoluteFile()); } //check if there is a logback or log4j file File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME); final boolean hasLogback = logbackFile.exists(); if (hasLogback) { - effectiveShipFiles.add(logbackFile); + systemShipFiles.add(logbackFile); } File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME); final boolean hasLog4j = log4jFile.exists(); if (hasLog4j) { - effectiveShipFiles.add(log4jFile); + systemShipFiles.add(log4jFile); if (hasLogback) { // this means there is already a logback configuration file --> fail LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " + @@ -626,13 +634,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - addLibFolderToShipFiles(effectiveShipFiles); - - // add the user jar to the classpath of the to-be-created cluster - if (userJarFiles != null) { - effectiveShipFiles.addAll(userJarFiles); - } - + 
addLibFolderToShipFiles(systemShipFiles); // Set-up ApplicationSubmissionContext for the application @@ -666,57 +668,39 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } // local resource map for Yarn - final Map<String, LocalResource> localResources = new HashMap<>(2 + effectiveShipFiles.size()); + final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size()); // list of remote paths (after upload) - final List<Path> paths = new ArrayList<>(2 + effectiveShipFiles.size()); + final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size()); // ship list that enables reuse of resources for task manager containers StringBuilder envShipFileList = new StringBuilder(); - // upload and register ship files - final List<String> classPaths = new ArrayList<>(); - for (File shipFile : effectiveShipFiles) { - LocalResource shipResources = Records.newRecord(LocalResource.class); - - Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); - Path remotePath = - Utils.setupLocalResource(fs, appId.toString(), shipLocalPath, shipResources, fs.getHomeDirectory()); - - paths.add(remotePath); - - localResources.put(shipFile.getName(), shipResources); - - if (shipFile.isDirectory()) { - // add directories to the classpath - java.nio.file.Path shipPath = shipFile.toPath(); - final java.nio.file.Path parentPath = shipPath.getParent(); + // upload and register ship files + List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList); + List<String> userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList); - Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() { - @Override - public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) - throws IOException { - java.nio.file.Path relativePath 
= parentPath.relativize(file); - - classPaths.add(relativePath.toString()); - - return FileVisitResult.CONTINUE; - } - }); - } else { - // add files to the classpath - classPaths.add(shipFile.getName()); - } - - envShipFileList.append(remotePath).append(","); + if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) { + systemClassPaths.addAll(userClassPaths); } // normalize classpath by sorting - Collections.sort(classPaths); + Collections.sort(systemClassPaths); + Collections.sort(userClassPaths); // classpath assembler StringBuilder classPathBuilder = new StringBuilder(); - for (String classPath : classPaths) { + if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) { + for (String userClassPath : userClassPaths) { + classPathBuilder.append(userClassPath).append(File.pathSeparator); + } + } + for (String classPath : systemClassPaths) { classPathBuilder.append(classPath).append(File.pathSeparator); } + if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) { + for (String userClassPath : userClassPaths) { + classPathBuilder.append(userClassPath).append(File.pathSeparator); + } + } // Setup jar for ApplicationMaster LocalResource appMasterJar = Records.newRecord(LocalResource.class); @@ -936,6 +920,51 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } return report; } + + private static List<String> uploadAndRegisterFiles( + Collection<File> shipFiles, + FileSystem fs, + String appId, + List<Path> remotePaths, + Map<String, LocalResource> localResources, + StringBuilder envShipFileList) throws IOException { + final List<String> classPaths = new ArrayList<>(2 + shipFiles.size()); + for (File shipFile : shipFiles) { + LocalResource shipResources = Records.newRecord(LocalResource.class); + + Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); + Path remotePath = + Utils.setupLocalResource(fs, appId, shipLocalPath, shipResources, fs.getHomeDirectory()); + + 
remotePaths.add(remotePath); + + localResources.put(shipFile.getName(), shipResources); + + if (shipFile.isDirectory()) { + // add directories to the classpath + java.nio.file.Path shipPath = shipFile.toPath(); + final java.nio.file.Path parentPath = shipPath.getParent(); + + Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() { + @Override + public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) + throws IOException { + java.nio.file.Path relativePath = parentPath.relativize(file); + + classPaths.add(relativePath.toString()); + + return FileVisitResult.CONTINUE; + } + }); + } else { + // add files to the classpath + classPaths.add(shipFile.getName()); + } + + envShipFileList.append(remotePath).append(","); + } + return classPaths; + } /** * Kills YARN application and stops YARN client. @@ -1220,7 +1249,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - protected void addLibFolderToShipFiles(Set<File> effectiveShipFiles) { + protected void addLibFolderToShipFiles(Collection<File> effectiveShipFiles) { // Add lib folder to the ship files if the environment variable is set. // This is for convenience when running from the command-line. // (for other files users explicitly set the ship files) @@ -1298,6 +1327,19 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor return amContainer; } + private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(org.apache.flink.configuration.Configuration config) { + String configuredUserJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + try { + return YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); + } catch (IllegalArgumentException e) { + LOG.warn("Configuration parameter {} was configured with an invalid value {}. 
Falling back to default ({}).", + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), + configuredUserJarInclusion, + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + return YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + } + } + /** * Creates a YarnClusterClient; may be overriden in tests */ http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index 071bb7d..8839c1e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -42,8 +42,26 @@ public class YarnConfigOptions { key("yarn.appmaster.rpc.port") .defaultValue(-1); + /** + * Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning + * in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on + * their name ("ORDER"). + */ + public static final ConfigOption<String> CLASSPATH_INCLUDE_USER_JAR = + key("yarn.per-job-cluster.include-user-jar") + .defaultValue("ORDER"); + + // ------------------------------------------------------------------------ /** This class is not meant to be instantiated */ private YarnConfigOptions() {} + + /** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */ + public enum UserJarInclusion { + DISABLED, + FIRST, + LAST, + ORDER + } }