Repository: oozie Updated Branches: refs/heads/master c0b5497b2 -> b0ebf58e8
OOZIE-2482 Pyspark job fails with Oozie (satishsaley and gezapeti via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b0ebf58e Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b0ebf58e Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b0ebf58e Branch: refs/heads/master Commit: b0ebf58e86f26f01571f558bf3b04d1788b8380f Parents: c0b5497 Author: Robert Kanter <[email protected]> Authored: Thu May 26 16:49:33 2016 -0700 Committer: Robert Kanter <[email protected]> Committed: Thu May 26 16:54:06 2016 -0700 ---------------------------------------------------------------------- .../action/hadoop/SparkActionExecutor.java | 21 +++ docs/src/site/twiki/AG_Install.twiki | 5 + .../site/twiki/DG_SparkActionExtension.twiki | 41 +++++ pom.xml | 2 +- release-log.txt | 1 + sharelib/spark/pom.xml | 101 ++++++++++ .../apache/oozie/action/hadoop/SparkMain.java | 53 +++++- .../apache/oozie/action/hadoop/TestPyspark.java | 182 +++++++++++++++++++ .../action/hadoop/TestSparkActionExecutor.java | 3 +- sharelib/spark/src/test/resources/pi.py | 41 +++++ .../spark/src/test/resources/py4j-0.9-src.zip | Bin 0 -> 44846 bytes sharelib/spark/src/test/resources/pyspark.zip | Bin 0 -> 357051 bytes 12 files changed, 447 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java index 219a116..6d37105 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java @@ -43,6 +43,7 @@ public class 
SparkActionExecutor extends JavaActionExecutor { public static final String SPARK_JOB_NAME = "oozie.spark.name"; public static final String SPARK_CLASS = "oozie.spark.class"; public static final String SPARK_JAR = "oozie.spark.jar"; + public static final String MAPRED_CHILD_ENV = "mapred.child.env"; public SparkActionExecutor() { super("spark"); @@ -107,6 +108,26 @@ public class SparkActionExecutor extends JavaActionExecutor { } @Override + Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) + throws ActionExecutorException { + super.setupLauncherConf(conf, actionXml, appPath, context); + + // Set SPARK_HOME environment variable on launcher job + // It is needed since pyspark client checks for it. + String sparkHome = "SPARK_HOME=."; + String mapredChildEnv = conf.get("oozie.launcher." + MAPRED_CHILD_ENV); + + if (mapredChildEnv == null) { + conf.set(MAPRED_CHILD_ENV, sparkHome); + conf.set("oozie.launcher." + MAPRED_CHILD_ENV, sparkHome); + } else if (!mapredChildEnv.contains("SPARK_HOME")) { + conf.set(MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome); + conf.set("oozie.launcher." + MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome); + } + return conf; + } + + @Override public List<Class> getLauncherClasses() { List<Class> classes = new ArrayList<Class>(); try { http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/docs/src/site/twiki/AG_Install.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/AG_Install.twiki b/docs/src/site/twiki/AG_Install.twiki index 66c0019..2d36737 100644 --- a/docs/src/site/twiki/AG_Install.twiki +++ b/docs/src/site/twiki/AG_Install.twiki @@ -1058,10 +1058,15 @@ action and value is a comma separated list of DFS directories or jar files. 
oozie.pig_10=hdfs:///share/lib/pig/pig-0.10.1/lib/ oozie.pig=hdfs:///share/lib/pig/pig-0.11.1/lib/ oozie.distcp=hdfs:///share/lib/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-distcp-2.2.0.jar + oozie.spark=hdfs:///share/lib/spark/lib/,hdfs:///share/lib/spark/python/lib/pyspark.zip,hdfs:///share/lib/spark/python/lib/py4j-0-9-src.zip </description> </property> </verbatim> +Oozie sharelib TAR.GZ file bundled with the distribution does not contain pyspark and py4j zip files since they vary +with Apache Spark version. Therefore, to run pySpark using Spark Action, users need to specify pyspark and py4j zip +files. These files can be added either to workflow's lib/ directory, to the sharelib or in sharelib mapping file. + ---++ Oozie Coordinators/Bundles Processing Timezone http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/docs/src/site/twiki/DG_SparkActionExtension.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki index 18b84b7..d7d75a1 100644 --- a/docs/src/site/twiki/DG_SparkActionExtension.twiki +++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki @@ -159,6 +159,47 @@ either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationSe 3. spark.eventLog.enabled=true +---+++ PySpark with Spark Action + +To submit PySpark scripts with Spark Action, pyspark dependencies must be available in sharelib or in workflow's lib/ directory. +For more information, please refer to [[AG_Install#Oozie_Share_Lib][installation document.]] + +*Example:* + +<verbatim> +<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1"> + ....
+    <action name="myfirstpysparkjob"> + <spark xmlns="uri:oozie:spark-action:0.1"> + <job-tracker>foo:8021</job-tracker> + <name-node>bar:8020</name-node> + <prepare> + <delete path="${jobOutput}"/> + </prepare> + <configuration> + <property> + <name>mapred.compress.map.output</name> + <value>true</value> + </property> + </configuration> + <master>yarn-cluster</master> + <name>Spark Example</name> + <jar>pi.py</jar> + <spark-opts>--executor-memory 20G --num-executors 50 + --conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts> + <arg>100</arg> + </spark> + <ok to="myotherjob"/> + <error to="errorcleanup"/> + </action> + ... +</workflow-app> +</verbatim> + +The =jar= element indicates the python file. Refer to the file by its localized name, because only local files are allowed +in PySpark. The py file should be in the lib/ folder next to the workflow.xml or added using the =file= element so that +it's localized to the working directory with just its name.
+ ---++ Appendix, Spark XML-Schema ---+++ AE.A Appendix A, Spark XML-Schema http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 26f10a3..5b7a863 100644 --- a/pom.xml +++ b/pom.xml @@ -91,7 +91,7 @@ <pig.version>0.12.1</pig.version> <pig.classifier></pig.classifier> <sqoop.version>1.4.3</sqoop.version> - <spark.version>1.1.0</spark.version> + <spark.version>1.6.1</spark.version> <spark.guava.version>14.0.1</spark.guava.version> <sqoop.classifier>hadoop100</sqoop.classifier> <streaming.version>${hadoop.version}</streaming.version> http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 7f40a99..fb2cbdb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2482 Pyspark job fails with Oozie (satishsaley and gezapeti via rkanter) OOZIE-2467 Oozie can shutdown itself on long GC pause (puru) OOZIE-2537 SqoopMain does not set up log4j properly (pbacsko via rkanter) OOZIE-2532 patch apply does not handle binary files (gezapeti via rkanter) http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml index 72766dc..748ae06 100644 --- a/sharelib/spark/pom.xml +++ b/sharelib/spark/pom.xml @@ -50,6 +50,11 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.apache.oozie</groupId> <artifactId>oozie-sharelib-oozie</artifactId> <scope>provided</scope> @@ -64,6 +69,38 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> + <exclusion> 
+ <groupId>org.spark-project.hive</groupId> + <artifactId>hive-beeline</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-exec</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-jdbc</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-metastore</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-serde</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-service</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-shims</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -98,6 +135,38 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-annotations</artifactId> </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-beeline</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-exec</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-jdbc</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-metastore</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-serde</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-service</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-shims</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -318,6 +387,38 @@ 
<groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-beeline</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-exec</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-jdbc</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-metastore</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-serde</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-service</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-shims</artifactId> + </exclusion> </exclusions> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java index a09ecfb..604f287 100644 --- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java @@ -18,14 +18,17 @@ package org.apache.oozie.action.hadoop; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.spark.deploy.SparkSubmit; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.regex.Pattern; public class 
SparkMain extends LauncherMain { private static final String MASTER_OPTION = "--master"; @@ -40,6 +43,8 @@ public class SparkMain extends LauncherMain { private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled"; private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled"; + private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"), + Pattern.compile("pyspark.zip") }; private String sparkJars = null; private String sparkClasspath = null; @@ -49,6 +54,7 @@ public class SparkMain extends LauncherMain { @Override protected void run(String[] args) throws Exception { + boolean isPyspark = false; Configuration actionConf = loadActionConf(); setYarnTag(actionConf); LauncherMainHadoopUtils.killChildYarnJobs(actionConf); @@ -79,6 +85,9 @@ public class SparkMain extends LauncherMain { } String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR); + if(jarPath!=null && jarPath.endsWith(".py")){ + isPyspark = true; + } // In local mode, everything runs here in the Launcher Job. // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn. @@ -179,10 +188,12 @@ public class SparkMain extends LauncherMain { } sparkArgs.add(jarPath); - for (String arg : args) { sparkArgs.add(arg); } + if (isPyspark){ + createPySparkLibFolder(); + } System.out.println("Spark Action Main class : " + SparkSubmit.class.getName()); System.out.println(); @@ -196,6 +207,46 @@ public class SparkMain extends LauncherMain { runSpark(sparkArgs.toArray(new String[sparkArgs.size()])); } + /** + * SparkActionExecutor sets the SPARK_HOME environment variable to the local directory. + * Spark is looking for the pyspark.zip and py4j-VERSION-src.zip files in the python/lib folder under SPARK_HOME. + * This function creates the subfolders and copies the zips from the local folder. 
+     * @throws OozieActionConfiguratorException if the zip files are missing + * @throws IOException if there is an error during file copy + */ + private void createPySparkLibFolder() throws OozieActionConfiguratorException, IOException { + File pythonLibDir = new File("python/lib"); + if(!pythonLibDir.exists()){ + pythonLibDir.mkdirs(); + System.out.println("PySpark lib folder " + pythonLibDir.getAbsolutePath() + " folder created."); + } + + for(Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) { + File file = getMatchingFile(fileNamePattern); + File destination = new File(pythonLibDir, file.getName()); + FileUtils.copyFile(file, destination); + System.out.println("Copied " + file + " to " + destination.getAbsolutePath()); + } + } + + /** + * Searches for a file in the current directory that matches the given pattern. + * If there are multiple files matching the pattern returns one of them. + * @param fileNamePattern the pattern to look for + * @return the file if there is one + * @throws OozieActionConfiguratorException if there are no files matching the pattern + */ + private File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException { + File localDir = new File("."); + for(String fileName : localDir.list()){ + if(fileNamePattern.matcher(fileName).find()){ + return new File(fileName); + } + } + throw new OozieActionConfiguratorException("Missing py4j and/or pyspark zip files.
Please add them to " + + "the lib folder or to the Spark sharelib."); + } + private void runSpark(String[] args) throws Exception { System.out.println("================================================================="); System.out.println(); http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java new file mode 100644 index 0000000..458baaa --- /dev/null +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import java.io.StringReader; +import java.text.MessageFormat; +import java.util.ArrayList; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; + +public class TestPyspark extends ActionExecutorTestCase { + + + public static String PY4J_ZIP = "py4j-0.9-src.zip"; + public static String PYSPARK_ZIP = "pyspark.zip"; + public static String PI_EXAMPLE = "pi.py"; + + + @Override + protected void setSystemProps() throws Exception { + super.setSystemProps(); + setSystemProperty("oozie.service.ActionService.executor.classes", SparkActionExecutor.class.getName()); + } + + protected String getActionXml(String sparkOpts) { + String script = "<spark xmlns=''uri:oozie:spark-action:0.1''>" + + "<job-tracker>{0}</job-tracker>" + + "<name-node>{1}</name-node>" + + "<master>local[*]</master>" + + "<mode>client</mode>" + + "<name>PysparkExample</name>" + + "<jar>" + PI_EXAMPLE + "</jar>" + + "<spark-opts>" +sparkOpts +"</spark-opts>" + + "</spark>"; + return MessageFormat.format(script, getJobTrackerUri(), getNameNodeUri()); + } + + public void testPyspark() throws Exception { + ArrayList<String> listLibFiles = new ArrayList<String>(); + + // <spark-opts> does not have any files + // pyspark and py4j are not present in current directory. 
+ String sparkOpts = "--conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY; + WorkflowJobBean wf = getWorkflow(listLibFiles); + testPysparkHelper(sparkOpts, wf, "FAILED/KILLED", WorkflowAction.Status.ERROR); + + // <spark-opts> has other files; + // pyspark and py4j are not present in current directory. + sparkOpts = "--py-files other.zip,myfunctions.py --conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY; + listLibFiles.add("other.zip"); + listLibFiles.add("myfunctions.py"); + wf = getWorkflow(listLibFiles); + testPysparkHelper(sparkOpts, wf, "FAILED/KILLED", WorkflowAction.Status.ERROR); + + // <spark-opts> does not have any files + // pyspark and py4j are present in current directory. + sparkOpts = "--conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY; + listLibFiles.clear(); + listLibFiles.add(PY4J_ZIP); + listLibFiles.add(PYSPARK_ZIP); + wf = getWorkflow(listLibFiles); + testPysparkHelper(sparkOpts, wf, "SUCCEEDED", WorkflowAction.Status.OK); + + // <spark-opts> has some other files + // pyspark and py4j are present in current directory. 
+ sparkOpts = "--py-files other.zip,myfunctions.py --conf " + TestSparkActionExecutor.SPARK_TESTING_MEMORY; + listLibFiles.clear(); + listLibFiles.add("other.zip"); + listLibFiles.add("myfunctions.py"); + listLibFiles.add(PY4J_ZIP); + listLibFiles.add(PYSPARK_ZIP); + wf = getWorkflow(listLibFiles); + testPysparkHelper(sparkOpts, wf, "SUCCEEDED", WorkflowAction.Status.OK); + } + + private void testPysparkHelper(String sparkOpts, WorkflowJobBean wf, String externalStatus, + WorkflowAction.Status wfStatus) + throws Exception { + Context context = createContext(getActionXml(sparkOpts), wf); + final RunningJob launcherJob = submitAction(context); + waitFor(200 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return launcherJob.isComplete(); + } + }); + assertTrue(launcherJob.isSuccessful()); + SparkActionExecutor ae = new SparkActionExecutor(); + ae.check(context, context.getAction()); + assertEquals(externalStatus, context.getAction().getExternalStatus()); + ae.end(context, context.getAction()); + assertEquals(wfStatus, context.getAction().getStatus()); + } + + protected RunningJob submitAction(Context context) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + WorkflowAction action = context.getAction(); + ae.prepareActionDir(getFileSystem(), context); + ae.submitLauncher(getFileSystem(), context, action); + String jobId = action.getExternalId(); + String jobTracker = action.getTrackerUri(); + String consoleUrl = action.getConsoleUrl(); + assertNotNull(jobId); + assertNotNull(jobTracker); + assertNotNull(consoleUrl); + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); + jobConf.set("mapred.job.tracker", jobTracker); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); + final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); + assertNotNull(runningJob); + return runningJob; + } + + protected Context 
createContext(String actionXml, WorkflowJobBean wf) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); + action.setType(ae.getType()); + action.setConf(actionXml); + return new Context(wf, action); + } + + /** + * @param listLibFiles list of files to be created in workflow lib/ + * directory + * @return + * @throws Exception + */ + protected WorkflowJobBean getWorkflow(ArrayList<String> listLibFiles) throws Exception { + // add the example file as well + listLibFiles.add(PI_EXAMPLE); + String[] libPaths = new String[listLibFiles.size()]; + FileSystem fs = getFileSystem(); + for (int i = 0; i < listLibFiles.size(); i++) { + libPaths[i] = new Path("lib/" + listLibFiles.get(i)).toString(); + if (listLibFiles.get(i).equals(PY4J_ZIP) || listLibFiles.get(i).equals(PYSPARK_ZIP) + || listLibFiles.get(i).equals(PI_EXAMPLE)) { + IOUtils.copyStream(IOUtils.getResourceAsStream(listLibFiles.get(i), -1), + fs.create(new Path(getAppPath(), "lib/" + listLibFiles.get(i)))); + } + else { + fs.createNewFile(new Path(getAppPath(), "lib/" + listLibFiles.get(i))); + } + } + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + SharelibUtils.addToDistributedCache("spark", getFileSystem(), getFsTestCaseDir(), protoConf); + WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action"); + String defaultProtoConf = wf.getProtoActionConf(); + XConfiguration newProtoConf = new XConfiguration(new StringReader(defaultProtoConf)); + newProtoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, libPaths); + wf.setProtoActionConf(newProtoConf.toXmlString()); + return wf; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java ---------------------------------------------------------------------- diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java index dcd2360..8c77be0 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java @@ -59,7 +59,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { private static final String SPARK_FILENAME = "file.txt"; private static final String OUTPUT = "output"; private static Pattern SPARK_OPTS_PATTERN = Pattern.compile("([^= ]+)=([^= ]+)"); - + public static String SPARK_TESTING_MEMORY = "spark.testing.memory=512000000"; // 512MB @Override protected void setSystemProps() throws Exception { super.setSystemProps(); @@ -159,6 +159,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { "<jar>" + getAppPath() +"/lib/test.jar</jar>" + "<arg>" + getAppPath() + "/" + SPARK_FILENAME + "</arg>" + "<arg>" + getAppPath() + "/" + OUTPUT + "</arg>" + + "<spark-opts>--conf " +SPARK_TESTING_MEMORY+"</spark-opts>"+ "</spark>"; return MessageFormat.format(script, getJobTrackerUri(), getNameNodeUri()); } http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/resources/pi.py ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/resources/pi.py b/sharelib/spark/src/test/resources/pi.py new file mode 100644 index 0000000..e9836b2 --- /dev/null +++ b/sharelib/spark/src/test/resources/pi.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +from random import random +from operator import add + +from pyspark import SparkContext + + +if __name__ == "__main__": + """ + Usage: pi [partitions] + """ + sc = SparkContext(appName="PythonPi") + partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 + n = 100000 * partitions + + def f(_): + x = random() * 2 - 1 + y = random() * 2 - 1 + return 1 if x ** 2 + y ** 2 < 1 else 0 + + count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add) + print("Pi is roughly %f" % (4.0 * count / n)) + + sc.stop() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/resources/py4j-0.9-src.zip ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/resources/py4j-0.9-src.zip b/sharelib/spark/src/test/resources/py4j-0.9-src.zip new file mode 100644 index 0000000..dace2d0 Binary files /dev/null and b/sharelib/spark/src/test/resources/py4j-0.9-src.zip differ http://git-wip-us.apache.org/repos/asf/oozie/blob/b0ebf58e/sharelib/spark/src/test/resources/pyspark.zip ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/resources/pyspark.zip b/sharelib/spark/src/test/resources/pyspark.zip new file mode 100644 index 0000000..9ff8bd8 Binary files /dev/null and b/sharelib/spark/src/test/resources/pyspark.zip differ
