Repository: oozie
Updated Branches:
  refs/heads/master eee0a4ee4 -> c22364554
OOZIE-2547 Add mapreduce.job.cache.files to spark action (satishsaley via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c2236455
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c2236455
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c2236455

Branch: refs/heads/master
Commit: c22364554ca51a422319776b89a4c9b727714499
Parents: eee0a4e
Author: Rohini Palaniswamy <[email protected]>
Authored: Fri Jun 10 10:13:25 2016 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Fri Jun 10 10:13:25 2016 -0700

----------------------------------------------------------------------
 .../site/twiki/DG_SparkActionExtension.twiki    |  60 +++++-
 release-log.txt                                 |   1 +
 .../apache/oozie/action/hadoop/SparkMain.java   | 189 ++++++++-----------
 3 files changed, 142 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index d7d75a1..74875bb 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -153,7 +153,7 @@ to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tr
 To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark
 configuration properties either in =spark-opts= with =--conf= or from
 =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml.
 
-1. spark.yarn.historyServer.address=http://SPH-HOST:18088
+1. spark.yarn.historyServer.address=SPH-HOST:18088
 
 2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
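
For reference, these properties can be passed through =spark-opts=; the sketch below is illustrative only. The history server host and namenode authority are placeholders, and the third property (presumably spark.eventLog.enabled, which this hunk does not show) is an assumption:

<verbatim>
<spark-opts>
    --conf spark.yarn.historyServer.address=SPH-HOST:18088
    --conf spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
    --conf spark.eventLog.enabled=true
</spark-opts>
</verbatim>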
@@ -261,6 +261,64 @@ it's localized to the working directory with just its name.
 </xs:schema>
 </verbatim>
 
+---++++ Spark Action Schema Version 0.2
+<verbatim>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+           xmlns:spark="uri:oozie:spark-action:0.2" elementFormDefault="qualified"
+           targetNamespace="uri:oozie:spark-action:0.2">
+
+    <xs:element name="spark" type="spark:ACTION"/>
+
+    <xs:complexType name="ACTION">
+        <xs:sequence>
+            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="CONFIGURATION">
+        <xs:sequence>
+            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+                <xs:complexType>
+                    <xs:sequence>
+                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+                    </xs:sequence>
+                </xs:complexType>
+            </xs:element>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="PREPARE">
+        <xs:sequence>
+            <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="DELETE">
+        <xs:attribute name="path" type="xs:string" use="required"/>
+    </xs:complexType>
+
+    <xs:complexType name="MKDIR">
+        <xs:attribute name="path" type="xs:string" use="required"/>
+    </xs:complexType>
+
+</xs:schema>
+</verbatim>
 
 [[index][::Go back to Oozie Documentation Index::]]
 
 </noautolink>
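
For illustration, a minimal action validating against the new 0.2 schema could look like the sketch below; the action name, jar path, main class, and transition targets are made-up placeholders, and the element order follows the =xs:sequence= above:

<verbatim>
<action name="spark-pi">
    <spark xmlns="uri:oozie:spark-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <master>yarn-cluster</master>
        <name>Spark Pi Example</name>
        <class>org.apache.spark.examples.SparkPi</class>
        <jar>${nameNode}/user/${wf:user()}/lib/spark-examples.jar</jar>
        <spark-opts>--executor-memory 1G</spark-opts>
        <arg>10</arg>
    </spark>
    <ok to="end"/>
    <error to="fail"/>
</action>
</verbatim>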

http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4a556e6..88f66e9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2547 Add mapreduce.job.cache.files to spark action (satishsaley via rohini)
 OOZIE-2550 Flaky tests in TestZKUUIDService.java (pbacsko via rkanter)
 OOZIE-2445 Doc for - Specifying coordinator input datasets in more logical ways (puru)
 OOZIE-2541 Possible resource leak in Hive2Credentials (pbacsko via rkanter)


http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 13c1075..3acaef9 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -21,6 +21,8 @@ package org.apache.oozie.action.hadoop;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.spark.deploy.SparkSubmit;
 
@@ -28,9 +30,10 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.regex.Pattern;
@@ -43,18 +46,12 @@ public class SparkMain extends LauncherMain {
     private static final String VERBOSE_OPTION = "--verbose";
     private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
     private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
-    private static final String DIST_FILES = "spark.yarn.dist.files=";
-    private static final String JARS_OPTION = "--jars";
     private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled";
     private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled";
-
+    private static final String PWD = "$PWD" + File.separator + "*";
     private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"),
             Pattern.compile("pyspark.zip") };
     private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf");
-
-    private String sparkJars = null;
-    private String sparkClasspath = null;
-
     private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
     private static final Pattern[] SPARK_JOB_IDS_PATTERNS = {
             Pattern.compile("Submitted application (application[0-9_]*)") };
@@ -66,6 +63,7 @@ public class SparkMain extends LauncherMain {
     protected void run(String[] args) throws Exception {
         boolean isPyspark = false;
         Configuration actionConf = loadActionConf();
+        setYarnTag(actionConf);
         LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
         String logFile = setUpSparkLog4J(actionConf);
 
@@ -75,6 +73,10 @@ public class SparkMain extends LauncherMain {
         String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
         sparkArgs.add(master);
 
+        // In local mode, everything runs here in the Launcher Job.
+        // In yarn-client mode, the driver runs here in the Launcher Job and the
+        // executor in Yarn.
+        // In yarn-cluster mode, the driver and executor run in Yarn.
         String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
         if (sparkDeployMode != null) {
             sparkArgs.add(MODE_OPTION);
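
To make the mode handling concrete, here is a minimal, standalone sketch of how the launcher assembles the master and deploy-mode arguments, assuming MODE_OPTION is "--deploy-mode" and using hard-coded stand-ins for the action configuration values:

    import java.util.ArrayList;
    import java.util.List;

    public class SparkModeArgsSketch {
        public static void main(String[] args) {
            // Stand-ins for actionConf.get(SparkActionExecutor.SPARK_MASTER)
            // and actionConf.get(SparkActionExecutor.SPARK_MODE).
            String master = "yarn";
            String sparkDeployMode = "cluster";

            List<String> sparkArgs = new ArrayList<String>();
            sparkArgs.add("--master");
            sparkArgs.add(master);
            if (sparkDeployMode != null) {
                // client: driver in the Launcher Job, executors in Yarn.
                // cluster: driver and executors both in Yarn.
                sparkArgs.add("--deploy-mode");
                sparkArgs.add(sparkDeployMode);
            }
            // Prints: [--master, yarn, --deploy-mode, cluster]
            System.out.println(sparkArgs);
        }
    }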
@@ -98,33 +100,8 @@ public class SparkMain extends LauncherMain {
         if(jarPath!=null && jarPath.endsWith(".py")){
             isPyspark = true;
         }
-
-        // In local mode, everything runs here in the Launcher Job.
-        // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn.
-        // In yarn-cluster mode, the driver and executor run in Yarn.
-        // Due to this, configuring Spark's classpath is not straightforward (see below)
-
-        // Parse the spark opts.  We need to make sure to pass the necessary jars (Hadoop, Spark, user) to Spark.  The way we do
-        // this depends on what mode is being used:
-        // local/yarn-cluster/yarn-client: Passed as comma-separated list via --jars argument
-        // yarn-cluster/yarn-client: Passed as ':'-separated list via spark.executor.extraClassPath and spark.driver.extraClassPath
-        // yarn-client: Passed as comma-separted list via spark.yarn.dist.files
-        //
-        // --jars will cause the jars to be uploaded to HDFS and localized.  To prevent the Sharelib and user jars from being
-        // unnecessarily reuploaded to HDFS, we use the HDFS paths for these.  The hadoop jars are needed as well, but we'll have
-        // to use local paths for these because they're not in the Sharelib.
-        //
-        // spark.executor.extraClassPath and spark.driver.extraClassPath are blindly used as classpaths, so we need to put only
-        // localized jars.  --jars will cause the jars to be localized to the working directory, so we can simply specify the jar
-        // names for the classpaths, as they'll be found in the working directory.
-        //
-        // This part looks more complicated than it is because we need to append the jars if the user already set things for
-        // these options
-        determineSparkJarsAndClasspath(actionConf, jarPath);
         boolean addedExecutorClasspath = false;
         boolean addedDriverClasspath = false;
-        boolean addedDistFiles = false;
-        boolean addedJars = false;
         boolean addedHiveSecurityToken = false;
         boolean addedHBaseSecurityToken = false;
         String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
@@ -132,29 +109,19 @@
             List<String> sparkOptions = splitSparkOpts(sparkOpts);
             for (int i = 0; i < sparkOptions.size(); i++) {
                 String opt = sparkOptions.get(i);
-                if (sparkJars != null) {
-                    if (opt.equals(JARS_OPTION)) {
-                        sparkArgs.add(opt);
-                        i++;
-                        if(i < sparkOptions.size()) {
-                            opt = sparkOptions.get(i);
-                            opt = opt + "," + sparkJars;
-                            addedJars = true;
-                        } else {
-                            throw new OozieActionConfiguratorException(JARS_OPTION + " missing a parameter.");
-                        }
-                    } else if (yarnClientMode && opt.startsWith(DIST_FILES)) {
-                        opt = opt + "," + sparkJars;
-                        addedDistFiles = true;
-                    }
-                }
-                if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+                if (yarnClusterMode || yarnClientMode) {
                     if (opt.startsWith(EXECUTOR_CLASSPATH)) {
-                        opt = opt + File.pathSeparator + sparkClasspath;
+                        // Include the current working directory (of executor
+                        // container) in executor classpath, because it will
+                        // contain localized files
+                        opt = opt + File.pathSeparator + PWD;
                         addedExecutorClasspath = true;
                    }
                     if (opt.startsWith(DRIVER_CLASSPATH)) {
-                        opt = opt + File.pathSeparator + sparkClasspath;
+                        // Include the current working directory (of driver
+                        // container) in driver classpath, because it will
+                        // contain localized files
+                        opt = opt + File.pathSeparator + PWD;
                         addedDriverClasspath = true;
                     }
                 }
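
The net effect of the loop above on a user-supplied classpath option is a simple string append; a self-contained sketch with a made-up option value:

    import java.io.File;

    public class ClasspathAppendSketch {
        private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
        // The container's working directory, where localized files end up.
        private static final String PWD = "$PWD" + File.separator + "*";

        public static void main(String[] args) {
            // A made-up value a user might have set via --conf in spark-opts.
            String opt = EXECUTOR_CLASSPATH + "/opt/custom/lib/extra.jar";
            opt = opt + File.pathSeparator + PWD;
            // On Linux prints: spark.executor.extraClassPath=/opt/custom/lib/extra.jar:$PWD/*
            System.out.println(opt);
        }
    }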
@@ -167,25 +134,23 @@
                 sparkArgs.add(opt);
             }
         }
-        if (!addedJars && sparkJars != null) {
-            sparkArgs.add("--jars");
-            sparkArgs.add(sparkJars);
-        }
-        if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+
+        if ((yarnClusterMode || yarnClientMode)) {
             if (!addedExecutorClasspath) {
+                // Include the current working directory (of executor container)
+                // in executor classpath, because it will contain localized
+                // files
                 sparkArgs.add("--conf");
-                sparkArgs.add(EXECUTOR_CLASSPATH + sparkClasspath);
+                sparkArgs.add(EXECUTOR_CLASSPATH + PWD);
             }
             if (!addedDriverClasspath) {
+                // Include the current working directory (of driver container)
+                // in driver classpath, because it will contain localized
+                // files
                 sparkArgs.add("--conf");
-                sparkArgs.add(DRIVER_CLASSPATH + sparkClasspath);
+                sparkArgs.add(DRIVER_CLASSPATH + PWD);
             }
         }
-        if (yarnClientMode && !addedDistFiles && sparkJars != null) {
-            sparkArgs.add("--conf");
-            sparkArgs.add(DIST_FILES + sparkJars);
-        }
-
         sparkArgs.add("--conf");
         sparkArgs.add("spark.executor.extraJavaOptions=-Dlog4j.configuration=" + SPARK_LOG4J_PROPS);
 
@@ -205,6 +170,20 @@
             sparkArgs.add("--properties-file");
             sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString());
         }
+
+        if ((yarnClusterMode || yarnClientMode)) {
+            String cachedFiles = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
+            if (cachedFiles != null && !cachedFiles.isEmpty()) {
+                sparkArgs.add("--files");
+                sparkArgs.add(cachedFiles);
+            }
+            String cachedArchives = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+            if (cachedArchives != null && !cachedArchives.isEmpty()) {
+                sparkArgs.add("--archives");
+                sparkArgs.add(cachedArchives);
+            }
+        }
+
         if (!sparkArgs.contains(VERBOSE_OPTION)) {
             sparkArgs.add(VERBOSE_OPTION);
         }
@@ -217,6 +196,7 @@
             createPySparkLibFolder();
         }
 
+        System.out.println("Spark Action Main class : " + SparkSubmit.class.getName());
         System.out.println();
         System.out.println("Oozie Spark action configuration");
@@ -299,49 +279,6 @@
         SparkSubmit.main(args);
     }
 
-    private void determineSparkJarsAndClasspath(Configuration actionConf, String jarPath) {
-        // distCache gets all of the Sharelib and user jars (from the Distributed Cache)
-        // classpath gets all of the jars from the classpath, which includes the localized jars from the distCache
-        // sparkJars becomes all of the full paths to the Sharelib and user jars from HDFS and the deduped local paths
-        // sparkClasspath becomes all of the jar names in sparkJars (without paths)
-        // We also remove the Spark job jar and job.jar
-        String[] distCache = new String[]{};
-        String dCache = actionConf.get("mapreduce.job.classpath.files");
-        if (dCache != null) {
-            distCache = dCache.split(",");
-        }
-        String[] classpath = System.getProperty("java.class.path").split(File.pathSeparator);
-        StringBuilder cp = new StringBuilder();
-        StringBuilder jars = new StringBuilder();
-        HashSet<String> distCacheJars = new HashSet<String>(distCache.length);
-        for (String path : distCache) {
-            // Skip the job jar because it's already included elsewhere and Spark doesn't like duplicating it here
-            if (!path.equals(jarPath)) {
-                String name = path.substring(path.lastIndexOf("/") + 1);
-                distCacheJars.add(name);
-                cp.append(name).append(File.pathSeparator);
-                jars.append(path).append(",");
-            }
-        }
-        for (String path : classpath) {
-            if (!path.startsWith("job.jar") && path.endsWith(".jar")) {
-                String name = path.substring(path.lastIndexOf("/") + 1);
-                if (!distCacheJars.contains(name)) {
-                    jars.append(path).append(",");
-                }
-                cp.append(name).append(File.pathSeparator);
-            }
-        }
-        if (cp.length() > 0) {
-            cp.setLength(cp.length() - 1);
-            sparkClasspath = cp.toString();
-        }
-        if (jars.length() > 0) {
-            jars.setLength(jars.length() - 1);
-            sparkJars = jars.toString();
-        }
-    }
-
     /**
      * Converts the options to be Spark-compatible.
     * <ul>
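
The new --files/--archives block forwards whatever the workflow's <file> and <archive> elements put into the distributed cache. A rough standalone approximation of the flattening step (the URIs are hypothetical, and the real code first normalizes them through fixFsDefaultUris):

    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;

    public class CacheFilesSketch {
        public static void main(String[] args) throws Exception {
            // Stand-ins for DistributedCache.getCacheFiles(actionConf).
            URI[] cacheFiles = { new URI("hdfs://nn.example.com:8020/user/test/lookup.dat"),
                    new URI("hdfs://nn.example.com:8020/user/test/config.properties") };

            StringBuilder cachedFiles = new StringBuilder();
            for (URI uri : cacheFiles) {
                if (cachedFiles.length() > 0) {
                    cachedFiles.append(",");
                }
                cachedFiles.append(uri.toString());
            }

            List<String> sparkArgs = new ArrayList<String>();
            if (cachedFiles.length() > 0) {
                // Spark localizes each listed file into the container working
                // directory, which the $PWD/* classpath entries above pick up.
                sparkArgs.add("--files");
                sparkArgs.add(cachedFiles.toString());
            }
            System.out.println(sparkArgs);
        }
    }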
@@ -419,4 +356,42 @@
         PropertyConfigurator.configure(SPARK_LOG4J_PROPS);
         return logFile;
     }
+
+    /**
+     * Converts URIs into the default filesystem's format, which Spark expects.
+     *
+     * @param files the URIs from the distributed cache
+     * @param jarPath path of the application jar, which must not be distributed again
+     * @return a comma-separated list of the converted URIs
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+    private String fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
+        if (files == null) {
+            return null;
+        }
+        ArrayList<URI> listUris = new ArrayList<URI>();
+        FileSystem fs = FileSystem.get(new Configuration(true));
+        for (int i = 0; i < files.length; i++) {
+            URI fileUri = files[i];
+            // Spark compares URIs based on scheme, host and port.
+            // Here we convert URIs into the default format so that Spark
+            // won't think those belong to different file systems.
+            // This avoids an extra copy of files which already exist on the
+            // same HDFS.
+            if (!fileUri.toString().equals(jarPath) && fs.getUri().getScheme().equals(fileUri.getScheme())
+                    && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+                    && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+                            || fs.getUri().getPort() == fileUri.getPort())) {
+                URI uri = new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(),
+                        fs.getUri().getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+                // Here we skip the application jar because, if the URIs are the
+                // same, it would get distributed multiple times - once via
+                // --files and once as the application jar.
+                if (!uri.toString().equals(jarPath)) {
+                    listUris.add(uri);
+                }
+            }
+        }
+        return StringUtils.join(listUris, ",");
+    }
 }
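
To see what fixFsDefaultUris accomplishes, consider a hypothetical default filesystem of hdfs://nn.example.com:8020 and a cache file registered without an authority; the same java.net.URI reconstruction used above maps the file onto the default filesystem, so Spark's scheme/host/port comparison treats both as one filesystem and skips the extra upload:

    import java.net.URI;

    public class FixUriSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical default filesystem URI, i.e. what fs.getUri() would return.
            URI fsUri = new URI("hdfs://nn.example.com:8020");
            // A cache file URI with no host or port, as it may appear in the action conf.
            URI fileUri = new URI("hdfs:///user/test/lookup.dat");

            // Keep the file's path, query and fragment; take scheme, host and
            // port from the default filesystem.
            URI fixed = new URI(fsUri.getScheme(), fileUri.getUserInfo(), fsUri.getHost(),
                    fsUri.getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());

            // Prints: hdfs://nn.example.com:8020/user/test/lookup.dat
            System.out.println(fixed);
        }
    }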
