Repository: oozie Updated Branches: refs/heads/oya 4a9542caf -> 1773eab5b
OOZIE-2606 Set spark.yarn.jars to fix Spark 2.0 with Oozie (satishsaley via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a4dbeda9 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a4dbeda9 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a4dbeda9 Branch: refs/heads/oya Commit: a4dbeda9550e4bec5ef5adf0f86f4ea7be6cd155 Parents: 3eca3c2 Author: Rohini Palaniswamy <[email protected]> Authored: Fri Sep 30 10:37:35 2016 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Fri Sep 30 10:37:35 2016 -0700 ---------------------------------------------------------------------- pom.xml | 24 +++++ release-log.txt | 1 + sharelib/spark/pom.xml | 4 +- .../apache/oozie/action/hadoop/SparkMain.java | 105 ++++++++++++++++--- .../oozie/action/hadoop/TestSparkMain.java | 42 ++++++-- 5 files changed, 155 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/a4dbeda9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 704a2ee..29184b2 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,8 @@ <pig.classifier></pig.classifier> <sqoop.version>1.4.3</sqoop.version> <spark.version>1.6.1</spark.version> + <spark.streaming.kafka.version>1.6.1</spark.streaming.kafka.version> + <spark.bagel.version>1.6.1</spark.bagel.version> <spark.guava.version>14.0.1</spark.guava.version> <spark.scala.binary.version>2.10</spark.scala.binary.version> <sqoop.classifier>hadoop100</sqoop.classifier> @@ -1895,5 +1897,27 @@ <module>login</module> </modules> </profile> + <profile> + <id>spark-1</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <spark.version>1.6.1</spark.version> + <spark.streaming.kafka.version>1.6.1</spark.streaming.kafka.version> + <spark.bagel.version>1.6.1</spark.bagel.version> + </properties> + </profile> + <profile> + <id>spark-2</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <properties> + <spark.version>2.0.0</spark.version> + <spark.streaming.kafka.version>1.6.2</spark.streaming.kafka.version> + <spark.bagel.version>1.6.2</spark.bagel.version> + </properties> + </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/oozie/blob/a4dbeda9/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 13c11df..1579289 100644 --- a/release-log.txt +++ b/release-log.txt @@ -3,6 +3,7 @@ -- Oozie 4.3.0 release +OOZIE-2606 Set spark.yarn.jars to fix Spark 2.0 with Oozie (satishsaley via rohini) OOZIE-2673 Include XSD for shell-action:0.3 in documentation (abhishekbafna via rkanter) OOZIE-2194 oozie job -kill doesn't work with spark action (abhishekbafna via rohini) OOZIE-2501 ZK reentrant lock doesn't work for few cases (puru) http://git-wip-us.apache.org/repos/asf/oozie/blob/a4dbeda9/sharelib/spark/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml index 905b9b7..fd783b3 100644 --- a/sharelib/spark/pom.xml +++ b/sharelib/spark/pom.xml @@ -202,13 +202,13 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_${spark.scala.binary.version}</artifactId> - <version>${spark.version}</version> + <version>${spark.streaming.kafka.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-bagel_${spark.scala.binary.version}</artifactId> - <version>${spark.version}</version> + <version>${spark.bagel.version}</version> <scope>compile</scope> </dependency> http://git-wip-us.apache.org/repos/asf/oozie/blob/a4dbeda9/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java index 407ba4b..539fb5c 100644 --- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java @@ -18,14 +18,6 @@ package org.apache.oozie.action.hadoop; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.PropertyConfigurator; -import org.apache.spark.deploy.SparkSubmit; - import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -34,10 +26,23 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.jar.JarFile; +import java.util.jar.Manifest; import java.util.regex.Pattern; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.PropertyConfigurator; +import org.apache.spark.deploy.SparkSubmit; + public class SparkMain extends LauncherMain { private static final String MASTER_OPTION = "--master"; private static final String MODE_OPTION = "--deploy-mode"; @@ -56,6 +61,15 @@ public class SparkMain extends LauncherMain { private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties"; private static final Pattern[] SPARK_JOB_IDS_PATTERNS = { Pattern.compile("Submitted application (application[0-9_]*)") }; + public static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern + .compile("^spark-assembly((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$"); + public static final Pattern SPARK_YARN_JAR_PATTERN = Pattern + .compile("^spark-yarn((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$"); + private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*"); + private static final String SPARK_YARN_JAR = "spark.yarn.jar"; + private static final String SPARK_YARN_JARS = "spark.yarn.jars"; + private String sparkYarnJar = null; + private String sparkVersion = "1.X.X"; public static void main(String[] args) throws Exception { run(SparkMain.class, args); } @@ -179,16 +193,19 @@ public class SparkMain extends LauncherMain { } if ((yarnClusterMode || yarnClientMode)) { - String cachedFiles = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath); + LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath); + String cachedFiles = filterSparkYarnJar(fixedUris); if (cachedFiles != null && !cachedFiles.isEmpty()) { sparkArgs.add("--files"); sparkArgs.add(cachedFiles); } - String cachedArchives = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath); + fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath); + String cachedArchives = StringUtils.join(fixedUris, ","); if (cachedArchives != null && !cachedArchives.isEmpty()) { sparkArgs.add("--archives"); sparkArgs.add(cachedArchives); } + setSparkYarnJarsConf(sparkArgs); } if (!sparkArgs.contains(VERBOSE_OPTION)) { @@ -384,11 +401,11 @@ public class SparkMain extends LauncherMain { * @throws IOException * @throws URISyntaxException */ - private String fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException { + private LinkedList<URI> fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException { if (files == null) { return null; } - ArrayList<URI> listUris = new ArrayList<URI>(); + LinkedList<URI> listUris = new LinkedList<URI>(); FileSystem fs = FileSystem.get(new Configuration(true)); for (int i = 0; i < files.length; i++) { URI fileUri = files[i]; @@ -411,6 +428,68 @@ public class SparkMain extends LauncherMain { } } } + return listUris; + } + + /** + * Filters out the Spark yarn jar and records its version + * + * @param listUris string containing uris separated by comma + * @return + * @throws OozieActionConfiguratorException + */ + private String filterSparkYarnJar(LinkedList<URI> listUris) throws OozieActionConfiguratorException { + Iterator<URI> iterator = listUris.iterator(); + File matchedFile = null; + while (iterator.hasNext()) { + URI uri = iterator.next(); + Path p = new Path(uri); + if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) { + matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN); + } + else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) { + matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN); + } + if (matchedFile != null) { + sparkYarnJar = uri.toString(); + try { + sparkVersion = getJarVersion(matchedFile); + System.out.println("Spark Version " + sparkVersion); + } + catch (IOException io) { + System.out.println( + "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion); + } + iterator.remove(); + break; + } + } return StringUtils.join(listUris, ","); } -} + + /** + * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X. + * + * @param sparkArgs + */ + private void setSparkYarnJarsConf(List<String> sparkArgs) { + if (SPARK_VERSION_1.matcher(sparkVersion).find()) { + // In Spark 1.X.X, set spark.yarn.jar to avoid + // multiple distribution + sparkArgs.add("--conf"); + sparkArgs.add(SPARK_YARN_JAR + "=" + sparkYarnJar); + } + else { + // In Spark 2.X.X, set spark.yarn.jars + sparkArgs.add("--conf"); + sparkArgs.add(SPARK_YARN_JARS + "=" + sparkYarnJar); + } + } + + private String getJarVersion(File jarFile) throws IOException { + @SuppressWarnings("resource") + Manifest manifest = new JarFile(jarFile).getManifest(); + return manifest.getMainAttributes().getValue("Specification-Version"); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/a4dbeda9/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java index 5ef4649..f044048 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java @@ -18,11 +18,6 @@ package org.apache.oozie.action.hadoop; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.XConfiguration; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -30,6 +25,13 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; +import java.util.ArrayList; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; public class TestSparkMain extends MainTestCase { @@ -90,4 +92,32 @@ public class TestSparkMain extends MainTestCase { assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + OUTPUT))); return null; } -} + + public void testPatterns() { + patternHelper("spark-yarn", SparkMain.SPARK_YARN_JAR_PATTERN); + patternHelper("spark-assembly", SparkMain.SPARK_ASSEMBLY_JAR_PATTERN); + } + + private void patternHelper(String jarName, Pattern pattern) { + ArrayList<String> jarList = new ArrayList<String>(); + jarList.add(jarName + "-1.2.jar"); + jarList.add(jarName + "-1.2.4.jar"); + jarList.add(jarName + "1.2.4.jar"); + jarList.add(jarName + "-1.2.4_1.2.3.4.jar"); + jarList.add(jarName + ".jar"); + + // all should pass + for (String s : jarList) { + assertTrue(pattern.matcher(s).find()); + } + + jarList.clear(); + jarList.add(jarName + "-1.2.3-sources.jar"); + jarList.add(jarName + "-sources-1.2.3.jar"); + jarList.add(jarName + "-sources.jar"); + // all should not pass + for (String s : jarList) { + assertFalse(pattern.matcher(s).find()); + } + } +} \ No newline at end of file
