amend OOZIE-2787 Oozie distributes application jar twice making the spark job fail (satishsaley)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0da31f47 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0da31f47 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0da31f47 Branch: refs/heads/oya Commit: 0da31f47ea975387c5d05763c632a5b8915bd9cc Parents: d2a98a1 Author: Satish Subhashrao Saley <[email protected]> Authored: Thu Feb 9 23:00:35 2017 -0800 Committer: Satish Subhashrao Saley <[email protected]> Committed: Thu Feb 9 23:09:56 2017 -0800 ---------------------------------------------------------------------- .../site/twiki/DG_SparkActionExtension.twiki | 36 ++++++ .../apache/oozie/action/hadoop/SparkMain.java | 24 +++- .../oozie/action/hadoop/TestJarFilter.java | 109 +++++++++++++++++++ 3 files changed, 165 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/0da31f47/docs/src/site/twiki/DG_SparkActionExtension.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki index 06b1d0c..863bd89 100644 --- a/docs/src/site/twiki/DG_SparkActionExtension.twiki +++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki @@ -200,6 +200,42 @@ The =jar= element indicates python file. Refer to the file by it's localized nam in PySpark. The py file should be in the lib/ folder next to the workflow.xml or added using the =file= element so that it's localized to the working directory with just its name. +---+++ Using Symlink in <jar> + +A symlink must be specified using =[[WorkflowFunctionalSpec#a3.2.2. +1_Adding_Files_and_Archives_for_the_Job][file]]= element. Then, you can use +the symlink name in =jar= element. + +*Example:* + +Specifying relative path for symlink: + +Make sure that the file is within the application directory i.e. =oozie.wf.application.path= . 
+<verbatim> + <spark xmlns="uri:oozie:spark-action:0.2"> + ... + <jar>py-spark-example-symlink.py</jar> + ... + ... + <file>py-spark.py#py-spark-example-symlink.py</file> + ... + </spark> +</verbatim> + +Specifying full path for symlink: +<verbatim> + <spark xmlns="uri:oozie:spark-action:0.2"> + ... + <jar>spark-example-symlink.jar</jar> + ... + ... + <file>hdfs://localhost:8020/user/testjars/all-oozie-examples.jar#spark-example-symlink.jar</file> + ... + </spark> +</verbatim> + + + ---++ Appendix, Spark XML-Schema ---+++ AE.A Appendix A, Spark XML-Schema http://git-wip-us.apache.org/repos/asf/oozie/blob/0da31f47/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java index d37053d..12eb61a 100644 --- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java @@ -520,7 +520,7 @@ public class SparkMain extends LauncherMain { /** * This class is used for filtering out unwanted jars. 
*/ - private static class JarFilter { + static class JarFilter { private String sparkVersion = "1.X.X"; private String sparkYarnJar; private String applicationJar; @@ -547,9 +547,10 @@ public class SparkMain extends LauncherMain { * * @throws OozieActionConfiguratorException */ - private void filter() throws OozieActionConfiguratorException { + public void filter() throws OozieActionConfiguratorException { Iterator<URI> iterator = listUris.iterator(); File matchedFile = null; + Path applJarPath = new Path(applicationJar); while (iterator.hasNext()) { URI uri = iterator.next(); Path p = new Path(uri); @@ -575,13 +576,28 @@ public class SparkMain extends LauncherMain { // Here we skip the application jar, because // (if uris are same,) it will get distributed multiple times // - one time with --files and another time as application jar. - if (p.getName().equals(applicationJar) || uri.toString().equals(applicationJar)) { - applicationJar = uri.toString(); + if (isApplicationJar(p.getName(), uri, applJarPath)) { + String fragment = uri.getFragment(); + applicationJar = fragment != null && fragment.length() > 0 ? 
fragment : uri.toString(); iterator.remove(); } } } + /** + * Checks if a file is the application jar + * + * @param fileName name of the file + * @param fileUri URI of the file + * @param applJarPath Path of application jar + * @return true if fileName or fileUri is the application jar + */ + private boolean isApplicationJar(String fileName, URI fileUri, Path applJarPath) { + return (fileName.equals(applicationJar) || fileUri.toString().equals(applicationJar) + || applJarPath.getName().equals(fileName) + || applicationJar.equals(fileUri.getFragment())); + } + public String getApplicationJar() { return applicationJar; } http://git-wip-us.apache.org/repos/asf/oozie/blob/0da31f47/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java new file mode 100644 index 0000000..2d4c83c --- /dev/null +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedList; +import java.util.jar.Attributes; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; + +import org.apache.oozie.action.hadoop.SparkMain.JarFilter; +import org.junit.Test; + +public class TestJarFilter { + + @Test + public void testJarFilter() throws URISyntaxException, IOException, OozieActionConfiguratorException { + LinkedList<URI> listUris = new LinkedList<URI>(); + String sparkVersion = "2.1.0"; + String sparkYarnJar = "spark-yarn-" + sparkVersion + ".jar"; + String applicationJarName = "oozie-examples.jar"; + String renamedApplicationJar = "renamed-oozie-examples.jar"; + URI sparkYarnJarUri = new URI("hdfs://localhost:8020/user/sparkjars/" + sparkYarnJar); + URI applicationJarUri = new URI("hdfs://localhost:8020/user/sparkdata/" + applicationJarName); + createSparkYarnJar(sparkYarnJar, sparkVersion); + populateUris(listUris, sparkYarnJarUri, applicationJarUri); + + // check application jar, spark yarn jar and spark version + JarFilter jarFilter = new JarFilter(listUris, applicationJarUri.getPath()); + jarFilter.filter(); + assertEquals(applicationJarUri.toString(), jarFilter.getApplicationJar()); + assertEquals(sparkYarnJarUri.toString(), jarFilter.getSparkYarnJar()); + assertEquals(sparkVersion, jarFilter.getSparkVersion()); + checkFilteredUris(listUris, sparkYarnJarUri.toString(), applicationJarUri.toString()); + listUris.clear(); + + // check application jar with fragmented URI + applicationJarUri = new URI("hdfs", "localhost", "/user/sparkdata/" + applicationJarName, + renamedApplicationJar); + populateUris(listUris, sparkYarnJarUri, applicationJarUri); + jarFilter = new JarFilter(listUris, 
applicationJarUri.getPath()); + jarFilter.filter(); + assertEquals(renamedApplicationJar, jarFilter.getApplicationJar()); + assertEquals(sparkYarnJarUri.toString(), jarFilter.getSparkYarnJar()); + assertEquals(sparkVersion, jarFilter.getSparkVersion()); + checkFilteredUris(listUris, sparkYarnJarUri.toString(), renamedApplicationJar); + listUris.clear(); + + // application jar is present in <file> with symlink + // and user mentioned the symlink name in <jar> + populateUris(listUris, sparkYarnJarUri, applicationJarUri); + jarFilter = new JarFilter(listUris, renamedApplicationJar); + jarFilter.filter(); + assertEquals(renamedApplicationJar, jarFilter.getApplicationJar()); + assertEquals(sparkYarnJarUri.toString(), jarFilter.getSparkYarnJar()); + assertEquals(sparkVersion, jarFilter.getSparkVersion()); + checkFilteredUris(listUris, sparkYarnJarUri.toString(), renamedApplicationJar); + } + + private void checkFilteredUris(LinkedList<URI> listUris, String sparkYarnJar, String applicationJar) { + for(URI uri : listUris) { + assertFalse(uri.toString().contains(sparkYarnJar)); + assertFalse(uri.toString().contains(applicationJar)); + } + } + + private void createSparkYarnJar(String sparkYarnJar, String sparkVersion) + throws FileNotFoundException, IOException { + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + manifest.getMainAttributes().put(Attributes.Name.SPECIFICATION_VERSION, sparkVersion); + JarOutputStream target = new JarOutputStream(new FileOutputStream(sparkYarnJar), manifest); + target.flush(); + target.close(); + } + + private void populateUris(LinkedList<URI> listUris, URI sparkYarnJarUri, URI applicationJarUri) + throws URISyntaxException { + String fileNamePattern = "hdfs://localhost:8020/user/sparkdata/%s%s%s"; + listUris.add(new URI(String.format(fileNamePattern, "_SUCCESS", "#", "ouputflag.txt"))); + listUris.add(new URI(String.format(fileNamePattern, "dependency.jar", "", ""))); + 
listUris.add(new URI(String.format(fileNamePattern, "helper.jar", "", ""))); + listUris.add(sparkYarnJarUri); + listUris.add(applicationJarUri); + } +} \ No newline at end of file
