Repository: oozie
Updated Branches:
  refs/heads/master c0b5497b2 -> b0ebf58e8


OOZIE-2482 Pyspark job fails with Oozie (satishsaley and gezapeti via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b0ebf58e
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b0ebf58e
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b0ebf58e

Branch: refs/heads/master
Commit: b0ebf58e86f26f01571f558bf3b04d1788b8380f
Parents: c0b5497
Author: Robert Kanter <[email protected]>
Authored: Thu May 26 16:49:33 2016 -0700
Committer: Robert Kanter <[email protected]>
Committed: Thu May 26 16:54:06 2016 -0700

----------------------------------------------------------------------
 .../action/hadoop/SparkActionExecutor.java      |  21 +++
 docs/src/site/twiki/AG_Install.twiki            |   5 +
 .../site/twiki/DG_SparkActionExtension.twiki    |  41 +++++
 pom.xml                                         |   2 +-
 release-log.txt                                 |   1 +
 sharelib/spark/pom.xml                          | 101 ++++++++++
 .../apache/oozie/action/hadoop/SparkMain.java   |  53 +++++-
 .../apache/oozie/action/hadoop/TestPyspark.java | 182 +++++++++++++++++++
 .../action/hadoop/TestSparkActionExecutor.java  |   3 +-
 sharelib/spark/src/test/resources/pi.py         |  41 +++++
 .../spark/src/test/resources/py4j-0.9-src.zip   | Bin 0 -> 44846 bytes
 sharelib/spark/src/test/resources/pyspark.zip   | Bin 0 -> 357051 bytes
 12 files changed, 447 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 219a116..6d37105 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -43,6 +43,7 @@ public class SparkActionExecutor extends JavaActionExecutor {
     public static final String SPARK_JOB_NAME = "oozie.spark.name";
     public static final String SPARK_CLASS = "oozie.spark.class";
     public static final String SPARK_JAR = "oozie.spark.jar";
+    public static final String MAPRED_CHILD_ENV = "mapred.child.env";
 
     public SparkActionExecutor() {
         super("spark");
@@ -107,6 +108,26 @@ public class SparkActionExecutor extends 
JavaActionExecutor {
     }
 
     @Override
+    Configuration setupLauncherConf(Configuration conf, Element actionXml, 
Path appPath, Context context)
+            throws ActionExecutorException {
+        super.setupLauncherConf(conf, actionXml, appPath, context);
+
+        // Set SPARK_HOME environment variable on launcher job
+        // It is needed since pyspark client checks for it.
+        String sparkHome = "SPARK_HOME=.";
+        String mapredChildEnv = conf.get("oozie.launcher." + MAPRED_CHILD_ENV);
+
+        if (mapredChildEnv == null) {
+            conf.set(MAPRED_CHILD_ENV, sparkHome);
+            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, sparkHome);
+        } else if (!mapredChildEnv.contains("SPARK_HOME")) {
+            conf.set(MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
+            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, mapredChildEnv + 
"," + sparkHome);
+        }
+        return conf;
+    }
+
+    @Override
     public List<Class> getLauncherClasses() {
         List<Class> classes = new ArrayList<Class>();
         try {

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/docs/src/site/twiki/AG_Install.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/AG_Install.twiki 
b/docs/src/site/twiki/AG_Install.twiki
index 66c0019..2d36737 100644
--- a/docs/src/site/twiki/AG_Install.twiki
+++ b/docs/src/site/twiki/AG_Install.twiki
@@ -1058,10 +1058,15 @@ action and value is a comma separated list of DFS 
directories or jar files.
             oozie.pig_10=hdfs:///share/lib/pig/pig-0.10.1/lib/
             oozie.pig=hdfs:///share/lib/pig/pig-0.11.1/lib/
             
oozie.distcp=hdfs:///share/lib/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-distcp-2.2.0.jar
+            
oozie.spark=hdfs:///share/lib/spark/lib/,hdfs:///share/lib/spark/python/lib/pyspark.zip,hdfs:///share/lib/spark/python/lib/py4j-0.9-src.zip
         </description>
     </property>
  </verbatim>
 
+The Oozie sharelib TAR.GZ file bundled with the distribution does not contain 
pyspark and py4j zip files since they vary
+with the Apache Spark version. Therefore, to run pySpark using the Spark Action, users 
need to specify pyspark and py4j zip
+files. These files can be added either to the workflow's lib/ directory, to the 
sharelib, or in the sharelib mapping file.
+
 
 ---++ Oozie Coordinators/Bundles Processing Timezone
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki 
b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 18b84b7..d7d75a1 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -159,6 +159,47 @@ either in =spark-opts= with =--conf= or from 
=oozie.service.SparkConfigurationSe
 
 3. spark.eventLog.enabled=true
 
+---+++ PySpark with Spark Action
+
+To submit PySpark scripts with the Spark Action, pyspark dependencies must be 
available in the sharelib or in the workflow's lib/ directory.
+For more information, please refer to 
[[AG_Install#Oozie_Share_Lib][installation document.]]
+
+*Example:*
+
+<verbatim>
+<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
+    ....
+    <action name="myfirstpysparkjob">
+        <spark xmlns="uri:oozie:spark-action:0.1">
+            <job-tracker>foo:8021</job-tracker>
+            <name-node>bar:8020</name-node>
+            <prepare>
+                <delete path="${jobOutput}"/>
+            </prepare>
+            <configuration>
+                <property>
+                    <name>mapred.compress.map.output</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <master>yarn-cluster</master>
+            <name>Spark Example</name>
+            <jar>pi.py</jar>
+            <spark-opts>--executor-memory 20G --num-executors 50
+            --conf 
spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp"</spark-opts>
+            <arg>100</arg>
+        </spark>
+        <ok to="myotherjob"/>
+        <error to="errorcleanup"/>
+    </action>
+    ...
+</workflow-app>
+</verbatim>
+
+The =jar= element indicates the python file. Refer to the file by its localized 
name, because only local files are allowed
+in PySpark. The py file should be in the lib/ folder next to the workflow.xml 
or added using the =file= element so that
+it's localized to the working directory with just its name.
+
 ---++ Appendix, Spark XML-Schema
 
 ---+++ AE.A Appendix A, Spark XML-Schema

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 26f10a3..5b7a863 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,7 +91,7 @@
          <pig.version>0.12.1</pig.version>
          <pig.classifier></pig.classifier>
          <sqoop.version>1.4.3</sqoop.version>
-         <spark.version>1.1.0</spark.version>
+         <spark.version>1.6.1</spark.version>
          <spark.guava.version>14.0.1</spark.guava.version>
          <sqoop.classifier>hadoop100</sqoop.classifier>
          <streaming.version>${hadoop.version}</streaming.version>

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7f40a99..fb2cbdb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2482 Pyspark job fails with Oozie (satishsaley and gezapeti via rkanter)
 OOZIE-2467 Oozie can shutdown itself on long GC pause (puru)
 OOZIE-2537 SqoopMain does not set up log4j properly (pbacsko via rkanter)
 OOZIE-2532 patch apply does not handle binary files (gezapeti via rkanter)

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index 72766dc..748ae06 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -50,6 +50,11 @@
             <scope>compile</scope>
         </dependency>
         <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.oozie</groupId>
             <artifactId>oozie-sharelib-oozie</artifactId>
             <scope>provided</scope>
@@ -64,6 +69,38 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-client</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-beeline</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-metastore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-serde</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-service</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-shims</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -98,6 +135,38 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-annotations</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-beeline</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-metastore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-serde</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-service</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-shims</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -318,6 +387,38 @@
                             <groupId>org.apache.hadoop</groupId>
                             <artifactId>hadoop-client</artifactId>
                         </exclusion>
+                        <exclusion>
+                            <groupId>org.spark-project.hive</groupId>
+                            <artifactId>hive-beeline</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.spark-project.hive</groupId>
+                            <artifactId>hive-common</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.spark-project.hive</groupId>
+                            <artifactId>hive-exec</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.spark-project.hive</groupId>
+                            <artifactId>hive-jdbc</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.spark-project.hive</groupId>
+                            <artifactId>hive-metastore</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.spark-project.hive</groupId>
+                            <artifactId>hive-serde</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.spark-project.hive</groupId>
+                            <artifactId>hive-service</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.spark-project.hive</groupId>
+                            <artifactId>hive-shims</artifactId>
+                        </exclusion>
                     </exclusions>
                 </dependency>
             </dependencies>

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java 
b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index a09ecfb..604f287 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -18,14 +18,17 @@
 
 package org.apache.oozie.action.hadoop;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.deploy.SparkSubmit;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.regex.Pattern;
 
 public class SparkMain extends LauncherMain {
     private static final String MASTER_OPTION = "--master";
@@ -40,6 +43,8 @@ public class SparkMain extends LauncherMain {
     private static final String HIVE_SECURITY_TOKEN = 
"spark.yarn.security.tokens.hive.enabled";
     private static final String HBASE_SECURITY_TOKEN = 
"spark.yarn.security.tokens.hbase.enabled";
 
+    private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { 
Pattern.compile("py4\\S*src.zip"),
+            Pattern.compile("pyspark.zip") };
     private String sparkJars = null;
     private String sparkClasspath = null;
 
@@ -49,6 +54,7 @@ public class SparkMain extends LauncherMain {
 
     @Override
     protected void run(String[] args) throws Exception {
+        boolean isPyspark = false;
         Configuration actionConf = loadActionConf();
         setYarnTag(actionConf);
         LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
@@ -79,6 +85,9 @@ public class SparkMain extends LauncherMain {
         }
 
         String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
+        if(jarPath!=null && jarPath.endsWith(".py")){
+            isPyspark = true;
+        }
 
         // In local mode, everything runs here in the Launcher Job.
         // In yarn-client mode, the driver runs here in the Launcher Job and 
the executor in Yarn.
@@ -179,10 +188,12 @@ public class SparkMain extends LauncherMain {
         }
 
         sparkArgs.add(jarPath);
-
         for (String arg : args) {
             sparkArgs.add(arg);
         }
+        if (isPyspark){
+            createPySparkLibFolder();
+        }
 
         System.out.println("Spark Action Main class        : " + 
SparkSubmit.class.getName());
         System.out.println();
@@ -196,6 +207,46 @@ public class SparkMain extends LauncherMain {
         runSpark(sparkArgs.toArray(new String[sparkArgs.size()]));
     }
 
+    /**
+     * SparkActionExecutor sets the SPARK_HOME environment variable to the 
local directory.
+     * Spark is looking for the pyspark.zip and py4j-VERSION-src.zip files in 
the python/lib folder under SPARK_HOME.
+     * This function creates the subfolders and copies the zips from the local 
folder.
+     * @throws OozieActionConfiguratorException  if the zip files are missing
+     * @throws IOException if there is an error during file copy
+     */
+    private void createPySparkLibFolder() throws 
OozieActionConfiguratorException, IOException {
+        File pythonLibDir = new File("python/lib");
+        if(!pythonLibDir.exists()){
+            pythonLibDir.mkdirs();
+            System.out.println("PySpark lib folder " + 
pythonLibDir.getAbsolutePath() + " folder created.");
+        }
+
+        for(Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) {
+            File file = getMatchingFile(fileNamePattern);
+            File destination = new File(pythonLibDir, file.getName());
+            FileUtils.copyFile(file, destination);
+            System.out.println("Copied " + file + " to " + 
destination.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Searches for a file in the current directory that matches the given 
pattern.
+     * If there are multiple files matching the pattern, one of them is returned.
+     * @param fileNamePattern the pattern to look for
+     * @return the file if there is one
+     * @throws OozieActionConfiguratorException if there are no files 
matching the pattern
+     */
+    private File getMatchingFile(Pattern fileNamePattern) throws 
OozieActionConfiguratorException {
+        File localDir = new File(".");
+        for(String fileName : localDir.list()){
+            if(fileNamePattern.matcher(fileName).find()){
+                return new File(fileName);
+            }
+        }
+        throw new OozieActionConfiguratorException("Missing py4j and/or 
pyspark zip files. Please add them to " +
+                "the lib folder or to the Spark sharelib.");
+    }
+
     private void runSpark(String[] args) throws Exception {
         
System.out.println("=================================================================");
         System.out.println();

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
new file mode 100644
index 0000000..458baaa
--- /dev/null
+++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.io.StringReader;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+
+public class TestPyspark extends ActionExecutorTestCase {
+
+
+    public static String PY4J_ZIP = "py4j-0.9-src.zip";
+    public static String PYSPARK_ZIP = "pyspark.zip";
+    public static String PI_EXAMPLE = "pi.py";
+
+
+    @Override
+    protected void setSystemProps() throws Exception {
+        super.setSystemProps();
+        setSystemProperty("oozie.service.ActionService.executor.classes", 
SparkActionExecutor.class.getName());
+    }
+
+    protected String getActionXml(String sparkOpts) {
+        String script = "<spark xmlns=''uri:oozie:spark-action:0.1''>" +
+                "<job-tracker>{0}</job-tracker>" +
+                "<name-node>{1}</name-node>" +
+                "<master>local[*]</master>" +
+                "<mode>client</mode>" +
+                "<name>PysparkExample</name>" +
+                "<jar>" + PI_EXAMPLE + "</jar>" +
+                "<spark-opts>" +sparkOpts +"</spark-opts>" +
+                "</spark>";
+        return MessageFormat.format(script, getJobTrackerUri(), 
getNameNodeUri());
+    }
+
+    public void testPyspark() throws Exception {
+        ArrayList<String> listLibFiles = new ArrayList<String>();
+
+        // <spark-opts> does not have any files
+        // pyspark and py4j are not present in current directory.
+        String sparkOpts = "--conf " + 
TestSparkActionExecutor.SPARK_TESTING_MEMORY;
+        WorkflowJobBean wf = getWorkflow(listLibFiles);
+        testPysparkHelper(sparkOpts, wf, "FAILED/KILLED", 
WorkflowAction.Status.ERROR);
+
+        // <spark-opts> has other files;
+        // pyspark and py4j are not present in current directory.
+        sparkOpts = "--py-files other.zip,myfunctions.py --conf " + 
TestSparkActionExecutor.SPARK_TESTING_MEMORY;
+        listLibFiles.add("other.zip");
+        listLibFiles.add("myfunctions.py");
+        wf = getWorkflow(listLibFiles);
+        testPysparkHelper(sparkOpts, wf, "FAILED/KILLED", 
WorkflowAction.Status.ERROR);
+
+        // <spark-opts> does not have any files
+        // pyspark and py4j are present in current directory.
+        sparkOpts = "--conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY;
+        listLibFiles.clear();
+        listLibFiles.add(PY4J_ZIP);
+        listLibFiles.add(PYSPARK_ZIP);
+        wf = getWorkflow(listLibFiles);
+        testPysparkHelper(sparkOpts, wf, "SUCCEEDED", 
WorkflowAction.Status.OK);
+
+        // <spark-opts> has some other files
+        // pyspark and py4j are present in current directory.
+        sparkOpts = "--py-files other.zip,myfunctions.py --conf " + 
TestSparkActionExecutor.SPARK_TESTING_MEMORY;
+        listLibFiles.clear();
+        listLibFiles.add("other.zip");
+        listLibFiles.add("myfunctions.py");
+        listLibFiles.add(PY4J_ZIP);
+        listLibFiles.add(PYSPARK_ZIP);
+        wf = getWorkflow(listLibFiles);
+        testPysparkHelper(sparkOpts, wf, "SUCCEEDED", 
WorkflowAction.Status.OK);
+    }
+
+    private void testPysparkHelper(String sparkOpts, WorkflowJobBean wf, 
String externalStatus,
+            WorkflowAction.Status wfStatus)
+            throws Exception {
+        Context context = createContext(getActionXml(sparkOpts), wf);
+        final RunningJob launcherJob = submitAction(context);
+        waitFor(200 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return launcherJob.isComplete();
+            }
+        });
+        assertTrue(launcherJob.isSuccessful());
+        SparkActionExecutor ae = new SparkActionExecutor();
+        ae.check(context, context.getAction());
+        assertEquals(externalStatus, context.getAction().getExternalStatus());
+        ae.end(context, context.getAction());
+        assertEquals(wfStatus, context.getAction().getStatus());
+    }
+
+    protected RunningJob submitAction(Context context) throws Exception {
+        SparkActionExecutor ae = new SparkActionExecutor();
+        WorkflowAction action = context.getAction();
+        ae.prepareActionDir(getFileSystem(), context);
+        ae.submitLauncher(getFileSystem(), context, action);
+        String jobId = action.getExternalId();
+        String jobTracker = action.getTrackerUri();
+        String consoleUrl = action.getConsoleUrl();
+        assertNotNull(jobId);
+        assertNotNull(jobTracker);
+        assertNotNull(consoleUrl);
+        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
+        jobConf.set("mapred.job.tracker", jobTracker);
+        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), 
jobConf);
+        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
+        assertNotNull(runningJob);
+        return runningJob;
+    }
+
+    protected Context createContext(String actionXml, WorkflowJobBean wf) 
throws Exception {
+        SparkActionExecutor ae = new SparkActionExecutor();
+        WorkflowActionBean action = (WorkflowActionBean) 
wf.getActions().get(0);
+        action.setType(ae.getType());
+        action.setConf(actionXml);
+        return new Context(wf, action);
+    }
+
+    /**
+     * @param listLibFiles list of files to be created in workflow lib/
+     *        directory
+     * @return
+     * @throws Exception
+     */
+    protected WorkflowJobBean getWorkflow(ArrayList<String> listLibFiles) 
throws Exception {
+        // add the example file as well
+        listLibFiles.add(PI_EXAMPLE);
+        String[] libPaths = new String[listLibFiles.size()];
+        FileSystem fs = getFileSystem();
+        for (int i = 0; i < listLibFiles.size(); i++) {
+            libPaths[i] = new Path("lib/" + listLibFiles.get(i)).toString();
+            if (listLibFiles.get(i).equals(PY4J_ZIP) || 
listLibFiles.get(i).equals(PYSPARK_ZIP)
+                    || listLibFiles.get(i).equals(PI_EXAMPLE)) {
+                
IOUtils.copyStream(IOUtils.getResourceAsStream(listLibFiles.get(i), -1),
+                        fs.create(new Path(getAppPath(), "lib/" + 
listLibFiles.get(i))));
+            }
+            else {
+                fs.createNewFile(new Path(getAppPath(), "lib/" + 
listLibFiles.get(i)));
+            }
+        }
+        XConfiguration protoConf = new XConfiguration();
+        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+        SharelibUtils.addToDistributedCache("spark", getFileSystem(), 
getFsTestCaseDir(), protoConf);
+        WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action");
+        String defaultProtoConf = wf.getProtoActionConf();
+        XConfiguration newProtoConf = new XConfiguration(new 
StringReader(defaultProtoConf));
+        newProtoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, 
libPaths);
+        wf.setProtoActionConf(newProtoConf.toXmlString());
+        return wf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index dcd2360..8c77be0 100644
--- 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -59,7 +59,7 @@ public class TestSparkActionExecutor extends 
ActionExecutorTestCase {
     private static final String SPARK_FILENAME = "file.txt";
     private static final String OUTPUT = "output";
     private static Pattern SPARK_OPTS_PATTERN = Pattern.compile("([^= ]+)=([^= 
]+)");
-
+    public static String SPARK_TESTING_MEMORY = 
"spark.testing.memory=512000000"; // 512MB
     @Override
     protected void setSystemProps() throws Exception {
         super.setSystemProps();
@@ -159,6 +159,7 @@ public class TestSparkActionExecutor extends 
ActionExecutorTestCase {
                 "<jar>" + getAppPath() +"/lib/test.jar</jar>" +
                 "<arg>" + getAppPath() + "/" + SPARK_FILENAME + "</arg>" +
                 "<arg>" + getAppPath() + "/" + OUTPUT + "</arg>" +
+                "<spark-opts>--conf " +SPARK_TESTING_MEMORY+"</spark-opts>"+
                 "</spark>";
         return MessageFormat.format(script, getJobTrackerUri(), 
getNameNodeUri());
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/resources/pi.py
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/resources/pi.py 
b/sharelib/spark/src/test/resources/pi.py
new file mode 100644
index 0000000..e9836b2
--- /dev/null
+++ b/sharelib/spark/src/test/resources/pi.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from random import random
+from operator import add
+
+from pyspark import SparkContext
+
+
+if __name__ == "__main__":
+    """
+        Usage: pi [partitions]
+    """
+    sc = SparkContext(appName="PythonPi")
+    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
+    n = 100000 * partitions
+
+    def f(_):
+        x = random() * 2 - 1
+        y = random() * 2 - 1
+        return 1 if x ** 2 + y ** 2 < 1 else 0
+
+    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+    print("Pi is roughly %f" % (4.0 * count / n))
+
+    sc.stop()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/resources/py4j-0.9-src.zip
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/resources/py4j-0.9-src.zip 
b/sharelib/spark/src/test/resources/py4j-0.9-src.zip
new file mode 100644
index 0000000..dace2d0
Binary files /dev/null and b/sharelib/spark/src/test/resources/py4j-0.9-src.zip 
differ

http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/resources/pyspark.zip
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/resources/pyspark.zip 
b/sharelib/spark/src/test/resources/pyspark.zip
new file mode 100644
index 0000000..9ff8bd8
Binary files /dev/null and b/sharelib/spark/src/test/resources/pyspark.zip 
differ

Reply via email to