Repository: oozie Updated Branches: refs/heads/master ebb3ec6d0 -> 5228eb8fe
OOZIE-2071 Add a Spark example (pavan kumar via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5228eb8f Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5228eb8f Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5228eb8f Branch: refs/heads/master Commit: 5228eb8fef99146a066b9b609944f26a1b5a5828 Parents: ebb3ec6 Author: Robert Kanter <[email protected]> Authored: Fri Feb 20 14:43:29 2015 -0800 Committer: Robert Kanter <[email protected]> Committed: Fri Feb 20 14:43:29 2015 -0800 ---------------------------------------------------------------------- .../action/hadoop/SparkActionExecutor.java | 6 ++- examples/pom.xml | 51 +++++++++++++++----- examples/src/main/apps/spark/job.properties | 24 +++++++++ examples/src/main/apps/spark/workflow.xml | 42 ++++++++++++++++ .../org/apache/oozie/example/SparkFileCopy.java | 41 ++++++++++++++++ release-log.txt | 1 + sharelib/spark/pom.xml | 51 +++----------------- .../action/hadoop/TestSparkActionExecutor.java | 10 ++-- .../oozie/action/hadoop/TestSparkMain.java | 11 ++--- src/main/assemblies/examples.xml | 7 ++- 10 files changed, 173 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java index 65f8d5a..732d5f0 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java @@ -32,7 +32,8 @@ import java.util.List; public class SparkActionExecutor extends JavaActionExecutor { public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain"; - public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; + public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2 + public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first"; // hadoop-1 public static final String SPARK_MASTER = "oozie.spark.master"; public static final String SPARK_MODE = "oozie.spark.mode"; public static final String SPARK_OPTS = "oozie.spark.spark-opts"; @@ -85,6 +86,9 @@ public class SparkActionExecutor extends JavaActionExecutor { if (launcherJobConf.get("oozie.launcher." + TASK_USER_PRECEDENCE) == null) { launcherJobConf.set(TASK_USER_PRECEDENCE, "true"); } + if (launcherJobConf.get("oozie.launcher." + TASK_USER_CLASSPATH_PRECEDENCE) == null) { + launcherJobConf.set(TASK_USER_CLASSPATH_PRECEDENCE, "true"); + } return launcherJobConf; } http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index c5af129..442a520 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -109,21 +109,50 @@ <artifactId>hsqldb</artifactId> <scope>compile</scope> </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.mesos</groupId> + <artifactId>mesos</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> <plugins> <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <configuration> - <excludeSubProjects>false</excludeSubProjects> - <excludes> - <!-- excluding all as the root POM does the full check--> - <exclude>**</exclude> - </excludes> - </configuration> - </plugin> - <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludeSubProjects>false</excludeSubProjects> + <excludes> + <!-- excluding all as the root POM does the full check--> + <exclude>**</exclude> + </excludes> + </configuration> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/examples/src/main/apps/spark/job.properties ---------------------------------------------------------------------- diff --git a/examples/src/main/apps/spark/job.properties b/examples/src/main/apps/spark/job.properties new file mode 100644 index 0000000..4bf67df --- /dev/null +++ b/examples/src/main/apps/spark/job.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +nameNode=hdfs://localhost:8020 +jobTracker=localhost:8021 +queueName=default +examplesRoot=examples +oozie.use.system.libpath=true +oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/spark \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/examples/src/main/apps/spark/workflow.xml ---------------------------------------------------------------------- diff --git a/examples/src/main/apps/spark/workflow.xml b/examples/src/main/apps/spark/workflow.xml new file mode 100644 index 0000000..f5ac7f6 --- /dev/null +++ b/examples/src/main/apps/spark/workflow.xml @@ -0,0 +1,42 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'> + <start to='spark-node' /> + + <action name='spark-node'> + <spark xmlns="uri:oozie:spark-action:0.1"> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <master>local[*]</master> + <name>Spark-FileCopy</name> + <class>org.apache.oozie.example.SparkFileCopy</class> + <jar>${nameNode}/user/${wf:user()}/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar> + <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data/text/data.txt</arg> + <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output</arg> + </spark> + <ok to="end" /> + <error to="fail" /> + </action> + + <kill name="fail"> + <message>Workflow failed, error + message[${wf:errorMessage(wf:lastErrorNode())}] + </message> + </kill> + <end name='end' /> +</workflow-app> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/examples/src/main/java/org/apache/oozie/example/SparkFileCopy.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/oozie/example/SparkFileCopy.java b/examples/src/main/java/org/apache/oozie/example/SparkFileCopy.java new file mode 100644 index 0000000..d17534d --- /dev/null +++ b/examples/src/main/java/org/apache/oozie/example/SparkFileCopy.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.example; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +public final class SparkFileCopy { + + public static void main(String[] args) throws Exception { + + if (args.length < 2) { + System.err.println("Usage: SparkFileCopy <file> <file>"); + System.exit(1); + } + + SparkConf sparkConf = new SparkConf().setAppName("SparkFileCopy"); + JavaSparkContext ctx = new JavaSparkContext(sparkConf); + JavaRDD<String> lines = ctx.textFile(args[0]); + lines.saveAsTextFile(args[1]); + System.out.println("Copied file from " + args[0] + " to " + args[1]); + ctx.stop(); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index d82e578..be11f2c 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2071 Add a Spark example (pavan kumar via rkanter) OOZIE-2145 ZooKeeper paths should start with a "/" (rkanter) OOZIE-2113 Oozie Command Line Utilities are failing as hadoop-auth jar not found (shwethags) OOZIE-1688 New configuration to specify server-server authentication type (puru) http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/sharelib/spark/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml index db46b8c..c532532 100644 --- a/sharelib/spark/pom.xml +++ b/sharelib/spark/pom.xml @@ -61,10 +61,6 @@ <artifactId>mesos</artifactId> </exclusion> <exclusion> - <groupId>org.spark-project.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> @@ -92,40 +88,6 @@ <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-examples_2.10</artifactId> - <version>${spark.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib_2.10</artifactId> - <version>${spark.version}</version> - <scope>compile</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> <artifactId>spark-graphx_2.10</artifactId> <version>${spark.version}</version> <scope>compile</scope> @@ -133,13 +95,6 @@ <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - <version>${spark.version}</version> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>${spark.version}</version> <scope>compile</scope> @@ -167,6 +122,12 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-examples</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java index b83be47..a15e76b 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java @@ -65,14 +65,11 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { "<name-node>{1}</name-node>" + "<master>local[*]</master>" + "<mode>client</mode>" + - "<name>SparkRegression</name>" + - "<class>org.apache.spark.examples.mllib.JavaALS</class>" + + "<name>SparkFileCopy</name>" + + "<class>org.apache.oozie.example.SparkFileCopy</class>" + "<jar>" + getAppPath() +"/lib/test.jar</jar>" + "<arg>" + getAppPath() + "/" + SPARK_FILENAME + "</arg>" + - "<arg>1</arg>" + - "<arg>2</arg>" + "<arg>" + getAppPath() + "/" + OUTPUT + "</arg>" + - "<arg>2</arg>" + "</spark>"; return MessageFormat.format(script, getJobTrackerUri(), getNameNodeUri()); } @@ -99,8 +96,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { SparkActionExecutor ae = new SparkActionExecutor(); ae.check(context, context.getAction()); assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); - assertTrue(fs.exists(new Path(getAppPath() + "/" + OUTPUT + "/userFeatures"))); - assertTrue(fs.exists(new Path(getAppPath() + "/" + OUTPUT + "/productFeatures"))); + assertTrue(fs.exists(new Path(getAppPath() + "/" + OUTPUT))); ae.end(context, context.getAction()); assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java index 1d41c6d..7707622 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java @@ -56,10 +56,10 @@ public class TestSparkMain extends MainTestCase { jobConf.set(SparkActionExecutor.SPARK_MASTER, "local[*]"); jobConf.set(SparkActionExecutor.SPARK_MODE, "client"); - jobConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.spark.examples.mllib.JavaALS"); - jobConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark ALS"); + jobConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy"); + jobConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File"); jobConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1024M"); - jobConf.set(SparkActionExecutor.SPARK_JAR, getFsTestCaseDir()+"/lib/test.jar"); + jobConf.set(SparkActionExecutor.SPARK_JAR, getFsTestCaseDir() + "/lib/test.jar"); File actionXml = new File(getTestCaseDir(), "action.xml"); @@ -76,10 +76,9 @@ public class TestSparkMain extends MainTestCase { String input = getFsTestCaseDir() + "/" + INPUT; String output = getFsTestCaseDir() + "/" + OUTPUT; - String[] args = {input, "1", "2", output, "2"}; + String[] args = {input, output}; SparkMain.main(args); - assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + OUTPUT + "/userFeatures"))); - assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + OUTPUT + "/productFeatures"))); + assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + OUTPUT))); return null; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/5228eb8f/src/main/assemblies/examples.xml ---------------------------------------------------------------------- diff --git a/src/main/assemblies/examples.xml b/src/main/assemblies/examples.xml index 819545c..d70c280 100644 --- a/src/main/assemblies/examples.xml +++ b/src/main/assemblies/examples.xml @@ -114,7 +114,12 @@ <file> <source>${basedir}/target/${artifact.artifactId}-${artifact.version}.jar</source> <outputDirectory>/examples/apps/map-reduce/lib</outputDirectory> - </file> + </file> + <file> + <source>${basedir}/target/${artifact.artifactId}-${artifact.version}.jar</source> + <outputDirectory>/examples/apps/spark/lib</outputDirectory> + <destName>${artifact.artifactId}.jar</destName> + </file> </files> </assembly>
