Author: starchmd
Date: Tue Jan  6 16:38:01 2015
New Revision: 1649869

URL: http://svn.apache.org/r1649869
Log:
OODT-780: Spark backend to resource manager

Added:
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java
    oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java
    oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml
    oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml
    oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml
    oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml
    oodt/trunk/resource/src/main/scala/
    oodt/trunk/resource/src/main/scala/org/
    oodt/trunk/resource/src/main/scala/org/apache/
    oodt/trunk/resource/src/main/scala/org/apache/oodt/
    oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/
    oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/
    oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/
    oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala
Modified:
    oodt/trunk/CHANGES.txt
    oodt/trunk/core/pom.xml
    oodt/trunk/resource/pom.xml
    oodt/trunk/resource/src/main/resources/resource.properties

Modified: oodt/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1649869&r1=1649868&r2=1649869&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Tue Jan  6 16:38:01 2015
@@ -2,6 +2,9 @@ Apache OODT Change Log
 ======================
 Release 0.9 - Current Development
 -------------------------------------------
+
+* OODT-780 Spark backend to resource manager
+
 * OODT-802 Create Dockerfile for OODT Radix.
 
 * OODT-761 Update PGE version in Radix.
@@ -12,7 +15,6 @@ Release 0.9 - Current Development
 
 * OODT-770 Fix the RADIX Issues with a patch from Lewis McGibbney
 
-  
 Release 0.8 - 12/19/2014
 --------------------------------------------
 

Modified: oodt/trunk/core/pom.xml
URL: 
http://svn.apache.org/viewvc/oodt/trunk/core/pom.xml?rev=1649869&r1=1649868&r2=1649869&view=diff
==============================================================================
--- oodt/trunk/core/pom.xml (original)
+++ oodt/trunk/core/pom.xml Tue Jan  6 16:38:01 2015
@@ -321,11 +321,42 @@ the License.
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <!-- Plugin versions are not managed by dependencyManagement, so pin it here. -->
+                <version>3.2.0</version>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                    <execution>
+                        <id>test-compile</id>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <phase>test-compile</phase>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
     <dependencyManagement>
         <dependencies>
             <dependency>
                 <groupId>org.apache.tika</groupId>
                 <artifactId>tika-core</artifactId>
                 <version>1.6</version>

Modified: oodt/trunk/resource/pom.xml
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/pom.xml?rev=1649869&r1=1649868&r2=1649869&view=diff
==============================================================================
--- oodt/trunk/resource/pom.xml (original)
+++ oodt/trunk/resource/pom.xml Tue Jan  6 16:38:01 2015
@@ -81,9 +81,24 @@ the License.
   </build>
   <dependencies>
     <dependency>
-      <groupId>org.apache.oodt</groupId>
-      <artifactId>oodt-commons</artifactId>
-      <version>${project.parent.version}</version>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.10.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_2.10</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.4.0</version>
     </dependency>   
     <dependency>
       <groupId>org.apache.oodt</groupId>

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/NoSparkFilePalindromeExample.java
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.resource.examples;
+
+//JDK imports
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintStream;
+//OODT imports
+import org.apache.oodt.cas.resource.metadata.JobMetadata;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+
+/**
+ *
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A job that searches the supplied file for palindromes.  Outputs timing information
+ * to another file for benchmarking purposes. References standard palindrome calculation.
+ *
+ * Uses non-spark processing for the computations.
+ * </p>
+ */
+public class NoSparkFilePalindromeExample implements JobInstance, JobMetadata {
+
+    /**
+     * Count the palindromic lines of the "file" input and write the count and
+     * the elapsed seconds to the "output" file.
+     *
+     * @param in job input; must be a {@link NameValueJobInput} carrying
+     *           "file" and "output" values
+     * @return true on success, false if either file could not be opened, read
+     *         or closed
+     * @see org.apache.oodt.cas.resource.structs.JobInstance#execute(org.apache.oodt.cas.resource.structs.JobInput)
+     */
+    public boolean execute(JobInput in) {
+        NameValueJobInput input = (NameValueJobInput) in;
+        PrintStream output = null;
+        BufferedReader br = null;
+        try {
+            //Setup output and timing
+            output = PalindromeUtils.getPrintStream(input.getValue("output"));
+            long count = 0;
+            final long start = System.currentTimeMillis();
+            //Read file and process
+            String file = input.getValue("file");
+            br = new BufferedReader(new FileReader(file));
+            String line;
+            while ((line = br.readLine()) != null) {
+                if (PalindromeUtils.isPalindrome(line))
+                    count++;
+            }
+            //Output timing and results
+            final long end = System.currentTimeMillis();
+            double timing = ((double)(end - start))/1000.0;
+            output.println("Found "+ count+" palindromes in "+timing+" seconds.");
+            br.close();
+            //Already closed (a failure above reports false); keep finally from closing again
+            br = null;
+        } catch (FileNotFoundException e) {
+            return false;
+        } catch (IOException e) {
+            return false;
+        } finally {
+            //Close each resource on its own so that a reader that was never
+            //opened (still null) cannot prevent the output stream being closed.
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    //Best-effort cleanup; the job result is already decided.
+                }
+            }
+            if (output != null) {
+                output.close();
+            }
+        }
+        return true;
+    }
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/PalindromeUtils.java
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.examples;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * @author starchmd
+ *
+ * Helpers shared by the palindrome examples: the palindrome test itself, a
+ * way to open an output file, and a Spark filter built on the test.
+ *
+ */
+public class PalindromeUtils {
+    /**
+     * Super simple palindrome test.  All whitespace is removed and the line is
+     * lower-cased before it is compared with its reverse, so an empty or
+     * all-whitespace line counts as a palindrome.
+     * @param line - line to test
+     * @return true if it is a palindrome, false otherwise.
+     */
+    public static boolean isPalindrome(String line) {
+        //Locale.ROOT keeps case folding independent of the JVM default locale
+        //(under a Turkish locale "I" would otherwise lower-case to a dotless i).
+        line = line.replaceAll("\\s","").toLowerCase(java.util.Locale.ROOT);
+        return line.equals(new StringBuilder(line).reverse().toString());
+    }
+    /**
+     * Get a PrintStream for printing to the given file.
+     * @param file - file to open as PrintStream
+     * @return stream for file to print to.
+     * @throws FileNotFoundException if the file cannot be created or opened for writing
+     */
+    public static PrintStream getPrintStream(String file) throws FileNotFoundException {
+        return new PrintStream(new File(file));
+    }
+    /**
+     * Functor class for spark that keeps only palindromic lines.  Really should do this in Scala....
+     * Note: the serial-version warning is suppressed because this class has no instance
+     * variables.
+     * @author starchmd
+     */
+    @SuppressWarnings("serial")
+    static class FilterPalindrome implements Function<String, Boolean> {
+        public Boolean call(String s) { return isPalindrome(s); }
+    }
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/SparkFilePalindromeExample.java
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.examples;
+
+//JDK imports
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+//OODT imports
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+import org.apache.oodt.cas.resource.structs.SparkInstance;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ *
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A job that searches the supplied file for palindromes.  Outputs timing information
+ * to another file for benchmarking purposes. References standard palindrome calculation.
+ *
+ * Uses spark processing for the computations.
+ * </p>
+ */
+public class SparkFilePalindromeExample implements SparkInstance {
+
+    JavaSparkContext sc;
+    /**
+     * Count the palindromic lines of the "file" input with Spark and write the
+     * count and the elapsed seconds to the "output" file.
+     *
+     * @param in job input; must be a {@link NameValueJobInput} carrying
+     *           "file" and "output" values
+     * @return true on success, false if the output file could not be opened
+     * @see org.apache.oodt.cas.resource.structs.JobInstance#execute(org.apache.oodt.cas.resource.structs.JobInput)
+     */
+    public boolean execute(JobInput in) throws JobInputException {
+        NameValueJobInput input = (NameValueJobInput) in;
+        PrintStream output = null;
+        try {
+            //Setup output and timing
+            output = PalindromeUtils.getPrintStream(input.getValue("output"));
+            final long start = System.currentTimeMillis();
+            //Read file and process
+            JavaRDD<String> rdd = sc.textFile( input.getValue("file"));
+            JavaRDD<String> filtered = rdd.filter(new PalindromeUtils.FilterPalindrome());
+            long count = filtered.count();
+            //Output timing and results
+            final long end = System.currentTimeMillis();
+            double timing = ((double)(end - start))/1000.0;
+            output.println("Found "+ count+" palindromes in "+timing+" seconds.");
+        } catch (FileNotFoundException e) {
+            return false;
+        } finally {
+            //output is still null when the output file could not be opened
+            if (output != null) {
+                output.close();
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Wrap the supplied SparkContext for use through the Java API.
+     * @param context the SparkContext to run against
+     */
+    @Override
+    public void setSparkContext(SparkContext context) {
+        this.sc = new JavaSparkContext(context);
+    }
+
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.java
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.examples;
+
+//JDK imports
+
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+//OODT imports
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+import org.apache.oodt.cas.resource.structs.StreamingInstance;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ *
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A job that counts palindromic lines arriving on a socket text stream (the
+ * "host" and "port" inputs) and writes the count for each streaming batch to
+ * the "output" file.  Streaming is stopped after "time" milliseconds.
+ * References standard palindrome calculation.
+ *
+ * Uses spark streaming for the computations.
+ * </p>
+ */
+public class StreamingPalindromeExample implements StreamingInstance {
+
+    JavaSparkContext sc;
+    JavaStreamingContext ssc;
+    /**
+     * Run the streaming palindrome count until the "time" watchdog fires.
+     *
+     * @param in job input; must be a {@link NameValueJobInput} carrying
+     *           "time", "output", "host" and "port" values
+     * @return true once streaming has terminated, false if the output file
+     *         could not be opened
+     * @see org.apache.oodt.cas.resource.structs.JobInstance#execute(org.apache.oodt.cas.resource.structs.JobInput)
+     */
+    //Will not serialize a class that has no members
+    @SuppressWarnings("serial")
+    public boolean execute(JobInput in) throws JobInputException {
+        NameValueJobInput input = (NameValueJobInput) in;
+        //Get time for watchdog (passed to Thread.sleep, so milliseconds)
+        final int time = Integer.parseInt(input.getValue("time"));
+        //Output to put data into
+        final PrintStream output;
+        try {
+            output = PalindromeUtils.getPrintStream(input.getValue("output"));
+        } catch (FileNotFoundException e) {
+            return false;
+        }
+        try {
+            //Filter stream and count
+            JavaReceiverInputDStream<String> stream = ssc.socketTextStream(input.getValue("host"),Integer.parseInt(input.getValue("port")));
+            JavaDStream<String> filtered = stream.filter(new PalindromeUtils.FilterPalindrome());
+            final JavaDStream<Long> count = filtered.count();
+            //For each packet-ized count: output
+            count.foreachRDD(new Function<JavaRDD<Long>,Void>(){
+                @Override
+                public Void call(JavaRDD<Long> jrdd) throws Exception {
+                    synchronized(output)
+                    {
+                        //Use the typed List from JavaRDD.collect(): casting the erased
+                        //array returned by rdd().collect() to Long[] fails at runtime.
+                        for (Long item : jrdd.collect())
+                            output.println("Found "+item.longValue()+ " palindromes.");
+                    }
+                    return null;
+                }});
+            ssc.start();
+            //Stop in <time> milliseconds
+            Thread watchdog = new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        Thread.sleep(time);
+                    } catch (InterruptedException e) {
+                        //Don't cast this exception into the void
+                        Thread.currentThread().interrupt();
+                    } finally {
+                        //Stop streaming only: the SparkContext is shared with other jobs
+                        ssc.stop(false);
+                        synchronized(output) {
+                            output.println("Stopping after "+time/1000+" seconds.");
+                        }
+                    }
+                }
+            });
+            watchdog.start();
+            //Wait for streaming to terminate
+            ssc.awaitTermination();
+            //Let the watchdog write its final message before the output is closed
+            try {
+                watchdog.join();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        } finally {
+            output.close();
+        }
+        return true;
+    }
+
+    /**
+     * Wrap the supplied StreamingContext for use through the Java API.
+     * @param context the StreamingContext to run against
+     */
+    @Override
+    public void setStreamingContext(StreamingContext context) {
+        this.ssc = new JavaStreamingContext(context);
+    }
+
+    /**
+     * Wrap the supplied SparkContext for use through the Java API.
+     * @param context the SparkContext to run against
+     */
+    @Override
+    public void setSparkContext(SparkContext context) {
+        this.sc = new JavaSparkContext(context);
+    }
+
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkScheduler.java
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,150 @@
+package org.apache.oodt.cas.resource.scheduler;
+
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.SparkInstance;
+import org.apache.oodt.cas.resource.structs.StreamingInstance;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.StreamingContext;
+
+/**
+ * A scheduler that runs spark jobs on a spark cluster.
+ *
+ * <p>
+ * One SparkContext, and a StreamingContext with a 10 second batch duration
+ * built on it, are created when the scheduler is constructed. Both are handed
+ * to every job the scheduler runs. Jobs whose instance class does not
+ * implement {@link SparkInstance} are taken off the queue and ignored.
+ * </p>
+ *
+ * @author starchmd
+ *
+ */
+public class SparkScheduler implements Scheduler {
+
+    /** Milliseconds to wait before polling an empty job queue again. */
+    private static final long EMPTY_QUEUE_WAIT_MILLIS = 1000L;
+
+    SparkContext sc;
+    StreamingContext ssc;
+    JobQueue queue;
+
+    private static final Logger LOG = Logger.getLogger(SparkScheduler.class.getName());
+
+    /**
+     * Create the scheduler and its Spark contexts.
+     *
+     * The master is read from the "resource.runner.spark.host" system property
+     * (default "local"). The jar(s) shipped to the cluster are read from the
+     * comma-separated "resource.runner.spark.jars" system property. If it is
+     * unset, the previously hard-coded "../lib/cas-resource-0.8-SNAPSHOT.jar"
+     * is used.
+     *
+     * @param queue job queue to pull jobs from
+     */
+    public SparkScheduler(JobQueue queue) {
+        SparkConf conf = new SparkConf();
+        conf.setMaster(System.getProperty("resource.runner.spark.host","local"));
+        conf.setAppName("OODT Spark Job");
+        //The default jar name is tied to a single release, so allow overriding it
+        String jars = System.getProperty("resource.runner.spark.jars",
+                "../lib/cas-resource-0.8-SNAPSHOT.jar");
+        conf.setJars(jars.split(","));
+        sc = new SparkContext(conf);
+        ssc = new StreamingContext(sc,new Duration(10000));
+        this.queue = queue;
+    }
+
+    /**
+     * Poll the job queue and run each Spark job, one at a time, on this thread.
+     * Sleeps while the queue is empty and returns once the thread is interrupted.
+     *
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                if (queue.isEmpty()) {
+                    //Back off rather than spinning a core on an empty queue
+                    Thread.sleep(EMPTY_QUEUE_WAIT_MILLIS);
+                    continue;
+                }
+                JobSpec spec = queue.getNextJob();
+
+                //Check the type without constructing a throw-away job instance
+                Class<?> clazz = Class.forName(spec.getJob().getJobInstanceClassName());
+                if (!SparkInstance.class.isAssignableFrom(clazz)) {
+                    LOG.log(Level.WARNING,"Non-Spark job found ("+spec.getJob().getId()+") ignoring.");
+                    continue;
+                }
+                this.schedule(spec);
+            } catch (InterruptedException e) {
+                //Restore the flag so the loop condition ends the scheduler
+                Thread.currentThread().interrupt();
+            } catch(SchedulerException e) {
+                LOG.log(Level.WARNING,"Scheduler exception detected: "+e.getMessage());
+            } catch (JobQueueException e) {
+                LOG.log(Level.WARNING,"Could not get next job from job-queue.");
+            } catch (ClassNotFoundException e) {
+                LOG.log(Level.WARNING,"Class not found: "+e.getMessage());
+            }
+
+        }
+    }
+
+    /**
+     * Run the job described by spec on the calling thread. The job is first
+     * given the shared SparkContext, and also the StreamingContext if it is a
+     * {@link StreamingInstance}.
+     *
+     * @param spec job to run
+     * @return the value returned by the job's execute(), or false if no
+     *         {@link SparkInstance} could be created for the job
+     * @throws SchedulerException wrapping a JobInputException raised by the job
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#schedule(org.apache.oodt.cas.resource.structs.JobSpec)
+     */
+    @Override
+    public boolean schedule(JobSpec spec) throws SchedulerException {
+        try {
+            String className = spec.getJob().getJobInstanceClassName();
+            JobInstance instance = GenericResourceManagerObjectFactory.getJobInstanceFromClassName(className);
+            //instanceof is also false for null, i.e. when instantiation failed
+            if (!(instance instanceof SparkInstance)) {
+                LOG.log(Level.WARNING,"Could not create Spark job instance: "+className);
+                return false;
+            }
+            SparkInstance sparkInstance = (SparkInstance) instance;
+            LOG.log(Level.INFO,"Setting SparkContext");
+            sparkInstance.setSparkContext(this.sc);
+            //Handle spark streaming
+            if (sparkInstance instanceof StreamingInstance) {
+                LOG.log(Level.INFO,"Found streaming instance, setting StreamingContext");
+                ((StreamingInstance)sparkInstance).setStreamingContext(this.ssc);
+            }
+            return sparkInstance.execute(spec.getIn());
+        } catch (JobInputException e) {
+            LOG.log(Level.WARNING,"Job input exception detected.");
+            throw new SchedulerException(e);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#nodeAvailable(org.apache.oodt.cas.resource.structs.JobSpec)
+     */
+    @Override
+    public ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getMonitor()
+     */
+    @Override
+    public Monitor getMonitor() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getBatchmgr()
+     */
+    @Override
+    public Batchmgr getBatchmgr() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getJobQueue()
+     */
+    @Override
+    public JobQueue getJobQueue() {
+        return queue;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getQueueManager()
+     */
+    @Override
+    public QueueManager getQueueManager() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/SparkSchedulerFactory.java
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.resource.scheduler;
+
+//JAVA imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+//OODT imports
+
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A factory for the SparkScheduler. The job queue is built once, when the
+ * factory is constructed. It comes from the job-queue factory class named by
+ * the "resource.jobqueue.factory" system property (JobStackJobQueueFactory if
+ * unset), and every scheduler this factory creates shares it.
+ * </p>
+ *
+ */
+public class SparkSchedulerFactory implements SchedulerFactory {
+
+    private static final Logger LOG = Logger.getLogger(SparkSchedulerFactory.class.getName());
+
+    private JobQueue queue = null;
+
+    /**
+     * Setup factory: resolve and build the configured job queue.
+     */
+    public SparkSchedulerFactory() {
+        final String fallbackFactory = JobStackJobQueueFactory.class.getName();
+        final String jobQueueClassStr = System.getProperty("resource.jobqueue.factory", fallbackFactory);
+        final String schedulerName = SparkScheduler.class.getName();
+        LOG.log(Level.INFO, "Using job-queue: " + jobQueueClassStr + " with: " + schedulerName);
+        this.queue = GenericResourceManagerObjectFactory.getJobQueueServiceFromFactory(jobQueueClassStr);
+    }
+
+    /**
+     * Returns a new SparkScheduler backed by this factory's job queue.
+     */
+    public Scheduler createScheduler() {
+        return new SparkScheduler(this.queue);
+    }
+}
\ No newline at end of file

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/SparkInstance.java
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,13 @@
+package org.apache.oodt.cas.resource.structs;
+
+import org.apache.spark.SparkContext;
+
+/**
+ * A {@link JobInstance} that runs on Spark. The SparkScheduler hands it a
+ * {@link SparkContext} through setSparkContext before calling execute.
+ */
+public interface SparkInstance extends JobInstance {
+
+    /**
+     * Supply the Spark context this job should run against.
+     * @param context the SparkContext to use
+     */
+    void setSparkContext(SparkContext context);
+
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/StreamingInstance.java
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,13 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.structs;
+
+import org.apache.spark.streaming.StreamingContext;
+
+/**
+ * A {@link SparkInstance} that additionally runs against a
+ * {@link StreamingContext} supplied through
+ * {@link #setStreamingContext(StreamingContext)}.
+ */
+public interface StreamingInstance extends SparkInstance {
+
+    /**
+     * Sets the streaming context this job runs against.
+     *
+     * @param context the streaming context for the job to use
+     */
+    void setStreamingContext(StreamingContext context);
+
+}

Added: oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml?rev=1649869&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml 
(added)
+++ oodt/trunk/resource/src/main/resources/examples/jobs/exPalindrome.xml Tue 
Jan  6 16:38:01 2015
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Replace the <input-file> and <output-file> placeholders (XML-escaped
+     below so this file stays well-formed) before submitting this job. -->
+<cas:job xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas" id="no-spark-1"
+       name="No Spark Job">
+       <instanceClass
+               name="org.apache.oodt.cas.resource.examples.NoSparkFilePalindromeExample" />
+       <inputClass
+               name="org.apache.oodt.cas.resource.structs.NameValueJobInput">
+               <properties>
+                       <property name="file" value="&lt;input-file&gt;" />
+                       <property name="output" value="&lt;output-file&gt;" />
+               </properties>
+       </inputClass>
+       <queue>quick</queue>
+       <load>1</load>
+</cas:job>

Added: oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml?rev=1649869&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml (added)
+++ oodt/trunk/resource/src/main/resources/examples/jobs/exSparkJob.xml Tue Jan 
 6 16:38:01 2015
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Replace the <input-file> placeholder (XML-escaped below so this file
+     stays well-formed) with the path of a text file before submitting. -->
+<cas:job xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas" id="scala-12345"
+       name="TestSparkJob">
+       <instanceClass
+               name="org.apache.oodt.cas.resource.examples.ScalaHelloWorld" />
+       <inputClass
+               name="org.apache.oodt.cas.resource.structs.NameValueJobInput">
+               <properties>
+                       <property name="file" value="&lt;input-file&gt;" />
+               </properties>
+       </inputClass>
+       <queue>quick</queue>
+       <load>1</load>
+</cas:job>

Added: 
oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml?rev=1649869&view=auto
==============================================================================
--- oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml 
(added)
+++ oodt/trunk/resource/src/main/resources/examples/jobs/exSparkPalindrome.xml 
Tue Jan  6 16:38:01 2015
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Replace the <host>, <port>, <input-file> and <output-file> placeholders
+     (XML-escaped below so this file stays well-formed) before submitting. -->
+<cas:job xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas" id="spark-1"
+       name="Spark Job">
+       <instanceClass
+               name="org.apache.oodt.cas.resource.examples.SparkFilePalindromeExample" />
+       <inputClass
+               name="org.apache.oodt.cas.resource.structs.NameValueJobInput">
+               <properties>
+                       <property name="file"
+                               value="hdfs://&lt;host&gt;:&lt;port&gt;/&lt;input-file&gt;" />
+                       <property name="output" value="&lt;output-file&gt;" />
+               </properties>
+       </inputClass>
+       <queue>quick</queue>
+       <load>1</load>
+</cas:job>

Added: 
oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml 
(added)
+++ 
oodt/trunk/resource/src/main/resources/examples/jobs/exStreamingPalindrome.xml 
Tue Jan  6 16:38:01 2015
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Replace the <host>, <port> and <output-file> placeholders (XML-escaped
+     below so this file stays well-formed) before submitting this job. -->
+<cas:job xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas" id="streaming-1"
+       name="Streaming Job">
+       <instanceClass
+               name="org.apache.oodt.cas.resource.examples.StreamingPalindromeExample" />
+       <inputClass
+               name="org.apache.oodt.cas.resource.structs.NameValueJobInput">
+               <properties>
+                       <property name="host" value="&lt;host&gt;" />
+                       <property name="port" value="&lt;port&gt;" />
+                       <property name="output" value="&lt;output-file&gt;" />
+               </properties>
+       </inputClass>
+       <queue>quick</queue>
+       <load>1</load>
+</cas:job>

Modified: oodt/trunk/resource/src/main/resources/resource.properties
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/resource.properties?rev=1649869&r1=1649868&r2=1649869&view=diff
==============================================================================
--- oodt/trunk/resource/src/main/resources/resource.properties (original)
+++ oodt/trunk/resource/src/main/resources/resource.properties Tue Jan  6 
16:38:01 2015
@@ -16,6 +16,9 @@
 #
 # Properties required to configure the Resource Manager
 
+# resource spark master URL (e.g. a Mesos master; replace <ip> before use)
+resource.runner.spark.host = mesos://<ip>:5050
+
 # resource batchmgr factory
 resource.batchmgr.factory = 
org.apache.oodt.cas.resource.batchmgr.XmlRpcBatchMgrFactory
 

Added: 
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala?rev=1649869&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala
 (added)
+++ 
oodt/trunk/resource/src/main/scala/org/apache/oodt/cas/resource/examples/ScalaHelloWorld.scala
 Tue Jan  6 16:38:01 2015
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.examples
+
+import org.apache.oodt.cas.resource.structs.JobInput
+import org.apache.oodt.cas.resource.structs.NameValueJobInput
+import org.apache.oodt.cas.resource.structs.SparkInstance
+import org.apache.spark.SparkContext
+
+/**
+ * An example job for use with the Spark backend to the resource manager.
+ *
+ * Reads the text file named by the job input's `file` property, prints how
+ * many of its lines contain the word "fantastic", then prints every line.
+ *
+ * @author starchmd
+ */
+class ScalaHelloWorld extends SparkInstance {
+
+    /** Spark context supplied through [[setSparkContext]]; null until set. */
+    var sc: SparkContext = null
+
+    /**
+     * Executes this job.
+     *
+     * @param input a [[NameValueJobInput]] whose `file` property names the
+     *              text file to read
+     * @return true once the file has been processed
+     * @throws IllegalStateException if no Spark context has been set
+     * @throws IllegalArgumentException if `input` is not a
+     *         [[NameValueJobInput]] or lacks a `file` property
+     */
+    def execute(input: JobInput): Boolean = {
+        // Fail with a descriptive message rather than an NPE / ClassCastException.
+        val context = Option(sc).getOrElse(
+            throw new IllegalStateException("SparkContext has not been set"))
+        val name = input match {
+            case nameValues: NameValueJobInput =>
+                Option(nameValues.getValue("file")).getOrElse(
+                    throw new IllegalArgumentException("Job input has no 'file' property"))
+            case other =>
+                throw new IllegalArgumentException(s"Expected a NameValueJobInput, got: $other")
+        }
+        val textFile = context.textFile(name)
+        val matchingLines = textFile.filter(line => line.contains("fantastic"))
+        println(s"Line count: ${matchingLines.count()}")
+        // NOTE(review): RDD.foreach runs on the executors, so outside local mode
+        // these lines go to executor stdout rather than this JVM's console.
+        textFile.foreach(line => println(line))
+        true
+    }
+
+    /**
+     * Sets the Spark context this job runs against.
+     *
+     * @param context the context used by [[execute]]
+     */
+    def setSparkContext(context: SparkContext): Unit = {
+        this.sc = context
+    }
+}


Reply via email to