Author: starchmd
Date: Tue Jan 6 16:38:01 2015
New Revision: 1649869
URL: http://svn.apache.org/r1649869
Log:
OODT-780: Spark backend to resource manager
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java
oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml
oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml
oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml
oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml
oodt/trunk/resource/src/main/scala/
oodt/trunk/resource/src/main/scala/org/
oodt/trunk/resource/src/main/scala/org/apache/
oodt/trunk/resource/src/main/scala/org/apache/oodt/
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala
Modified:
oodt/trunk/CHANGES.txt
oodt/trunk/core/pom.xml
oodt/trunk/resource/pom.xml
oodt/trunk/resource/src/main/resources/resource.properties
Modified: oodt/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1649869&r1=1649868&r2=1649869&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Tue Jan 6 16:38:01 2015
@@ -2,6 +2,9 @@ Apache OODT Change Log
======================
Release 0.9 - Current Development
-------------------------------------------
+
+* OODT-780 Spark backend to resource manager
+
* OODT-802 Create Dockerfile for OODT Radix.
* OODT-761 Update PGE version in Radix.
@@ -12,7 +15,6 @@ Release 0.9 - Current Development
* OODT-770 Fix the RADIX Issues with a patch from Lewis McGibbney
-
Release 0.8 - 12/19/2014
--------------------------------------------
Modified: oodt/trunk/core/pom.xml
URL:
http://svn.apache.org/viewvc/oodt/trunk/core/pom.xml?rev=1649869&r1=1649868&r2=1649869&view=diff
==============================================================================
--- oodt/trunk/core/pom.xml (original)
+++ oodt/trunk/core/pom.xml Tue Jan 6 16:38:01 2015
@@ -321,11 +321,42 @@ the License.
</execution>
</executions>
</plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <!-- Plugin versions are not resolved through <dependencyManagement>,
+             so the version is pinned directly on the plugin. -->
+        <version>3.2.0</version>
+        <executions>
+          <!-- Compile Scala sources during the regular compile phase. -->
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <!-- Compile Scala test sources during test-compile. -->
+          <execution>
+            <id>test-compile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+          <!-- NOTE(review): this unnamed execution runs the compile goal a
+               second time, at process-resources; confirm both are intended. -->
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
</plugins>
</build>
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
<version>1.6</version>
Modified: oodt/trunk/resource/pom.xml
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/pom.xml?rev=1649869&r1=1649868&r2=1649869&view=diff
==============================================================================
--- oodt/trunk/resource/pom.xml (original)
+++ oodt/trunk/resource/pom.xml Tue Jan 6 16:38:01 2015
@@ -81,9 +81,24 @@ the License.
</build>
<dependencies>
<dependency>
- <groupId>org.apache.oodt</groupId>
- <artifactId>oodt-commons</artifactId>
- <version>${project.parent.version}</version>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.10.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.10</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.oodt</groupId>
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.resource.examples;
+
+//JDK imports
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintStream;
+//OODT imports
+import org.apache.oodt.cas.resource.metadata.JobMetadata;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+
+/**
+ *
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A job that searches the supplied file for palindromes. Outputs timing
+ * information to another file for benchmarking purposes. References the
+ * standard palindrome calculation.
+ *
+ * Uses non-spark processing for the computations.
+ * </p>
+ */
+public class NoSparkFilePalindromeExample implements JobInstance, JobMetadata {
+
+  /**
+   * Counts the palindromic lines of an input file and writes the count and
+   * the elapsed wall-clock time to an output file.
+   *
+   * @param in must be a {@link NameValueJobInput}; its "file" value names the
+   *          file to read and its "output" value names the file that receives
+   *          the summary line.
+   * @return true if processing ran to completion, false if either file could
+   *         not be opened or reading the input failed.
+   *
+   * @see org.apache.oodt.cas.resource.structs.JobInstance#execute(org.apache.oodt.cas.resource.structs.JobInput)
+   */
+  public boolean execute(JobInput in) {
+    NameValueJobInput input = (NameValueJobInput) in;
+    PrintStream output = null;
+    BufferedReader br = null;
+    try {
+      //Setup output and timing
+      output = PalindromeUtils.getPrintStream(input.getValue("output"));
+      long count = 0;
+      final long start = System.currentTimeMillis();
+      //Read file and process
+      br = new BufferedReader(new FileReader(input.getValue("file")));
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (PalindromeUtils.isPalindrome(line)) {
+          count++;
+        }
+      }
+      //Output timing and results
+      final long end = System.currentTimeMillis();
+      double timing = ((double)(end - start))/1000.0;
+      output.println("Found "+ count+" palindromes in "+timing+" seconds.");
+    } catch (FileNotFoundException e) {
+      return false;
+    } catch (IOException e) {
+      return false;
+    } finally {
+      // Close each resource on its own: a reader that never opened (or that
+      // fails to close) must not prevent the output stream from being closed.
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          // Best effort: nothing useful can be done if closing the reader fails.
+        }
+      }
+      if (output != null) {
+        output.close();
+      }
+    }
+    return true;
+  }
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.examples;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * @author starchmd
+ *
+ * Tests if a line is a palindrome.
+ *
+ */
+public class PalindromeUtils {
+  /**
+   * Super simple palindrome test. Whitespace and letter case are ignored, so
+   * an empty or all-whitespace line counts as a palindrome.
+   * @param line - line to test
+   * @return true if it is a palindrome, false otherwise (including for null).
+   */
+  public static boolean isPalindrome(String line) {
+    if (line == null) {
+      return false;
+    }
+    // Locale.ROOT keeps case folding independent of the JVM's default locale.
+    String normalized = line.replaceAll("\\s","").toLowerCase(java.util.Locale.ROOT);
+    return normalized.equals(new StringBuilder(normalized).reverse().toString());
+  }
+  /**
+   * Get a PrintStream for printing to the given file.
+   * @param file - file to open as PrintStream
+   * @return stream for file to print to; the caller is responsible for closing it.
+   * @throws FileNotFoundException if the file cannot be opened for writing
+   */
+  public static PrintStream getPrintStream(String file) throws FileNotFoundException {
+    return new PrintStream(new File(file));
+  }
+  /**
+   * Functor class for spark. Really should do this in Scala....
+   * Note: serial id is disabled because this class has no instance variables.
+   * @author starchmd
+   */
+  @SuppressWarnings("serial")
+  static class FilterPalindrome implements Function<String, Boolean> {
+    /** Delegates to {@link PalindromeUtils#isPalindrome(String)}. */
+    public Boolean call(String s) { return isPalindrome(s); }
+  }
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.examples;
+
+//JDK imports
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+//OODT imports
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+import org.apache.oodt.cas.resource.structs.SparkInstance;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ *
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A job that searches the supplied file for palindromes. Outputs timing
+ * information to another file for benchmarking purposes. References the
+ * standard palindrome calculation.
+ *
+ * Uses spark processing for the computations.
+ * </p>
+ */
+public class SparkFilePalindromeExample implements SparkInstance {
+
+  // Java wrapper around the context handed in through setSparkContext();
+  // it must be set before execute() is called.
+  JavaSparkContext sc;
+
+  /**
+   * Counts the palindromic lines of an input file with Spark and writes the
+   * count and the elapsed wall-clock time to an output file.
+   *
+   * @param in must be a {@link NameValueJobInput}; its "file" value is passed
+   *          to Spark's textFile() and its "output" value names the file that
+   *          receives the summary line.
+   * @return true if the job ran, false if the output file could not be opened.
+   *
+   * @see org.apache.oodt.cas.resource.structs.JobInstance#execute(org.apache.oodt.cas.resource.structs.JobInput)
+   */
+  public boolean execute(JobInput in) throws JobInputException {
+    NameValueJobInput input = (NameValueJobInput) in;
+    PrintStream output = null;
+    try {
+      //Setup output and timing
+      output = PalindromeUtils.getPrintStream(input.getValue("output"));
+      final long start = System.currentTimeMillis();
+      //Read file and process
+      JavaRDD<String> rdd = sc.textFile(input.getValue("file"));
+      JavaRDD<String> filtered = rdd.filter(new PalindromeUtils.FilterPalindrome());
+      long count = filtered.count();
+      //Output timing and results
+      final long end = System.currentTimeMillis();
+      double timing = ((double)(end - start))/1000.0;
+      output.println("Found "+ count+" palindromes in "+timing+" seconds.");
+    } catch (FileNotFoundException e) {
+      return false;
+    } finally {
+      // Explicit null check instead of swallowing a NullPointerException
+      // when the output file could not be opened.
+      if (output != null) {
+        output.close();
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Stores the Spark context this job runs on.
+   * @param context context to wrap in a JavaSparkContext
+   */
+  @Override
+  public void setSparkContext(SparkContext context) {
+    this.sc = new JavaSparkContext(context);
+  }
+
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.examples;
+
+//JDK imports
+
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+//OODT imports
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+import org.apache.oodt.cas.resource.structs.StreamingInstance;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ *
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A job that searches a socket text stream for palindromes. Outputs the
+ * per-batch palindrome counts to a file. References the standard palindrome
+ * calculation.
+ *
+ * Uses Spark Streaming for the computations.
+ * </p>
+ */
+public class StreamingPalindromeExample implements StreamingInstance {
+
+  // Contexts handed in by the scheduler; both must be set before execute().
+  JavaSparkContext sc;
+  JavaStreamingContext ssc;
+
+  /**
+   * Reads lines from a socket, counts the palindromes in every streaming
+   * batch and writes one line per batch count to the output file. A watchdog
+   * thread stops the streaming context after the configured time.
+   *
+   * @param in must be a {@link NameValueJobInput} with the values "host" and
+   *          "port" (socket to read), "output" (file to write) and "time"
+   *          (run time in milliseconds before the stream is stopped).
+   * @return true once streaming has terminated, false if the output file
+   *         could not be opened.
+   *
+   * @see org.apache.oodt.cas.resource.structs.JobInstance#execute(org.apache.oodt.cas.resource.structs.JobInput)
+   */
+  //Will not serialize a class that has no members
+  @SuppressWarnings("serial")
+  public boolean execute(JobInput in) throws JobInputException {
+    NameValueJobInput input = (NameValueJobInput) in;
+    //Get time for watchdog (milliseconds, it is passed straight to Thread.sleep)
+    final int time = Integer.parseInt(input.getValue("time"));
+    PrintStream opened = null;
+    try {
+      //Output to put data into
+      opened = PalindromeUtils.getPrintStream(input.getValue("output"));
+      final PrintStream output = opened;
+      //Filter stream and count
+      JavaReceiverInputDStream<String> stream =
+          ssc.socketTextStream(input.getValue("host"),Integer.parseInt(input.getValue("port")));
+      JavaDStream<String> filtered = stream.filter(new PalindromeUtils.FilterPalindrome());
+      final JavaDStream<Long> count = filtered.count();
+      //For each packet-ized count: output
+      count.foreachRDD(new Function<JavaRDD<Long>,Void>(){
+        @Override
+        public Void call(JavaRDD<Long> jrdd) throws Exception {
+          synchronized(output)
+          {
+            // JavaRDD.collect() returns a typed List; casting the raw array
+            // from rdd().collect() to Long[] is not safe at runtime.
+            for (Long item : jrdd.collect())
+              output.println("Found "+item.longValue()+ " palindromes.");
+          }
+          return null;
+        }});
+      ssc.start();
+      //Stop in <time> milliseconds
+      Thread watchdog = new Thread(new Runnable() {
+        public void run() {
+          try {
+            Thread.sleep(time);
+          } catch (InterruptedException e) {
+            //Don't cast this exception into the void
+            Thread.currentThread().interrupt();
+          } finally {
+            // NOTE(review): stop() without arguments may also stop the
+            // SparkContext shared with the scheduler - confirm this is intended.
+            ssc.stop();
+            output.println("Stopping after "+time/1000+" seconds.");
+          }
+        }
+      });
+      watchdog.start();
+      //Wait for streaming to terminate
+      ssc.awaitTermination();
+      //Let the watchdog write its final message before the stream is closed
+      try {
+        watchdog.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    } catch (FileNotFoundException e) {
+      return false;
+    } finally {
+      // The output file was previously never closed.
+      if (opened != null) {
+        opened.close();
+      }
+    }
+    return true;
+  }
+  /**
+   * Stores the streaming context this job reads its socket stream from.
+   * @param context context to wrap in a JavaStreamingContext
+   */
+  @Override
+  public void setStreamingContext(StreamingContext context) {
+    this.ssc = new JavaStreamingContext(context);
+  }
+  /**
+   * Stores the Spark context; execute() itself only uses the streaming context.
+   * @param context context to wrap in a JavaSparkContext
+   */
+  @Override
+  public void setSparkContext(SparkContext context) {
+    this.sc = new JavaSparkContext(context);
+  }
+
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,150 @@
+package org.apache.oodt.cas.resource.scheduler;
+
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.SparkInstance;
+import org.apache.oodt.cas.resource.structs.StreamingInstance;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.StreamingContext;
+
+/**
+ * A scheduler that runs spark jobs on a spark cluster.
+ *
+ * @author starchmd
+ *
+ */
+public class SparkScheduler implements Scheduler {
+
+  /** Pause between two polls of an empty job queue, in milliseconds. */
+  private static final long QUEUE_POLL_MILLIS = 1000L;
+
+  SparkContext sc;
+  StreamingContext ssc;
+  JobQueue queue;
+
+  private static final Logger LOG = Logger.getLogger(SparkScheduler.class.getName());
+
+  /**
+   * Creates the Spark and Spark Streaming contexts that all scheduled jobs share.
+   * The Spark master is read from the system property
+   * "resource.runner.spark.host" and defaults to "local".
+   *
+   * @param queue queue the scheduler takes its jobs from
+   */
+  public SparkScheduler(JobQueue queue) {
+    SparkConf conf = new SparkConf();
+    conf.setMaster(System.getProperty("resource.runner.spark.host","local"));
+    conf.setAppName("OODT Spark Job");
+    // NOTE(review): the jar path is relative to the working directory and
+    // pinned to 0.8-SNAPSHOT - confirm it matches the deployed artifact.
+    conf.setJars(new String[]{"../lib/cas-resource-0.8-SNAPSHOT.jar"});
+    sc = new SparkContext(conf);
+    // Streaming batches are cut every 10 seconds.
+    ssc = new StreamingContext(sc,new Duration(10000));
+    this.queue = queue;
+  }
+
+  /**
+   * Polls the job queue forever and runs every Spark job found on it. Jobs
+   * whose instance class is not a {@link SparkInstance} are taken off the
+   * queue and dropped with a warning. The loop ends only when the thread is
+   * interrupted while waiting on an empty queue.
+   *
+   * @see java.lang.Runnable#run()
+   */
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        if (queue.isEmpty()) {
+          // Back off instead of spinning at full speed on an empty queue.
+          Thread.sleep(QUEUE_POLL_MILLIS);
+          continue;
+        }
+        JobSpec spec = queue.getNextJob();
+
+        // Check the type on the class itself; no throw-away instance is needed.
+        Class<?> clazz = Class.forName(spec.getJob().getJobInstanceClassName());
+        if (!SparkInstance.class.isAssignableFrom(clazz)) {
+          LOG.log(Level.WARNING,"Non-Spark job found ("+spec.getJob().getId()+") ignoring.");
+          continue;
+        }
+        this.schedule(spec);
+      } catch (InterruptedException e) {
+        // Restore the flag and leave the loop so the thread can shut down.
+        Thread.currentThread().interrupt();
+        return;
+      } catch(SchedulerException e) {
+        LOG.log(Level.WARNING,"Scheduler exception detected: "+e.getMessage());
+      } catch (JobQueueException e) {
+        LOG.log(Level.WARNING,"Could not get next job from job-queue.");
+      } catch (ClassNotFoundException e) {
+        LOG.log(Level.WARNING,"Class not found: "+e.getMessage());
+      } catch (RuntimeException e) {
+        // A failing job must not take the whole scheduler loop down with it.
+        LOG.log(Level.SEVERE,"Unexpected failure while running job: "+e.getMessage(), e);
+      }
+
+    }
+  }
+
+  /**
+   * Runs the given job synchronously on the shared Spark context. Streaming
+   * jobs additionally receive the shared streaming context.
+   *
+   * @param spec job to run; its instance class must implement {@link SparkInstance}
+   * @return the value returned by the job's execute() method
+   * @throws SchedulerException if the job instance cannot be created, is not a
+   *           SparkInstance, or rejects its input
+   *
+   * @see org.apache.oodt.cas.resource.scheduler.Scheduler#schedule(org.apache.oodt.cas.resource.structs.JobSpec)
+   */
+  @Override
+  public boolean schedule(JobSpec spec) throws SchedulerException {
+    String className = spec.getJob().getJobInstanceClassName();
+    JobInstance instance = GenericResourceManagerObjectFactory.getJobInstanceFromClassName(className);
+    // instanceof is false for null too, so this covers a failed instantiation.
+    if (!(instance instanceof SparkInstance)) {
+      throw new SchedulerException(new IllegalArgumentException(
+          "Job instance class ["+className+"] could not be created as a SparkInstance."));
+    }
+    try {
+      SparkInstance sparkInstance = (SparkInstance) instance;
+      LOG.log(Level.INFO,"Setting SparkContext");
+      sparkInstance.setSparkContext(this.sc);
+      //Handle spark streaming
+      if (sparkInstance instanceof StreamingInstance) {
+        LOG.log(Level.INFO,"Found streaming instance, setting StreamingContext");
+        ((StreamingInstance)sparkInstance).setStreamingContext(this.ssc);
+      }
+      // Report the job's own result instead of an unconditional false.
+      return sparkInstance.execute(spec.getIn());
+    } catch (JobInputException e) {
+      LOG.log(Level.WARNING,"Job input exception detected.");
+      throw new SchedulerException(e);
+    }
+  }
+
+  /**
+   * Not implemented for the Spark backend; always returns null.
+   *
+   * @see org.apache.oodt.cas.resource.scheduler.Scheduler#nodeAvailable(org.apache.oodt.cas.resource.structs.JobSpec)
+   */
+  @Override
+  public ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /**
+   * This scheduler has no monitor; always returns null.
+   *
+   * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getMonitor()
+   */
+  @Override
+  public Monitor getMonitor() {
+    return null;
+  }
+
+  /**
+   * This scheduler has no batch manager; always returns null.
+   *
+   * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getBatchmgr()
+   */
+  @Override
+  public Batchmgr getBatchmgr() {
+    return null;
+  }
+
+  /**
+   * Returns the queue passed to the constructor.
+   *
+   * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getJobQueue()
+   */
+  @Override
+  public JobQueue getJobQueue() {
+    return queue;
+  }
+
+  /**
+   * Not implemented for the Spark backend; always returns null.
+   *
+   * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getQueueManager()
+   */
+  @Override
+  public QueueManager getQueueManager() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.resource.scheduler;
+
+//JAVA imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+//OODT imports
+
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A factory for the SparkScheduler
+ * </p>
+ *
+ */
+public class SparkSchedulerFactory implements SchedulerFactory {
+
+  private static final Logger LOG = Logger.getLogger(SparkSchedulerFactory.class.getName());
+
+  // Job queue shared by every scheduler this factory creates.
+  private JobQueue queue = null;
+
+  /**
+   * Builds the job queue from the factory class named by the system property
+   * "resource.jobqueue.factory", falling back to JobStackJobQueueFactory.
+   */
+  public SparkSchedulerFactory() {
+    final String fallbackFactory = JobStackJobQueueFactory.class.getName();
+    final String factoryName = System.getProperty("resource.jobqueue.factory", fallbackFactory);
+    final String schedulerName = SparkScheduler.class.getName();
+    LOG.log(Level.INFO, "Using job-queue: " + factoryName + " with: " + schedulerName);
+    this.queue = GenericResourceManagerObjectFactory.getJobQueueServiceFromFactory(factoryName);
+  }
+
+  /**
+   * Creates a new SparkScheduler on top of the queue built by the constructor.
+   * @return the new scheduler
+   */
+  public Scheduler createScheduler() {
+    final Scheduler scheduler = new SparkScheduler(this.queue);
+    return scheduler;
+  }
+}
\ No newline at end of file
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,13 @@
+package org.apache.oodt.cas.resource.structs;
+
+import org.apache.spark.SparkContext;
+
+public interface SparkInstance extends JobInstance {
+
+  /**
+   * Hands the job the Spark context it should run its computations on.
+   * SparkScheduler calls this before it calls execute().
+   *
+   * @param context the Spark context to use
+   */
+  void setSparkContext(SparkContext context);
+
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,13 @@
+package org.apache.oodt.cas.resource.structs;
+
+import org.apache.spark.streaming.StreamingContext;
+
+public interface StreamingInstance extends SparkInstance {
+
+  /**
+   * Hands the job the Spark Streaming context it should build its streams on.
+   * SparkScheduler calls this, after setSparkContext(), before execute().
+   *
+   * @param context the streaming context to use
+   */
+  void setStreamingContext(StreamingContext context);
+
+}
Added: oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml?rev=1649869&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml
(added)
+++ oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml Tue
Jan 6 16:38:01 2015
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<cas:job xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas" id="no-spark-1"
+  name="No Spark Job">
+  <!-- The class lives in the "examples" package; there is no "spark.examples" package. -->
+  <instanceClass
+    name="org.apache.oodt.cas.resource.examples.NoSparkFilePalindromeExample" />
+  <inputClass
+    name="org.apache.oodt.cas.resource.structs.NameValueJobInput">
+    <properties>
+      <!-- Placeholders: replace both values with real paths before submitting.
+           The angle brackets are escaped because a raw "<" is not allowed
+           inside an XML attribute value. -->
+      <property name="file" value="&lt;input-file&gt;" />
+      <property name="output" value="&lt;output-file&gt;" />
+    </properties>
+  </inputClass>
+  <queue>quick</queue>
+  <load>1</load>
+</cas:job>
Added: oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml?rev=1649869&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml (added)
+++ oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml Tue Jan
6 16:38:01 2015
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- NOTE(review): the "file" value is a developer-specific absolute path; point it at a readable text file. -->
+<cas:job xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas" id="scala-12345"
+    name="TestSparkJob">
+    <instanceClass
+        name="org.apache.oodt.cas.resource.examples.ScalaHelloWorld" />
+    <inputClass name="org.apache.oodt.cas.resource.structs.NameValueJobInput">
+        <properties>
+            <property name="file" value="/Users/mstarch/deploy/test.file" />
+        </properties>
+    </inputClass>
+    <queue>quick</queue>
+    <load>1</load>
+</cas:job>
Added:
oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml?rev=1649869&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml
(added)
+++ oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Example job: replace the placeholder values (host, port and file names) below before use. -->
+<cas:job xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas" id="spark-1"
+    name="Spark Job">
+    <instanceClass
+        name="org.apache.oodt.cas.resource.examples.SparkFilePalindromeExample" />
+    <inputClass name="org.apache.oodt.cas.resource.structs.NameValueJobInput">
+        <properties>
+            <property name="file" value="hdfs://&lt;host&gt;:&lt;port&gt;/&lt;input-file&gt;" />
+            <property name="output" value="&lt;output-file&gt;" />
+        </properties>
+    </inputClass>
+    <queue>quick</queue>
+    <load>1</load>
+</cas:job>
Added:
oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml
(added)
+++
oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Example job: replace the placeholder values (host, port and output file) below before use. -->
+<cas:job xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas" id="streaming-1"
+    name="Streaming Job">
+    <instanceClass
+        name="org.apache.oodt.cas.resource.examples.StreamingPalindromeExample" />
+    <inputClass name="org.apache.oodt.cas.resource.structs.NameValueJobInput">
+        <properties>
+            <property name="host" value="&lt;host&gt;" />
+            <property name="port" value="&lt;port&gt;" />
+            <property name="output" value="&lt;output-file&gt;" />
+        </properties>
+    </inputClass>
+    <queue>quick</queue>
+    <load>1</load>
+</cas:job>
Modified: oodt/trunk/resource/src/main/resources/resource.properties
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/resource.properties?rev=1649869&r1=1649868&r2=1649869&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/resources/resource.properties (original)
+++ oodt/trunk/resource/src/main/resources/resource.properties Tue Jan 6
16:38:01 2015
@@ -16,6 +16,9 @@
#
# Properties required to configure the Resource Manager
+# resource spark master
+resource.runner.spark.host = mesos://<ip>:5050
+
# resource batchmgr factory
resource.batchmgr.factory = org.apache.oodt.cas.resource.batchmgr.XmlRpcBatchMgrFactory
Added:
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala?rev=1649869&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala
(added)
+++
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala
Tue Jan 6 16:38:01 2015
@@ -0,0 +1,32 @@
+package org.apache.oodt.cas.resource.examples;
+
+import org.apache.oodt.cas.resource.structs.SparkInstance;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+import org.apache.spark.SparkContext;
+/**
+* @author starchmd
+*
+* An example class for use with the Spark backend to the resource manager.
+*/
+class ScalaHelloWorld extends SparkInstance {
+  // Assigned by setSparkContext; stays null until that method has been called.
+  var sc: SparkContext = null
+  /**
+   * Counts the lines of the "file" input property that contain "fantastic",
+   * prints that count, then prints every line of the file.
+   * @return true after the job ran; false if input is not a NameValueJobInput
+   */
+  def execute(input: JobInput): Boolean = input match {
+    case nv: NameValueJobInput =>
+      val textFile = sc.textFile(nv.getValue("file"))
+      val matching = textFile.filter(_.contains("fantastic")).count()
+      println(s"Line count: $matching")
+      // Runs on the executors: on a cluster the lines print to executor stdout.
+      textFile.foreach(line => println(line))
+      true
+    case _ => false // unsupported (or null) input: report failure instead of a cast error
+  }
+  /** Stores the Spark context that execute reads the input file through. */
+  def setSparkContext(context: SparkContext): Unit = { this.sc = context }
+}