Repository: oozie Updated Branches: refs/heads/master 01cb4d555 -> 8814ac44e
OOZIE-1983 Add spark action executor (pavan kumar via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8814ac44 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8814ac44 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8814ac44 Branch: refs/heads/master Commit: 8814ac44ea7afd6711059db611404787aefcbb85 Parents: 01cb4d5 Author: Robert Kanter <[email protected]> Authored: Mon Nov 24 17:13:55 2014 -0800 Committer: Robert Kanter <[email protected]> Committed: Mon Nov 24 17:13:55 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/oozie/cli/OozieCLI.java | 2 + client/src/main/resources/spark-action-0.1.xsd | 71 ++++ .../action/hadoop/SparkActionExecutor.java | 119 +++++++ core/src/main/resources/oozie-default.xml | 7 +- .../site/twiki/DG_SparkActionExtension.twiki | 206 ++++++++++++ docs/src/site/twiki/index.twiki | 1 + pom.xml | 7 + .../apache/oozie/action/hadoop/JavaMain.java | 21 -- .../oozie/action/hadoop/LauncherMain.java | 26 ++ sharelib/pom.xml | 1 + sharelib/spark/pom.xml | 330 +++++++++++++++++++ .../SparkMain.java | 107 ++++++ .../action/hadoop/TestSparkActionExecutor.java | 155 +++++++++ .../oozie/action/hadoop/TestSparkMain.java | 85 +++++ src/main/assemblies/sharelib.xml | 4 + webapp/pom.xml | 9 +- 16 files changed, 1126 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/client/src/main/java/org/apache/oozie/cli/OozieCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java index 9c2d14b..1265fad 100644 --- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java +++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java @@ -1869,6 +1869,8 @@ public class 
OozieCLI { "ssh-action-0.2.xsd"))); sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream( "hive2-action-0.1.xsd"))); + sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream( + "spark-action-0.1.xsd"))); SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); Schema schema = factory.newSchema(sources.toArray(new StreamSource[sources.size()])); Validator validator = schema.newValidator(); http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/client/src/main/resources/spark-action-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/spark-action-0.1.xsd b/client/src/main/resources/spark-action-0.1.xsd new file mode 100644 index 0000000..eb3790a --- /dev/null +++ b/client/src/main/resources/spark-action-0.1.xsd @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified" + targetNamespace="uri:oozie:spark-action:0.1"> + + <xs:element name="spark" type="spark:ACTION"/> + + <xs:complexType name="ACTION"> + <xs:sequence> + <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="CONFIGURATION"> + <xs:sequence> + <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> + <xs:complexType> + <xs:sequence> + <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> + </xs:sequence> + </xs:complexType> + </xs:element> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="PREPARE"> + <xs:sequence> + <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + 
+ <xs:complexType name="DELETE"> + <xs:attribute name="path" type="xs:string" use="required"/> + </xs:complexType> + + <xs:complexType name="MKDIR"> + <xs:attribute name="path" type="xs:string" use="required"/> + </xs:complexType> + +</xs:schema> http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java new file mode 100644 index 0000000..65f8d5a --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.oozie.action.ActionExecutorException; +import org.apache.oozie.client.WorkflowAction; +import org.jdom.Element; +import org.jdom.Namespace; + +import java.util.ArrayList; +import java.util.List; + +public class SparkActionExecutor extends JavaActionExecutor { + public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain"; + public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; + public static final String SPARK_MASTER = "oozie.spark.master"; + public static final String SPARK_MODE = "oozie.spark.mode"; + public static final String SPARK_OPTS = "oozie.spark.spark-opts"; + public static final String SPARK_JOB_NAME = "oozie.spark.name"; + public static final String SPARK_CLASS = "oozie.spark.class"; + public static final String SPARK_JAR = "oozie.spark.jar"; + + public SparkActionExecutor() { + super("spark"); + } + + @Override + Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) + throws ActionExecutorException { + actionConf = super.setupActionConf(actionConf, context, actionXml, appPath); + Namespace ns = actionXml.getNamespace(); + + String master = actionXml.getChildTextTrim("master", ns); + actionConf.set(SPARK_MASTER, master); + + String mode = actionXml.getChildTextTrim("mode", ns); + if (mode != null) { + actionConf.set(SPARK_MODE, mode); + } + + String jobName = actionXml.getChildTextTrim("name", ns); + actionConf.set(SPARK_JOB_NAME, jobName); + + String sparkClass = actionXml.getChildTextTrim("class", ns); + if (sparkClass != null) { + actionConf.set(SPARK_CLASS, sparkClass); + } + + String jarLocation = actionXml.getChildTextTrim("jar", ns); + actionConf.set(SPARK_JAR, jarLocation); + + String 
sparkOpts = actionXml.getChildTextTrim("spark-opts", ns); + if (sparkOpts != null) { + actionConf.set(SPARK_OPTS, sparkOpts); + } + + return actionConf; + } + + @Override + JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, + Configuration actionConf) throws ActionExecutorException { + + JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf); + if (launcherJobConf.get("oozie.launcher." + TASK_USER_PRECEDENCE) == null) { + launcherJobConf.set(TASK_USER_PRECEDENCE, "true"); + } + return launcherJobConf; + } + + @Override + public List<Class> getLauncherClasses() { + List<Class> classes = new ArrayList<Class>(); + try { + classes.add(Class.forName(SPARK_MAIN_CLASS_NAME)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Class not found", e); + } + return classes; + } + + + /** + * Return the sharelib name for the action. + * + * @param actionXml + * @return returns <code>spark</code>. 
+ */ + @Override + protected String getDefaultShareLibName(Element actionXml) { + return "spark"; + } + + @Override + protected String getLauncherMain(Configuration launcherConf, Element actionXml) { + return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 19cae9d..6a768b3 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1423,7 +1423,7 @@ shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,email-action-0.1.xsd,email-action-0.2.xsd, hive-action-0.2.xsd,hive-action-0.3.xsd,hive-action-0.4.xsd,hive-action-0.5.xsd,sqoop-action-0.2.xsd, sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,ssh-action-0.1.xsd,ssh-action-0.2.xsd,distcp-action-0.1.xsd, - distcp-action-0.2.xsd,oozie-sla-0.1.xsd,oozie-sla-0.2.xsd,hive2-action-0.1.xsd + distcp-action-0.2.xsd,oozie-sla-0.1.xsd,oozie-sla-0.2.xsd,hive2-action-0.1.xsd,spark-action-0.1.xsd </value> <description> Schemas for additional actions types. @@ -1512,7 +1512,8 @@ org.apache.oozie.action.hadoop.Hive2ActionExecutor, org.apache.oozie.action.ssh.SshActionExecutor, org.apache.oozie.action.oozie.SubWorkflowActionExecutor, - org.apache.oozie.action.email.EmailActionExecutor + org.apache.oozie.action.email.EmailActionExecutor, + org.apache.oozie.action.hadoop.SparkActionExecutor </value> <description> List of ActionExecutors classes (separated by commas). 
@@ -1793,7 +1794,7 @@ </description> </property> - <property> + <property> <name>oozie.actions.main.classnames</name> <value>distcp=org.apache.hadoop.tools.DistCp</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/docs/src/site/twiki/DG_SparkActionExtension.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki new file mode 100644 index 0000000..8e5cd9b --- /dev/null +++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki @@ -0,0 +1,206 @@ +<noautolink> + +[[index][::Go back to Oozie Documentation Index::]] + +----- + +---+!! Oozie Spark Action Extension + +%TOC% + +---++ Spark Action + +The =spark= action runs a Spark job. + +The workflow job will wait until the Spark job completes before +continuing to the next action. + +To run the Spark job, you have to configure the =spark= action with the +=job-tracker=, =name-node=, Spark =master= elements as +well as the necessary elements, arguments and configuration. + +Spark options can be specified in an element called =spark-opts= + +A =spark= action can be configured to create or delete HDFS directories +before starting the Spark job. + +Oozie EL expressions can be used in the inline configuration. Property +values specified in the =configuration= element override values specified +in the =job-xml= file. + +*Syntax:* + +<verbatim> +<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3"> + ... + <action name="[NODE-NAME]"> + <spark xmlns="uri:oozie:spark-action:0.1"> + <job-tracker>[JOB-TRACKER]</job-tracker> + <name-node>[NAME-NODE]</name-node> + <prepare> + <delete path="[PATH]"/> + ... + <mkdir path="[PATH]"/> + ... + </prepare> + <job-xml>[SPARK SETTINGS FILE]</job-xml> + <configuration> + <property> + <name>[PROPERTY-NAME]</name> + <value>[PROPERTY-VALUE]</value> + </property> + ... 
+ </configuration> + <master>[SPARK MASTER URL]</master> + <mode>[SPARK MODE]</mode> + <name>[SPARK JOB NAME]</name> + <class>[SPARK MAIN CLASS]</class> + <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar> + <spark-opts>[SPARK-OPTIONS]</spark-opts> + <arg>[ARG-VALUE]</arg> + ... + <arg>[ARG-VALUE]</arg> + ... + </spark> + <ok to="[NODE-NAME]"/> + <error to="[NODE-NAME]"/> + </action> + ... +</workflow-app> +</verbatim> + +The =prepare= element, if present, indicates a list of paths to delete +or create before starting the job. Specified paths must start with =hdfs://HOST:PORT=. + +The =job-xml= element, if present, specifies a file containing configuration +for the Spark job. Multiple =job-xml= elements are allowed in order to +specify multiple =job.xml= files. + +The =configuration= element, if present, contains configuration +properties that are passed to the Spark job. + +The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn, or local. + +The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster. + +The =name= element indicates the name of the spark application. + +The =class= element if present, indicates the spark's application main class. + +The =jar= element indicates a comma separated list of jars or python files. + +The =spark-opts= element if present, contains a list of spark options that can be passed to spark driver. + +The =arg= element if present, contains arguments that can be passed to spark application. + +All the above elements can be parameterized (templatized) using EL +expressions. + +*Example:* + +<verbatim> +<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1"> + ... 
+ <action name="myfirstsparkjob"> + <spark xmlns="uri:oozie:spark-action:0.1"> + <job-tracker>foo:8021</job-tracker> + <name-node>bar:8020</name-node> + <prepare> + <delete path="${jobOutput}"/> + </prepare> + <configuration> + <property> + <name>mapred.compress.map.output</name> + <value>true</value> + </property> + </configuration> + <master>local[*]</master> + <mode>client</mode> + <name>Spark Example</name> + <class>org.apache.spark.examples.mllib.JavaALS</class> + <jar>/lib/spark-examples_2.10-1.1.0.jar</jar> + <spark-opts>--executor-memory 20G --num-executors 50</spark-opts> + <arg>inputpath=hdfs://localhost/input/file.txt</arg> + <arg>value=2</arg> + </spark> + <ok to="myotherjob"/> + <error to="errorcleanup"/> + </action> + ... +</workflow-app> +</verbatim> + +---+++ Spark Action Logging + +Spark action logs are redirected to the Oozie Launcher map-reduce job task STDOUT/STDERR that runs Spark. + +From Oozie web-console, from the Spark action pop up using the 'Console URL' link, it is possible +to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tracker web-console. 
+ +---++ Appendix, Spark XML-Schema + +---+++ AE.A Appendix A, Spark XML-Schema + +---++++ Spark Action Schema Version 0.1 +<verbatim> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified" + targetNamespace="uri:oozie:spark-action:0.1"> + + <xs:element name="spark" type="spark:ACTION"/> + + <xs:complexType name="ACTION"> + <xs:sequence> + <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/> + <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="CONFIGURATION"> + <xs:sequence> + <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> + <xs:complexType> + <xs:sequence> + <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> + </xs:sequence> + </xs:complexType> + </xs:element> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="PREPARE"> + <xs:sequence> + <xs:element name="delete" type="spark:DELETE" minOccurs="0" 
maxOccurs="unbounded"/> + <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="DELETE"> + <xs:attribute name="path" type="xs:string" use="required"/> + </xs:complexType> + + <xs:complexType name="MKDIR"> + <xs:attribute name="path" type="xs:string" use="required"/> + </xs:complexType> + +</xs:schema> +</verbatim> + +[[index][::Go back to Oozie Documentation Index::]] + +</noautolink> + + + http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/docs/src/site/twiki/index.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/index.twiki b/docs/src/site/twiki/index.twiki index c8ba742..50c6213 100644 --- a/docs/src/site/twiki/index.twiki +++ b/docs/src/site/twiki/index.twiki @@ -60,6 +60,7 @@ Enough reading already? Follow the steps in [[DG_QuickStart][Oozie Quick Start]] * [[DG_SqoopActionExtension][Sqoop Action]] * [[DG_SshActionExtension][Ssh Action]] * [[DG_DistCpActionExtension][DistCp Action]] + * [[DG_SparkActionExtension][Spark Action]] * [[DG_CustomActionExecutor][Writing a Custom Action Executor]] ---+++ Job Status and SLA Monitoring http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1e79186..5507381 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,8 @@ <pig.version>0.12.1</pig.version> <pig.classifier></pig.classifier> <sqoop.version>1.4.3</sqoop.version> + <spark.version>1.1.0</spark.version> + <spark.guava.version>14.0.1</spark.guava.version> <sqoop.classifier>hadoop100</sqoop.classifier> <streaming.version>${hadoop.version}</streaming.version> <distcp.version>${hadooplib.version}</distcp.version> @@ -239,6 +241,11 @@ </dependency> <dependency> <groupId>org.apache.oozie</groupId> + <artifactId>oozie-sharelib-spark</artifactId> + <version>${project.version}</version> + </dependency> + 
<dependency> + <groupId>org.apache.oozie</groupId> <artifactId>oozie-docs</artifactId> <version>${project.version}</version> <type>war</type> http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java index 8b8135a..f58ff1d 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java @@ -61,26 +61,5 @@ public class JavaMain extends LauncherMain { } } - /** - * Read action configuration passes through action xml file. - * - * @return action Configuration - * @throws IOException - */ - protected Configuration loadActionConf() throws IOException { - // loading action conf prepared by Oozie - Configuration actionConf = new Configuration(false); - String actionXml = System.getProperty("oozie.action.conf.xml"); - - if (actionXml == null) { - throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]"); - } - if (!new File(actionXml).exists()) { - throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist"); - } - - actionConf.addResource(new Path("file:///", actionXml)); - return actionConf; - } } http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java index 8cfefbb..ae983cd 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java +++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java @@ -28,6 +28,9 @@ import java.util.Map; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Shell; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.mapred.JobConf; @@ -146,6 +149,29 @@ public abstract class LauncherMain { } } } + + /** + * Read action configuration passes through action xml file. + * + * @return action Configuration + * @throws IOException + */ + protected Configuration loadActionConf() throws IOException { + // loading action conf prepared by Oozie + Configuration actionConf = new Configuration(false); + + String actionXml = System.getProperty("oozie.action.conf.xml"); + + if (actionXml == null) { + throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]"); + } + if (!new File(actionXml).exists()) { + throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist"); + } + + actionConf.addResource(new Path("file:///", actionXml)); + return actionConf; + } } class LauncherMainException extends Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/sharelib/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/pom.xml b/sharelib/pom.xml index aa479a8..88e9539 100644 --- a/sharelib/pom.xml +++ b/sharelib/pom.xml @@ -40,6 +40,7 @@ <module>sqoop</module> <module>oozie</module> <module>distcp</module> + <module>spark</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/sharelib/spark/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml new file mode 100644 index 0000000..744c95c --- /dev/null +++ b/sharelib/spark/pom.xml @@ -0,0 +1,330 @@ +<?xml version="1.0" 
encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-main</artifactId> + <version>4.2.0-SNAPSHOT</version> + <relativePath>../..</relativePath> + </parent> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-sharelib-spark</artifactId> + <version>4.2.0-SNAPSHOT</version> + <description>Apache Oozie Share Lib Spark</description> + <name>Apache Oozie Share Lib Spark</name> + <packaging>jar</packaging> + + <properties> + <sharelib.action.postfix>spark</sharelib.action.postfix> + <sharelib.transitive.filtering>false</sharelib.transitive.filtering> + </properties> + + <dependencies> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${spark.guava.version}</version> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + 
<artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-examples_2.10</artifactId> + <version>${spark.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-mllib_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-graphx_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-repl_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + 
<groupId>org.apache.oozie</groupId> + <artifactId>oozie-core</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>guava</artifactId> + <groupId>com.google.guava</groupId> + </exclusion> + <exclusion> + <artifactId>jackson-core-asl</artifactId> + <groupId>org.codehaus.jackson</groupId> + </exclusion> + <exclusion> + <artifactId>jackson-mapper-asl</artifactId> + <groupId>org.codehaus.jackson</groupId> + </exclusion> + <exclusion> + <artifactId>jetty</artifactId> + <groupId>org.mortbay.jetty</groupId> + </exclusion> + <exclusion> + <artifactId>jetty-util</artifactId> + <groupId>org.mortbay.jetty</groupId> + </exclusion> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + <exclusion> + <artifactId>servlet-api-2.5</artifactId> + <groupId>org.mortbay.jetty</groupId> + </exclusion> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + <exclusion> + <artifactId>hive-common</artifactId> + <groupId>org.apache.hive</groupId> + </exclusion> + <exclusion> + <artifactId>derby</artifactId> + <groupId>org.apache.derby</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-hadoop</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>guava</artifactId> + <groupId>com.google.guava</groupId> + </exclusion> + <exclusion> + <artifactId>avro</artifactId> + <groupId>org.apache.avro</groupId> + </exclusion> + <exclusion> + <artifactId>commons-compress</artifactId> + <groupId>org.apache.commons</groupId> + </exclusion> + <exclusion> + <artifactId>jackson-core-asl</artifactId> + <groupId>org.codehaus.jackson</groupId> + </exclusion> + <exclusion> + <artifactId>jackson-mapper-asl</artifactId> + <groupId>org.codehaus.jackson</groupId> + </exclusion> + <exclusion> + <artifactId>jetty-util</artifactId> + <groupId>org.mortbay.jetty</groupId> + </exclusion> + 
<exclusion> + <artifactId>jsr305</artifactId> + <groupId>com.google.code.findbugs</groupId> + </exclusion> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-hadoop-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-hadoop-utils</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-hcatalog</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>gen-classpath</id> + <phase>generate-test-resources</phase> + <goals> + <goal>build-classpath</goal> + </goals> + <configuration> + <includeScope>compile</includeScope> + <outputFile>${project.build.directory}/classpath</outputFile> + </configuration> + </execution> + <execution> + <id>create-mrapp-generated-classpath</id> + <phase>generate-test-resources</phase> + <goals> + <goal>build-classpath</goal> + </goals> + <configuration> + <!-- needed to run the unit test for DS to generate the required classpath + that is required in the env of the launch container in the mini mr/yarn cluster --> + 
<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.6</version> + <executions> + <execution> + <configuration> + <target> + <!-- needed to include Main class in classpath for mini yarn cluster for unit tests --> + <echo file="${project.build.directory}/test-classes/mrapp-generated-classpath" + append="true" message=":${project.build.directory}/classes"/> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + <phase>generate-test-resources</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <finalName>partial-sharelib</finalName> + <appendAssemblyId>false</appendAssemblyId> + <descriptors> + <descriptor>../../src/main/assemblies/partial-sharelib.xml</descriptor> + </descriptors> + </configuration> + </plugin> + </plugins> + </build> + +</project> + http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java new file mode 100644 index 0000000..dcf3868 --- /dev/null +++ b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.deploy.SparkSubmit; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +public class SparkMain extends LauncherMain { + private static final String MASTER_OPTION = "--master"; + private static final String MODE_OPTION = "--deploy-mode"; + private static final String JOB_NAME_OPTION = "--name"; + private static final String CLASS_NAME_OPTION = "--class"; + private static final String VERBOSE_OPTION = "--verbose"; + private static final String DELIM = " "; + + + public static void main(String[] args) throws Exception { + run(SparkMain.class, args); + } + + @Override + protected void run(String[] args) throws Exception { + Configuration actionConf = loadActionConf(); + LauncherMainHadoopUtils.killChildYarnJobs(actionConf); + + List<String> sparkArgs = new ArrayList<String>(); + + sparkArgs.add(MASTER_OPTION); + sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_MASTER)); + + String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE); + if (sparkDeployMode != null) { + sparkArgs.add(MODE_OPTION); + sparkArgs.add(sparkDeployMode); + } + + sparkArgs.add(JOB_NAME_OPTION); + 
sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME)); + + String className = actionConf.get(SparkActionExecutor.SPARK_CLASS); + if (className != null) { + sparkArgs.add(CLASS_NAME_OPTION); + sparkArgs.add(className); + } + + String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS); + if (StringUtils.isNotEmpty(sparkOpts)) { + String[] sparkOptions = sparkOpts.split(DELIM); + for (String opt : sparkOptions) { + sparkArgs.add(opt); + } + } + + if (!sparkArgs.contains(VERBOSE_OPTION)) { + sparkArgs.add(VERBOSE_OPTION); + } + + String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR); + sparkArgs.add(jarPath); + + for (String arg : args) { + sparkArgs.add(arg); + } + + System.out.println("Spark Action Main class : " + SparkSubmit.class.getName()); + System.out.println(); + System.out.println("Oozie Spark action configuration"); + System.out.println("================================================================="); + System.out.println(); + for (String arg : sparkArgs) { + System.out.println(" " + arg); + } + System.out.println(); + runSpark(sparkArgs.toArray(new String[sparkArgs.size()])); + } + + private void runSpark(String[] args) throws Exception { + System.out.println("================================================================="); + System.out.println(); + System.out.println(">>> Invoking Spark class now >>>"); + System.out.println(); + System.out.flush(); + SparkSubmit.main(args); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java new file mode 100644 index 0000000..b83be47 --- /dev/null +++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; + +import java.io.OutputStreamWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Writer; + +import java.text.MessageFormat; +import java.util.Arrays; + +public class TestSparkActionExecutor extends ActionExecutorTestCase { + private static final String SPARK_FILENAME = "file.txt"; + private static final String OUTPUT = 
"output"; + + @Override + protected void setSystemProps() throws Exception { + super.setSystemProps(); + setSystemProperty("oozie.service.ActionService.executor.classes", SparkActionExecutor.class.getName()); + } + + @SuppressWarnings("unchecked") + public void testSetupMethods() throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses()); + } + + private String getActionXml() { + String script = "<spark xmlns=''uri:oozie:spark-action:0.1''>" + + "<job-tracker>{0}</job-tracker>" + + "<name-node>{1}</name-node>" + + "<master>local[*]</master>" + + "<mode>client</mode>" + + "<name>SparkRegression</name>" + + "<class>org.apache.spark.examples.mllib.JavaALS</class>" + + "<jar>" + getAppPath() +"/lib/test.jar</jar>" + + "<arg>" + getAppPath() + "/" + SPARK_FILENAME + "</arg>" + + "<arg>1</arg>" + + "<arg>2</arg>" + + "<arg>" + getAppPath() + "/" + OUTPUT + "</arg>" + + "<arg>2</arg>" + + "</spark>"; + return MessageFormat.format(script, getJobTrackerUri(), getNameNodeUri()); + } + + + public void testSparkAction() throws Exception { + FileSystem fs = getFileSystem(); + Path file = new Path(getAppPath(), SPARK_FILENAME); + Writer scriptWriter = new OutputStreamWriter(fs.create(file)); + scriptWriter.write("1,2,3"); + scriptWriter.write("\n"); + scriptWriter.write("2,3,4"); + scriptWriter.close(); + + Context context = createContext(getActionXml()); + final RunningJob launcherJob = submitAction(context); + waitFor(200 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return launcherJob.isComplete(); + } + }); + assertTrue(launcherJob.isSuccessful()); + + SparkActionExecutor ae = new SparkActionExecutor(); + ae.check(context, context.getAction()); + assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertTrue(fs.exists(new Path(getAppPath() + "/" + OUTPUT + "/userFeatures"))); + assertTrue(fs.exists(new Path(getAppPath() + "/" + OUTPUT + 
"/productFeatures"))); + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); + + } + + protected Context createContext(String actionXml) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + + File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", LauncherMainTester.class); + InputStream is = new FileInputStream(jarFile); + OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar")); + IOUtils.copyStream(is, os); + + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + SharelibUtils.addToDistributedCache("spark", getFileSystem(), getFsTestCaseDir(), protoConf); + + WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action"); + WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); + action.setType(ae.getType()); + action.setConf(actionXml); + + return new Context(wf, action); + } + + protected RunningJob submitAction(Context context) throws Exception { + SparkActionExecutor ae = new SparkActionExecutor(); + + WorkflowAction action = context.getAction(); + + ae.prepareActionDir(getFileSystem(), context); + ae.submitLauncher(getFileSystem(), context, action); + + String jobId = action.getExternalId(); + String jobTracker = action.getTrackerUri(); + String consoleUrl = action.getConsoleUrl(); + assertNotNull(jobId); + assertNotNull(jobTracker); + assertNotNull(consoleUrl); + + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); + jobConf.set("mapred.job.tracker", jobTracker); + + JobClient jobClient = + Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); + final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); + assertNotNull(runningJob); + return runningJob; + } + + +} 
http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java new file mode 100644 index 0000000..1d41c6d --- /dev/null +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; + +public class TestSparkMain extends MainTestCase { + + private static final String INPUT = "input.txt"; + private static final String OUTPUT = "output"; + + @Override + public Void call() throws Exception { + XConfiguration jobConf = new XConfiguration(); + XConfiguration.copy(createJobConf(), jobConf); + + FileSystem fs = getFileSystem(); + Path file = new Path(getFsTestCaseDir(), "input.txt"); + Writer scriptWriter = new OutputStreamWriter(fs.create(file)); + scriptWriter.write("1,2,3"); + scriptWriter.write("\n"); + scriptWriter.write("2,3,4"); + scriptWriter.close(); + + jobConf.set(JavaMain.JAVA_MAIN_CLASS, "org.apache.spark.deploy.SparkSubmit"); + + jobConf.set("mapreduce.job.tags", "" + System.currentTimeMillis()); + setSystemProperty("oozie.job.launch.time", "" + System.currentTimeMillis()); + + jobConf.set(SparkActionExecutor.SPARK_MASTER, "local[*]"); + jobConf.set(SparkActionExecutor.SPARK_MODE, "client"); + jobConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.spark.examples.mllib.JavaALS"); + jobConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark ALS"); + jobConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1024M"); + jobConf.set(SparkActionExecutor.SPARK_JAR, getFsTestCaseDir()+"/lib/test.jar"); + + + File actionXml = new File(getTestCaseDir(), "action.xml"); + OutputStream os = new FileOutputStream(actionXml); + jobConf.writeXml(os); + os.close(); + + System.setProperty("oozie.action.conf.xml", actionXml.getAbsolutePath()); + + File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", 
LauncherMainTester.class); + InputStream is = new FileInputStream(jarFile); + os = getFileSystem().create(new Path(getFsTestCaseDir(), "lib/test.jar")); + IOUtils.copyStream(is, os); + + String input = getFsTestCaseDir() + "/" + INPUT; + String output = getFsTestCaseDir() + "/" + OUTPUT; + String[] args = {input, "1", "2", output, "2"}; + SparkMain.main(args); + assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + OUTPUT + "/userFeatures"))); + assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + OUTPUT + "/productFeatures"))); + return null; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/src/main/assemblies/sharelib.xml ---------------------------------------------------------------------- diff --git a/src/main/assemblies/sharelib.xml b/src/main/assemblies/sharelib.xml index 4a46b90..ea95c2e 100644 --- a/src/main/assemblies/sharelib.xml +++ b/src/main/assemblies/sharelib.xml @@ -63,6 +63,10 @@ <directory>${basedir}/hive2/target/partial-sharelib</directory> <outputDirectory>/</outputDirectory> </fileSet> + <fileSet> + <directory>${basedir}/spark/target/partial-sharelib</directory> + <outputDirectory>/</outputDirectory> + </fileSet> </fileSets> </assembly> http://git-wip-us.apache.org/repos/asf/oozie/blob/8814ac44/webapp/pom.xml ---------------------------------------------------------------------- diff --git a/webapp/pom.xml b/webapp/pom.xml index 35776c5..a410b10 100644 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -129,6 +129,12 @@ </dependency> <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-sharelib-spark</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> <version>4.3</version> @@ -195,7 +201,8 @@ <configuration> <includeArtifactIds> oozie-sharelib-pig,oozie-sharelib-hive,oozie-sharelib-sqoop,oozie-sharelib-streaming, - 
oozie-sharelib-oozie,oozie-sharelib-hcatalog,oozie-sharelib-distcp,oozie-sharelib-hive2 + oozie-sharelib-oozie,oozie-sharelib-hcatalog,oozie-sharelib-distcp,oozie-sharelib-hive2, + oozie-sharelib-spark </includeArtifactIds> <excludeTransitive>true</excludeTransitive> <outputDirectory>${project.build.directory}/oozie-webapp-${project.version}/WEB-INF/lib</outputDirectory>
