Author: starchmd
Date: Tue Jan  6 17:39:22 2015
New Revision: 1649884

URL: http://svn.apache.org/r1649884
Log:
OODT-699: Mesos cluster manager backend to resource manager

Added:
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java
    
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java
    
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java
Modified:
    oodt/trunk/CHANGES.txt
    oodt/trunk/resource/pom.xml

Modified: oodt/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1649884&r1=1649883&r2=1649884&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Tue Jan  6 17:39:22 2015
@@ -3,6 +3,8 @@ Apache OODT Change Log
 Release 0.9 - Current Development
 -------------------------------------------
 
+* OODT-699 Mesos cluster manager backend to resource manager
+
 * OODT-780 Spark backend to resource manager
 
 * OODT-802 Create Dockerfile for OODT Radix.

Modified: oodt/trunk/resource/pom.xml
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/pom.xml?rev=1649884&r1=1649883&r2=1649884&view=diff
==============================================================================
--- oodt/trunk/resource/pom.xml (original)
+++ oodt/trunk/resource/pom.xml Tue Jan  6 17:39:22 2015
@@ -137,7 +137,7 @@ the License.
     <dependency>
       <groupId>commons-collections</groupId>
       <artifactId>commons-collections</artifactId>
-      <version>2.1</version>
+      <version>3.2.1</version>
     </dependency>
     <dependency>
       <groupId>commons-pool</groupId>
@@ -145,6 +145,11 @@ the License.
       <version>1.2</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.2.1</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.lucene</groupId>
       <artifactId>lucene-core</artifactId>
       <version>2.0.0</version>
@@ -178,5 +183,11 @@ the License.
       <version>3.8.2</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.mesos</groupId>
+      <artifactId>mesos</artifactId>
+      <version>0.21.0</version>
+      <classifier>shaded-protobuf</classifier>
+   </dependency>
   </dependencies>
 </project>

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.batchmgr;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.MesosFrameworkException;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * A batch-manager used to execute and control jobs in a mesos-cluster.
+ */
+public class MesosBatchManager implements Batchmgr {
+
+    // Resource-manager job id -> Mesos task id recorded by registerExecutedJob().
+    Map<String,TaskID> map = new HashMap<String,TaskID>();
+    // Driver that kill requests are forwarded to; supplied through setDriver().
+    SchedulerDriver driver;
+    // Stored by setJobRepository(); not otherwise used by this class.
+    JobRepository repo;
+    // Stored by setMonitor(); not otherwise used by this class.
+    Monitor mon;
+
+    public MesosBatchManager() {
+    }
+
+    /**
+     * Required to set the driver used to run the job, so "kill"
+     * requests are mapped properly.
+     * @param driver - scheduler driver that kill requests are issued through.
+     */
+    public void setDriver(SchedulerDriver driver)
+    {
+        this.driver = driver;
+    }
+
+    /**
+     * Register a new job with a batch manager.
+     * @param jobId - jobId in "resource" manager.
+     * @param task - mesos task.
+     */
+    public void registerExecutedJob(String jobId,TaskID task) {
+        map.put(jobId, task);
+    }
+
+    /**
+     * Not supported by this batch manager.
+     * @param job - ignored.
+     * @param resNode - ignored.
+     * @return never returns normally.
+     * @throws NotImplementedException always.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#executeRemotely(org.apache.oodt.cas.resource.structs.JobSpec, org.apache.oodt.cas.resource.structs.ResourceNode)
+     */
+    @Override
+    public boolean executeRemotely(JobSpec job, ResourceNode resNode)
+            throws JobExecutionException {
+        throw new NotImplementedException("Execute remotely is not called when using mesos.");
+    }
+
+    /**
+     * Stores the monitor handed to this batch manager.
+     * @param monitor - monitor to remember.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#setMonitor(org.apache.oodt.cas.resource.monitor.Monitor)
+     */
+    @Override
+    public void setMonitor(Monitor monitor) {
+        this.mon = monitor;
+    }
+
+    /**
+     * Stores the job repository handed to this batch manager.
+     * @param repository - repository to remember.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#setJobRepository(org.apache.oodt.cas.resource.jobrepo.JobRepository)
+     */
+    @Override
+    public void setJobRepository(JobRepository repository) {
+        this.repo = repository;
+    }
+
+    /**
+     * Asks the scheduler driver to kill the mesos task registered for a job.
+     * @param jobId - jobId in "resource" manager.
+     * @param node - ignored; the task is looked up by job id only.
+     * @return true if the kill request was issued, false if no task was ever
+     *         registered for the given job id.
+     * @throws MesosFrameworkException if the driver reports any status other
+     *         than DRIVER_RUNNING for the kill request.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#killJob(java.lang.String, org.apache.oodt.cas.resource.structs.ResourceNode)
+     */
+    @Override
+    public boolean killJob(String jobId, ResourceNode node) {
+        TaskID id = map.get(jobId);
+        if (id == null) {
+            // Nothing was registered under this job id, so there is no task to
+            // kill; do not hand a null task id to the driver.
+            return false;
+        }
+        // Issue the kill exactly once and judge the driver by that one result.
+        Status status = driver.killTask(id);
+        if (status != Status.DRIVER_RUNNING) {
+            throw new MesosFrameworkException("Mesos Schedule Driver is dead: "+status.toString());
+        }
+        return true;
+    }
+
+    /**
+     * Returns a fixed placeholder; the actual execution node is not tracked.
+     * @param jobId - ignored.
+     * @return a constant placeholder string.
+     * @see org.apache.oodt.cas.resource.batchmgr.Batchmgr#getExecutionNode(java.lang.String)
+     */
+    @Override
+    public String getExecutionNode(String jobId) {
+        // TODO Make this more meaningful.
+        return "All Your Jobs are belong to Mesos";
+    }
+
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A batchmgr factory.
+ * </p>
+ *
+ */
+
+public class MesosBatchManagerFactory implements BatchmgrFactory {
+
+    /**
+     * Creates the factory; no configuration is read.
+     */
+    public MesosBatchManagerFactory() {
+        super();
+    }
+
+    /**
+     * Builds a fresh Mesos batch manager.
+     *
+     * @return a newly constructed {@link MesosBatchManager}.
+     * @see org.apache.oodt.cas.resource.batchmgr.BatchmgrFactory#createBatchmgr()
+     */
+    public Batchmgr createBatchmgr() {
+        final Batchmgr manager = new MesosBatchManager();
+        return manager;
+    }
+
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.batchmgr;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.SlaveInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.cas.resource.util.MesosUtilities;
+
+/**
+ * @author starchmd
+ *
+ * This "Executor" is run by mesos to actually run the job.
+ */
+public class ResourceExecutor implements Executor {
+
+    // Destination for the executor's own trace lines; never null once the
+    // constructor has finished.
+    PrintStream str = null;
+    // Prefix for every trace line: the wall-clock time this instance was created.
+    String id = new SimpleDateFormat("HH:mm:ss").format(Calendar.getInstance().getTime())+" ";
+
+    /**
+     * Opens the trace file in the working directory. If the file cannot be
+     * opened, tracing falls back to standard error so that later callbacks do
+     * not fail on a missing stream.
+     */
+    public ResourceExecutor() {
+        try {
+            File tmp = new File("./executor-log.notalog");
+            str = new PrintStream(new FileOutputStream(tmp));
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+            str = System.err;
+        }
+        str.println(id+"Starting up new<<<<<");
+    }
+
+    /**
+     * Traces that the executor was disconnected.
+     * @see org.apache.mesos.Executor#disconnected(org.apache.mesos.ExecutorDriver)
+     */
+    @Override
+    public void disconnected(ExecutorDriver driver) {
+        str.println(id+"Disconnected!");
+    }
+
+    /**
+     * Traces an error reported by mesos.
+     * @see org.apache.mesos.Executor#error(org.apache.mesos.ExecutorDriver, java.lang.String)
+     */
+    @Override
+    public void error(ExecutorDriver driver, String error) {
+        str.println(id+"Error: "+error);
+    }
+
+    /**
+     * Traces a framework message, decoded with the platform default charset.
+     * @see org.apache.mesos.Executor#frameworkMessage(org.apache.mesos.ExecutorDriver, byte[])
+     */
+    @Override
+    public void frameworkMessage(ExecutorDriver arg0, byte[] arg1) {
+        str.println(id+"Message: "+new String(arg1));
+    }
+
+    /**
+     * Traces a kill request. The running job is not interrupted here.
+     * @see org.apache.mesos.Executor#killTask(org.apache.mesos.ExecutorDriver, org.apache.mesos.Protos.TaskID)
+     */
+    @Override
+    public void killTask(ExecutorDriver arg0, TaskID arg1) {
+        str.println(id+"Kill");
+    }
+
+    /**
+     * Decodes the job spec carried in the task data, reports TASK_STARTING and
+     * runs the job on a new thread. The thread reports TASK_FINISHED when the
+     * job returns and TASK_FAILED when it throws. If the task data cannot be
+     * turned into a job, TASK_FAILED is reported immediately.
+     * @param driver - driver used to send status updates.
+     * @param info - task to launch; its data holds the serialized job spec.
+     * @see org.apache.mesos.Executor#launchTask(org.apache.mesos.ExecutorDriver, org.apache.mesos.Protos.TaskInfo)
+     */
+    @Override
+    public void launchTask(final ExecutorDriver driver, final TaskInfo info) {
+        str.println(id+"Launch task!");
+        try {
+            JobSpec spec = MesosUtilities.byteStringToJobSpec(info.getData());
+            final JobInstance exec = GenericResourceManagerObjectFactory
+                    .getJobInstanceFromClassName(spec.getJob().getJobInstanceClassName());
+            final JobInput in = spec.getIn();
+            Thread tmp = new Thread(new Runnable(){
+                    public void run() {
+                        // Assume failure until the job returns normally, so a
+                        // terminal status is sent whatever the job does.
+                        TaskState state = TaskState.TASK_FAILED;
+                        try {
+                            exec.execute(in);
+                            state = TaskState.TASK_FINISHED;
+                        } catch (JobInputException e) {
+                            e.printStackTrace();
+                        } catch (RuntimeException e) {
+                            // Without this the thread would die silently and
+                            // the task would never reach a terminal state.
+                            e.printStackTrace();
+                        }
+                        driver.sendStatusUpdate(buildStatus(info, state));
+                    }
+            });
+            driver.sendStatusUpdate(buildStatus(info, TaskState.TASK_STARTING));
+            tmp.start();
+        } catch (ClassNotFoundException e1) {
+            reportBadData(driver, info, e1);
+        } catch (InstantiationException e2) {
+            reportBadData(driver, info, e2);
+        } catch (IllegalAccessException e3) {
+            reportBadData(driver, info, e3);
+        }
+    }
+
+    /** Builds a status message for the given task carrying the given state. */
+    private static TaskStatus buildStatus(TaskInfo info, TaskState state) {
+        return TaskStatus.newBuilder().setTaskId(info.getTaskId())
+                .setState(state).build();
+    }
+
+    /** Prints the cause of an unusable task payload and reports TASK_FAILED. */
+    private static void reportBadData(ExecutorDriver driver, TaskInfo info, Exception cause) {
+        System.out.println("BAD DATA: ");
+        cause.printStackTrace();
+        driver.sendStatusUpdate(buildStatus(info, TaskState.TASK_FAILED));
+    }
+
+    /**
+     * Traces that the executor registered.
+     * @see org.apache.mesos.Executor#registered(org.apache.mesos.ExecutorDriver, org.apache.mesos.Protos.ExecutorInfo, org.apache.mesos.Protos.FrameworkInfo, org.apache.mesos.Protos.SlaveInfo)
+     */
+    @Override
+    public void registered(ExecutorDriver arg0, ExecutorInfo arg1,
+            FrameworkInfo arg2, SlaveInfo arg3) {
+        System.out.println("Do-Wah-Do-Wah");
+        str.println(id+"Registered, Huzzah!");
+
+    }
+
+    /**
+     * Traces that the executor re-registered.
+     * @see org.apache.mesos.Executor#reregistered(org.apache.mesos.ExecutorDriver, org.apache.mesos.Protos.SlaveInfo)
+     */
+    @Override
+    public void reregistered(ExecutorDriver arg0, SlaveInfo arg1) {
+        System.out.println("Do-Wah-Do-Wah GO GO GO!!!!");
+        str.println(id+"Re-regged");
+    }
+
+    /**
+     * Traces the shutdown request.
+     * @see org.apache.mesos.Executor#shutdown(org.apache.mesos.ExecutorDriver)
+     */
+    @Override
+    public void shutdown(ExecutorDriver arg0) {
+        System.out.println("Down down down.");
+        str.println(id+"Shutdown");
+    }
+
+    /**
+     * Runs a mesos executor driver around a new ResourceExecutor and exits
+     * with 0 if the driver ends in DRIVER_STOPPED, 1 otherwise.
+     */
+    public static void main(String[] args) throws Exception {
+        MesosExecutorDriver driver = new MesosExecutorDriver(new ResourceExecutor());
+        System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
+    }
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.monitor;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * A monitor to monitor the mesos-cluster jobs.
+ */
+public class MesosMonitor implements Monitor {
+
+    // Known nodes keyed by node id; static, so shared by every MesosMonitor instance.
+    private static HashMap<String, ResourceNode> nodesMap = new HashMap<String, ResourceNode>();
+
+    /**
+     * Load is not tracked by this monitor.
+     * @param node - ignored.
+     * @return always 0.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#getLoad(org.apache.oodt.cas.resource.structs.ResourceNode)
+     */
+    @Override
+    public int getLoad(ResourceNode node) throws MonitorException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    /**
+     * Lists the nodes currently known to the monitor.
+     * @return a new list holding every registered node.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodes()
+     */
+    @Override
+    public List<ResourceNode> getNodes() throws MonitorException {
+        List<ResourceNode> snapshot = new LinkedList<ResourceNode>();
+        snapshot.addAll(nodesMap.values());
+        return snapshot;
+    }
+
+    /**
+     * Looks a node up by its id.
+     * @param nodeId - id the node was registered under.
+     * @return the node, or null if no node has that id.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodeById(java.lang.String)
+     */
+    @Override
+    public ResourceNode getNodeById(String nodeId) throws MonitorException {
+        ResourceNode found = nodesMap.get(nodeId);
+        return found;
+    }
+
+    /**
+     * Looks a node up by its address.
+     * @param ipAddr - address to compare against each node's address.
+     * @return the first node whose address equals ipAddr, or null if none does.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodeByURL(java.net.URL)
+     */
+    @Override
+    public ResourceNode getNodeByURL(URL ipAddr) throws MonitorException {
+        for (ResourceNode candidate : nodesMap.values()) {
+            if (candidate.getIpAddr().equals(ipAddr)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Load is not tracked by this monitor.
+     * @return always false.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#reduceLoad(org.apache.oodt.cas.resource.structs.ResourceNode, int)
+     */
+    @Override
+    public boolean reduceLoad(ResourceNode node, int loadValue)
+            throws MonitorException {
+        return false;
+    }
+
+    /**
+     * Load is not tracked by this monitor.
+     * @return always false.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#assignLoad(org.apache.oodt.cas.resource.structs.ResourceNode, int)
+     */
+    @Override
+    public boolean assignLoad(ResourceNode node, int loadValue)
+            throws MonitorException {
+        return false;
+    }
+
+    /**
+     * Registers a node under its own id, replacing any node already stored
+     * under that id.
+     * @param node - node to register.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#addNode(org.apache.oodt.cas.resource.structs.ResourceNode)
+     */
+    @Override
+    public void addNode(ResourceNode node) throws MonitorException {
+        String key = node.getNodeId();
+        nodesMap.put(key, node);
+    }
+
+    /**
+     * Forgets the node stored under the given id, if any.
+     * @param nodeId - id of the node to remove.
+     * @see org.apache.oodt.cas.resource.monitor.Monitor#removeNodeById(java.lang.String)
+     */
+    @Override
+    public void removeNodeById(String nodeId) throws MonitorException {
+        nodesMap.remove(nodeId);
+    }
+
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.monitor;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * Creates implementations of {@link MesosMonitor}s.
+ * </p>
+ *
+ */
+public class MesosMonitorFactory implements MonitorFactory {
+
+    private static final Logger LOG = Logger.getLogger(MesosMonitorFactory.class.getName());
+
+    /**
+     * Builds a new mesos monitor.
+     *
+     * @return a new {@link MesosMonitor}, or null if construction threw; the
+     *         failure is logged at SEVERE.
+     * @see org.apache.oodt.cas.resource.monitor.MonitorFactory#createMonitor()
+     */
+    public MesosMonitor createMonitor() {
+        MesosMonitor monitor = null;
+        try {
+            monitor = new MesosMonitor();
+        } catch (Exception e) {
+            LOG.log(Level.SEVERE, "Failed to create Mesos Monitor : " + e.getMessage(), e);
+        }
+        return monitor;
+    }
+
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.scheduler;
+
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.batchmgr.MesosBatchManager;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.MesosFrameworkException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
+import org.apache.oodt.cas.resource.util.MesosUtilities;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * A scheduler for part of the mesos framework.
+ */
+public class ResourceMesosScheduler implements Scheduler, 
org.apache.oodt.cas.resource.scheduler.Scheduler {
+    // Scheduler driver handle. NOTE(review): nothing in the visible part of
+    // this class assigns it - confirm where it is set.
+    SchedulerDriver driver;
+    // Batch manager that launched tasks are registered with.
+    MesosBatchManager batch;
+    // Mesos executor description supplied at construction.
+    ExecutorInfo executor;
+    // Job queue supplied at construction.
+    JobQueue queue;
+    // Monitor that offered nodes are added to.
+    Monitor mon;
+
+    //Logger
+    private static final Logger LOG = 
Logger.getLogger(ResourceMesosScheduler.class.getName());
+    /**
+     * Creates the scheduler and remembers its collaborators.
+     * @param batch - batch manager (must be MesosBatchManager)
+     * @param executor - Mesos ExecutorInfo
+     * @param queue - job queue used
+     * @param mon - monitor used
+     */
+    public ResourceMesosScheduler(MesosBatchManager batch,ExecutorInfo executor, JobQueue queue, Monitor mon) {
+        this.mon = mon;
+        this.queue = queue;
+        this.executor = executor;
+        this.batch = batch;
+        LOG.info("Creating the resource-mesos scheduler.");
+    }
+
+
+    /**
+     * Callback for losing the connection to the master. Currently does nothing.
+     * @param schedDriver - driver the callback was issued for; unused.
+     * @see org.apache.mesos.Scheduler#disconnected(org.apache.mesos.SchedulerDriver)
+     */
+    @Override
+    public void disconnected(SchedulerDriver schedDriver) {
+        //TODO: Pause scheduler until master comes back online.
+
+    }
+
+    /**
+     * Logs an error reported by mesos at SEVERE. No recovery is attempted.
+     * @param schedDriver - driver the callback was issued for; unused.
+     * @param error - error text supplied by mesos.
+     * @see org.apache.mesos.Scheduler#error(org.apache.mesos.SchedulerDriver, java.lang.String)
+     */
+    @Override
+    public void error(SchedulerDriver schedDriver, String error) {
+        //TODO: kill something here.
+        final String message = "Mesos issued an error: "+error;
+        LOG.severe(message);
+    }
+
+    /**
+     * Logs the loss of an executor at SEVERE. No other action is taken.
+     * @param schedDriver - driver the callback was issued for; unused.
+     * @param executor - executor that was lost.
+     * @param slave - slave the executor ran on.
+     * @param status - exit status reported for the executor.
+     * @see org.apache.mesos.Scheduler#executorLost(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.ExecutorID, org.apache.mesos.Protos.SlaveID, int)
+     */
+    @Override
+    public void executorLost(SchedulerDriver schedDriver, ExecutorID executor,SlaveID slave, int status) {
+        //Tasks will have a "task lost" message automatically q.e.d no action necessary.
+        //TODO: do we need to restart?
+        StringBuilder message = new StringBuilder("Mesos executor ");
+        message.append(executor).append(" on slave ").append(slave);
+        message.append(" died with status ").append(status);
+        LOG.severe(message.toString());
+    }
+
+    /**
+     * Logs a message sent by an executor, decoding the payload as ascii.
+     * If that encoding is unavailable a warning is logged instead.
+     * @param schedDriver - driver the callback was issued for; unused.
+     * @param executor - executor that sent the message.
+     * @param slave - slave the executor runs on.
+     * @param bytes - raw message payload.
+     * @see org.apache.mesos.Scheduler#frameworkMessage(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.ExecutorID, org.apache.mesos.Protos.SlaveID, byte[])
+     */
+    @Override
+    public void frameworkMessage(SchedulerDriver schedDriver, ExecutorID executor,
+            SlaveID slave, byte[] bytes) {
+        final String text;
+        try {
+            text = new String(bytes,"ascii");
+        } catch (UnsupportedEncodingException e) {
+            LOG.warning("Mesos framework message missed due to bad encoding: ascii. "+e.getMessage());
+            return;
+        }
+        LOG.info("Mesos framework executor"+executor+" on slave "+slave+" issued message: "+text);
+    }
+
+    /**
+     * Callback for an offer being withdrawn. Currently does nothing.
+     * @param schedDriver - driver the callback was issued for; unused.
+     * @param offer - id of the rescinded offer; unused.
+     * @see org.apache.mesos.Scheduler#offerRescinded(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.OfferID)
+     */
+    @Override
+    public void offerRescinded(SchedulerDriver schedDriver, OfferID offer) {
+        //TODO: take away resources from batch manager...or stand in.
+        //Unneeded?
+    }
+
+    /**
+     * Logs the framework id and master id once registration succeeds.
+     * @param schedDriver - driver the callback was issued for; unused.
+     * @param framework - id assigned to this framework.
+     * @param masterInfo - master the framework registered with.
+     * @see org.apache.mesos.Scheduler#registered(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.FrameworkID, org.apache.mesos.Protos.MasterInfo)
+     */
+    @Override
+    public void registered(SchedulerDriver schedDriver, FrameworkID framework,
+            MasterInfo masterInfo) {
+        final String frameworkId = framework.getValue();
+        final String masterId = masterInfo.getId();
+        LOG.info("Mesos framework registered: "+frameworkId+" with master: "+masterId);
+    }
+
+    /**
+     * Logs the master id after a re-registration.
+     * @param schedDriver - driver the callback was issued for; unused.
+     * @param masterInfo - master the framework re-registered with.
+     * @see org.apache.mesos.Scheduler#reregistered(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.MasterInfo)
+     */
+    @Override
+    public void reregistered(SchedulerDriver schedDriver, MasterInfo masterInfo) {
+        final String masterId = masterInfo.getId();
+        LOG.info("Mesos framework re-registered with: "+masterId);
+        //TODO: call start, we are registered.
+
+    }
+
+    /**
+     * Handles a batch of resource offers: records each offering slave with the
+     * monitor, launches one task per job assignment on its offer, and declines
+     * every offer that was not used.
+     * @param driver - driver used to launch tasks and decline offers.
+     * @param offers - offers made by mesos.
+     * @throws MesosFrameworkException if a launch does not leave the driver in
+     *         the DRIVER_RUNNING state.
+     * @see org.apache.mesos.Scheduler#resourceOffers(org.apache.mesos.SchedulerDriver, java.util.List)
+     */
+    @Override
+    public void resourceOffers(SchedulerDriver driver, List<Offer> offers) {
+        LOG.log(Level.INFO,"Offered mesos resources: "+offers.size()+" offers.");
+        for (Offer offer : offers) {
+            //Record the node whatever the log level is; only the detailed offer
+            //message below depends on FINER being enabled.
+            try {
+                this.mon.addNode(new ResourceNode(offer.getSlaveId().getValue(),new URL("http://"+offer.getHostname()),-1));
+            } catch (MalformedURLException e) {
+                LOG.log(Level.WARNING,"Cannot add node to monitor (bad url).  Giving up: "+e.getMessage());
+            } catch (MonitorException e) {
+                LOG.log(Level.WARNING,"Cannot add node to monitor (unkn).  Giving up: "+e.getMessage());
+            }
+            //Log, if possible, the offers
+            if (LOG.isLoggable(Level.FINER)) {
+                LOG.log(Level.FINER,"Offer ("+offer.getId().getValue()+"): "+offer.getHostname()+ "(Slave: "+
+                        offer.getSlaveId().getValue()+") "+MesosUtilities.getResourceMessage(offer.getResourcesList()));
+            }
+        }
+        List<JobSet> assignments = this.getJobAssignmentsJobs(offers);
+        List<OfferID> used = new LinkedList<OfferID>();
+        for (JobSet assignment : assignments) {
+            //Launch tasks requires lists
+            List<OfferID> ids = new LinkedList<OfferID>();
+            List<TaskInfo> tasks = new LinkedList<TaskInfo>();
+            tasks.add(assignment.task);
+            used.add(assignment.offer.getId());
+            ids.add(assignment.offer.getId());
+            //Register locally and launch on mesos
+            batch.registerExecutedJob(assignment.job.getJob().getId(), assignment.task.getTaskId());
+            Status status = driver.launchTasks(ids,tasks); //Assumed one to one mapping
+            if (status != Status.DRIVER_RUNNING) {
+                throw new MesosFrameworkException("Driver stopped: "+status.toString());
+            }
+        }
+        //Hand back every offer that no assignment consumed.
+        for (Offer offer : offers) {
+            if (!used.contains(offer.getId())) {
+                LOG.log(Level.INFO,"Rejecting Offer: "+offer.getId().getValue());
+                driver.declineOffer(offer.getId());
+            }
+        }
+    }
+    /**
+     * Creates the Mesos task description for a job that is to run on the
+     * slave behind the given offer.
+     * @param job - job spec supplying the task id, the load value and the serialized payload
+     * @param offer - offer whose slave id the task is bound to
+     * @return the built TaskInfo
+     */
+    private TaskInfo getTaskInfo(JobSpec job,Offer offer) {
+        TaskID id = TaskID.newBuilder().setValue(job.getJob().getId()).build();
+        //The "cpus" request is the load value itself
+        Resource.Builder cpuRequest = Resource.newBuilder();
+        cpuRequest.setName("cpus");
+        cpuRequest.setType(Value.Type.SCALAR);
+        cpuRequest.setScalar(Value.Scalar.newBuilder().setValue(job.getJob().getLoadValue()*1.0));
+        //The "mem" request is the load value times 1024
+        Resource.Builder memRequest = Resource.newBuilder();
+        memRequest.setName("mem");
+        memRequest.setType(Value.Type.SCALAR);
+        memRequest.setScalar(Value.Scalar.newBuilder().setValue(job.getJob().getLoadValue()*1024.0));
+        //Assemble the task: identity, placement, resources, executor and payload
+        TaskInfo.Builder task = TaskInfo.newBuilder();
+        task.setName("task " + id.getValue());
+        task.setTaskId(id);
+        task.setSlaveId(offer.getSlaveId());
+        task.addResources(cpuRequest);
+        task.addResources(memRequest);
+        task.setExecutor(ExecutorInfo.newBuilder(executor));
+        task.setData(MesosUtilities.jobSpecToByteString(job));
+        return task.build();
+    }
+    /**
+     * Checks all offers against jobs in order, assigning jobs to offers until each offer is full,
+     * or all jobs are gone.
+     * <p>
+     * For every offer, each job queued at that moment is examined at most once. The queue size is
+     * captured before the scan: the queue shrinks whenever a job is assigned, so re-reading the
+     * size on every pass would end the scan before all queued jobs had been looked at.
+     * Jobs that do not fit the remaining resources are handed back through requeueJob.
+     * @param offers - offers to assign jobs to.
+     * @return List of (JobSpec, TaskInfo, Offer) tuples (assigned to each other).
+     * @throws RuntimeException wrapping any JobQueueException raised by the queue
+     */
+    private List<JobSet> getJobAssignmentsJobs(List<Offer> offers) {
+        List<JobSet> list = new LinkedList<JobSet>();
+        for (Offer offer : offers)
+        {
+            double cpus = 0.0, mem = 0.0;
+            //Get the resources offered from this offer
+            for (Resource resc : offer.getResourcesList()) {
+                if ("cpus".equals(resc.getName()))
+                    cpus += resc.getScalar().getValue();
+                else if ("mem".equals(resc.getName()))
+                    mem += resc.getScalar().getValue();
+            }
+            //Search for enough jobs to fill the offer; bound fixed up front (see above)
+            int pending = queue.getSize();
+            for (int i = 0;i < pending;i++)
+            {
+                try {
+                    JobSpec job = queue.getNextJob();
+                    double load = job.getJob().getLoadValue();
+                    //Check if enough resources
+                    if (cpus < load || mem < load*1024)
+                    {
+                        queue.requeueJob(job);
+                        continue;
+                    }
+                    cpus -= load;
+                    mem -= 1024*load;
+                    list.add(new JobSet(job,getTaskInfo(job,offer),offer));
+                    //Not enough left, optimise and stop looking for jobs
+                    if (cpus < 0.5 || mem <= 512.0)
+                        break;
+                } catch (JobQueueException e) {throw new RuntimeException(e);}
+            }
+            //Optimization: break when no jobs
+            if (queue.getSize() == 0)
+                break;
+        }
+        return list;
+    }
+
+    /* (non-Javadoc)
+     * Lost-slave callback. Only logs a WARNING naming the slave.
+     * NOTE: despite the wording of the log message, no job is reissued by
+     * this method yet (see the TODO in the body).
+     * @see 
org.apache.mesos.Scheduler#slaveLost(org.apache.mesos.SchedulerDriver, 
org.apache.mesos.Protos.SlaveID)
+     */
+    @Override
+    public void slaveLost(SchedulerDriver schedDriver, SlaveID slave) {
+        LOG.log(Level.WARNING,"Mesos slave "+slave+" lost, reissuing jobs.");
+        //TODO: reregister jobs
+    }
+
+    /* (non-Javadoc)
+     * Task status callback. Only logs the status message at INFO; the task
+     * state is not inspected or acted upon yet (see the TODO in the body).
+     * @see 
org.apache.mesos.Scheduler#statusUpdate(org.apache.mesos.SchedulerDriver, 
org.apache.mesos.Protos.TaskStatus)
+     */
+    @Override
+    public void statusUpdate(SchedulerDriver schedDriver, TaskStatus 
taskStatus) {
+        //TODO: deliver messages, some rerun, some finish.
+        LOG.log(Level.INFO,"Status update: "+taskStatus.getMessage());
+    }
+
+
+
+    @Override
+    public void run() {
+        //Nothing is polled or scheduled here: this method only emits log lines
+        LOG.info("Attempting to run framework. Nothing to do.");
+        LOG.finest("Paradigm shift enabled.");
+        LOG.finest("Spin and poll surplanted by event based execution.");
+        LOG.finest("Mesos-OODT Fusion complete.");
+    }
+
+
+
+    @Override
+    public boolean schedule(JobSpec spec) throws SchedulerException { //Unsupported here: unconditionally throws NotImplementedException, spec is ignored
+        throw new NotImplementedException("Schedule is not called when using 
mesos.");
+    }
+
+
+
+    @Override
+    public ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException {
+        return null; //No node lookup is implemented here: spec is ignored and null is always returned
+    }
+
+
+
+    @Override
+    public Monitor getMonitor() {
+        return mon; //The monitor instance held by this scheduler
+    }
+
+
+
+    @Override
+    public Batchmgr getBatchmgr() {
+        return batch; //The batch manager instance held by this scheduler
+    }
+
+
+
+    @Override
+    public JobQueue getJobQueue() {
+        // Returns the job queue instance held by this scheduler
+        return queue;
+    }
+
+
+
+    @Override
+    public QueueManager getQueueManager() {
+        // No queue manager is provided by this scheduler: always returns null
+        return null;
+    }
+    //Job set used internally to simplify data transmission: a (job spec, task info, offer) triple
+    private class JobSet {
+        public JobSpec job;
+        public TaskInfo task;
+        public Offer offer;
+        //Build a job set from its three parts; the references are stored as given
+        public JobSet(JobSpec job, TaskInfo task, Offer offer) {
+            this.job = job;
+            this.task = task;
+            this.offer = offer;
+        }
+    }
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.scheduler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.oodt.cas.resource.batchmgr.MesosBatchManager;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+
+/**
+ * A class to setup the resource manager's mesos framework.
+ * @author starchmd
+ * @version $Revision$
+ *
+ */
+public class ResourceMesosSchedulerFactory implements SchedulerFactory {
+
+    private static final Logger LOG = Logger.getLogger(ResourceMesosSchedulerFactory.class.getName());
+
+    private Monitor mon = null;
+    private MesosBatchManager batch = null;
+    private JobQueue queue = null;
+
+    public ResourceMesosSchedulerFactory() {}
+
+    /**
+     * Builds the Mesos-backed scheduler and starts its driver on a new thread.
+     * <p>
+     * System properties read:
+     * <ul>
+     * <li>org.apache.oodt.cas.resource.mesos.executor.uri - executor command path (default ./oodt-executor.in)</li>
+     * <li>resource.jobqueue.factory - job queue factory class (default JobStackJobQueueFactory)</li>
+     * <li>resource.mesos.master.ip - master address (default 127.0.0.1:5050)</li>
+     * </ul>
+     * The batch manager receives the driver that is actually run, so that it
+     * never holds a null driver.
+     * @return the newly created scheduler
+     * @throws RuntimeException wrapping the IOException raised when the executor path cannot be canonicalized
+     */
+    public Scheduler construct() {
+        try {
+            String uri = System.getProperty("org.apache.oodt.cas.resource.mesos.executor.uri","./oodt-executor.in");
+            //Framework info
+            FrameworkInfo framework = FrameworkInfo.newBuilder()
+                        .setName("OODT Resource Manager Mesos Framework").setUser("")
+                        .setId(FrameworkID.newBuilder().setValue("OODT-Resource Framework").build())
+                        .build();
+            ExecutorInfo executor = ExecutorInfo.newBuilder().setExecutorId(ExecutorID.newBuilder().setValue("OODT-Resource").build())
+                    .setCommand(CommandInfo.newBuilder().setValue(new File(uri).getCanonicalPath()).build())
+                    .setName("OODT Resource Manager Executor").build();
+
+            //Resource manager properties
+            String batchmgrClassStr = "org.apache.oodt.cas.resource.batchmgr.MesosBatchManagerFactory";
+            String monitorClassStr = "org.apache.oodt.cas.resource.monitor.MesosMonitorFactory";
+            String jobQueueClassStr = System.getProperty("resource.jobqueue.factory","org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory");
+            String ip = System.getProperty("resource.mesos.master.ip","127.0.0.1:5050");
+
+            batch = (MesosBatchManager)GenericResourceManagerObjectFactory.getBatchmgrServiceFromFactory(batchmgrClassStr);
+            mon = GenericResourceManagerObjectFactory.getMonitorServiceFromFactory(monitorClassStr);
+            queue = GenericResourceManagerObjectFactory.getJobQueueServiceFromFactory(jobQueueClassStr);
+            batch.setMonitor(mon);
+            batch.setJobRepository(queue.getJobRepository());
+
+            LOG.log(Level.INFO,"Connecting to Mesos Master at: "+ip);
+            System.out.println("Connecting to Mesos Master at: "+ip);
+            ResourceMesosScheduler scheduler = new ResourceMesosScheduler(batch, executor, queue, mon);
+
+            //The driver needs the scheduler, so it can only be given to the batch manager here
+            final SchedulerDriver mesos = new MesosSchedulerDriver(scheduler, framework, ip);
+            batch.setDriver(mesos);
+            //Anonymous thread to run
+            new Thread(new Runnable() {
+                public void run() {
+                    //run() blocks until the driver stops or aborts
+                    Status status = mesos.run();
+                    if (status != Status.DRIVER_STOPPED) {
+                        LOG.log(Level.WARNING,"Mesos driver exited with status: "+status);
+                    }
+                    mesos.stop();
+                }
+            }).start();
+            return scheduler;
+        } catch(IOException ioe) {
+            //Log with the throwable so that the stack trace reaches the log handlers
+            LOG.log(Level.SEVERE,"Exception detected: "+ioe.getMessage(),ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    /**
+     * Creates the scheduler by delegating to {@link #construct()}.
+     * @return the newly created scheduler
+     */
+    @Override
+    public Scheduler createScheduler() {
+        return construct();
+    }
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.structs;
+
+import java.io.Serializable;
+
+/**
+ * A class used to serialize and de-serialize a job spec
+ * @author starchmd
+ */
+public class JobSpecSerializer implements Serializable {
+    private static final long serialVersionUID = -8246199042863932667L;
+    //Serialized state: the Job's simple properties...
+    String id;
+    String jobInputClassName;
+    String jobInstanceClassName;
+    Integer loadValue;
+    String name;
+    String queueName;
+    String status;
+    //...and the job input, as its id plus whatever its write() call produced
+    String jobInputId;
+    Serializable jobInput;
+    /**
+     * Captures the state of the given spec in serializable fields.
+     * The value returned by the input's write() is cast to Serializable.
+     * @param spec - job spec to capture
+     */
+    public JobSpecSerializer(JobSpec spec) {
+        Job source = spec.getJob();
+        this.id = source.getId();
+        this.jobInputClassName = source.getJobInputClassName();
+        this.jobInstanceClassName = source.getJobInstanceClassName();
+        this.loadValue = source.getLoadValue();
+        this.name = source.getName();
+        this.queueName = source.getQueueName();
+        this.status = source.getStatus();
+
+        JobInput in = spec.getIn();
+        this.jobInputId = in.getId();
+        this.jobInput = (Serializable)in.write();
+    }
+    /**
+     * Rebuilds a JobSpec from the captured fields. The job input is created
+     * reflectively from the stored input class name and filled via read().
+     * @return newly constructed job spec
+     * @throws ClassNotFoundException if the input class cannot be loaded
+     * @throws InstantiationException if the input class cannot be instantiated
+     * @throws IllegalAccessException if the input class or its constructor is not accessible
+     */
+    public JobSpec getJobSpec() throws ClassNotFoundException, InstantiationException, IllegalAccessException {
+        Job restored = new Job();
+        restored.setId(this.id);
+        restored.setJobInputClassName(this.jobInputClassName);
+        restored.setJobInstanceClassName(this.jobInstanceClassName);
+        restored.setLoadValue(this.loadValue);
+        restored.setName(this.name);
+        restored.setQueueName(this.queueName);
+        restored.setStatus(this.status);
+
+        //Instantiate the recorded input type, then let it read its own data
+        JobInput in = (JobInput)Class.forName(this.jobInputClassName).newInstance();
+        in.read(this.jobInput);
+
+        JobSpec result = new JobSpec();
+        result.setIn(in);
+        result.setJob(restored);
+        return result;
+    }
+}
\ No newline at end of file

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.structs.exceptions;
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * Unchecked exception (it extends RuntimeException) that is built from an
+ * error message only; the single constructor offers no way to attach a cause.
+ */
+public class MesosFrameworkException extends RuntimeException {
+
+    public MesosFrameworkException(String error) {
+        super(error);
+    }
+}

Added: 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java
 (added)
+++ 
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.util;
+
+import java.util.Collection;
+
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.Value.Range;
+import org.apache.mesos.Protos.Value.Type;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.JobSpecSerializer;
+import org.apache.mesos.protobuf.ByteString;
+/**
+ * @author starchmd
+ * @version $Revision$
+ */
+public class MesosUtilities {
+    /**
+     * Get a ByteString serialization of a JobSpec to send over the wire to 
the mesos-backend.
+     * @param jobSpec - JobSpec to serialize
+     * @return bytestring containing byte[] of jobspec
+     */
+    public static ByteString jobSpecToByteString(JobSpec jobSpec)
+    {
+        return ByteString.copyFrom(SerializationUtils.serialize(new 
JobSpecSerializer(jobSpec)));
+    }
+    /**
+     * Build a JobSpec from a ByteString off the wire
+     * @param data - ByteString to deserialize
+     * @return newly minted JobSpec
+     * @throws IllegalAccessException
+     * @throws InstantiationException
+     * @throws ClassNotFoundException
+     */
+    public static JobSpec byteStringToJobSpec(ByteString data) throws 
ClassNotFoundException, InstantiationException, IllegalAccessException
+    {
+        return 
((JobSpecSerializer)SerializationUtils.deserialize(data.toByteArray())).getJobSpec();
+    }
+
+    /**
+     * Makes a string message from resources list.
+     * @param resources - resource list to make into string.
+     * @return string representing the resource list.
+     */
+    public static String getResourceMessage(Collection<Resource> resources) {
+        String ret = "";
+        for (Resource res : resources)
+            ret += "\n\t"+getResourceMessage(res);
+        return ret;
+    }
+
+    /**
+     * Creates string out an offer in a nice format.
+     * @param resource - mesos resource to make into string.
+     * @return string representing a resource.
+     */
+    public static String getResourceMessage(Resource resource) {
+        Type type = resource.getType();
+        String ret = resource.getName() +" "+resource.getRole()+ ": ";
+        switch (type) {
+            case SCALAR:
+                ret += resource.getScalar().getValue();
+                break;
+            case RANGES:
+                for (Range range : resource.getRanges().getRangeList())
+                    ret += range.getBegin() + " - "+range.getEnd()+",";
+                break;
+            case TEXT:
+                ret += " TEXT type...cannot find.";
+                break;
+            case SET:
+                for (String string : resource.getSet().getItemList())
+                    ret += string + ",";
+                break;
+        }
+        return ret;
+    }
+}
\ No newline at end of file

Added: 
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java?rev=1649884&view=auto
==============================================================================
--- 
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java
 (added)
+++ 
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java
 Tue Jan  6 17:39:22 2015
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.util;
+
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+
+
+import org.apache.oodt.cas.resource.util.MesosUtilities;
+
+//JUnit imports
+import junit.framework.TestCase;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * Test Suite for the {@link MesosUtilities} class
+ * </p>.
+ */
+public class TestMesosUtilities extends TestCase {
+
+    /**
+     * Round-trips a JobSpec through jobSpecToByteString/byteStringToJobSpec
+     * and checks that the job properties and the name-value input survive.
+     * Unexpected exceptions propagate, so the runner reports them with
+     * their full stack trace instead of a message-only failure.
+     * @throws Exception on any unexpected error during the round trip
+     */
+    public void testSerialization() throws Exception {
+        JobSpec js = new JobSpec();
+
+        Job job = new Job();
+        job.setId("crazy-id");
+        job.setJobInputClassName(NameValueJobInput.class.getCanonicalName());
+        job.setJobInstanceClassName("Instance Class");
+        job.setLoadValue(Integer.valueOf(352));
+        job.setName("A Name");
+        job.setQueueName("Queue");
+        job.setStatus("Status");
+
+        String[] props = {"prop-1","prop-2","prop-3"};
+        NameValueJobInput nvji = new NameValueJobInput();
+        for (String str : props)
+            nvji.setNameValuePair(str, str+"val");
+
+        js.setIn(nvji);
+        js.setJob(job);
+
+        JobSpec ns = MesosUtilities.byteStringToJobSpec(MesosUtilities.jobSpecToByteString(js));
+        TestCase.assertEquals(js.getJob().getId(),ns.getJob().getId());
+        TestCase.assertEquals(js.getJob().getJobInputClassName(),ns.getJob().getJobInputClassName());
+        TestCase.assertEquals(js.getJob().getJobInstanceClassName(),ns.getJob().getJobInstanceClassName());
+        TestCase.assertEquals(js.getJob().getLoadValue(),ns.getJob().getLoadValue());
+        TestCase.assertEquals(js.getJob().getName(),ns.getJob().getName());
+        TestCase.assertEquals(js.getJob().getQueueName(),ns.getJob().getQueueName());
+        TestCase.assertEquals(js.getJob().getStatus(),ns.getJob().getStatus());
+        TestCase.assertEquals(js.getIn().getId(),ns.getIn().getId());
+        for (String str : props)
+            TestCase.assertEquals(nvji.getValue(str),((NameValueJobInput)ns.getIn()).getValue(str));
+    }
+
+}


Reply via email to