Author: starchmd
Date: Tue Jan 6 17:39:22 2015
New Revision: 1649884
URL: http://svn.apache.org/r1649884
Log:
OODT-699: Mesos cluster manager backend to resource manager
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java
Modified:
oodt/trunk/CHANGES.txt
oodt/trunk/resource/pom.xml
Modified: oodt/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1649884&r1=1649883&r2=1649884&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Tue Jan 6 17:39:22 2015
@@ -3,6 +3,8 @@ Apache OODT Change Log
Release 0.9 - Current Development
-------------------------------------------
+* OODT-699 Mesos cluster manager backend to resource manager
+
* OODT-780 Spark backend to resource manager
* OODT-802 Create Dockerfile for OODT Radix.
Modified: oodt/trunk/resource/pom.xml
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/pom.xml?rev=1649884&r1=1649883&r2=1649884&view=diff
==============================================================================
--- oodt/trunk/resource/pom.xml (original)
+++ oodt/trunk/resource/pom.xml Tue Jan 6 17:39:22 2015
@@ -137,7 +137,7 @@ the License.
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
- <version>2.1</version>
+ <version>3.2.1</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
@@ -145,6 +145,11 @@ the License.
<version>1.2</version>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.2.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>2.0.0</version>
@@ -178,5 +183,11 @@ the License.
<version>3.8.2</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ <version>0.21.0</version>
+ <classifier>shaded-protobuf</classifier>
+ </dependency>
</dependencies>
</project>
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManager.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.batchmgr;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.structs.exceptions.MesosFrameworkException;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * A batch-manager used to execute and control jobs in a mesos-cluster.
+ */
+public class MesosBatchManager implements Batchmgr {
+
+ Map<String,TaskID> map = new HashMap<String,TaskID>();
+ SchedulerDriver driver;
+ JobRepository repo;
+ Monitor mon;
+
+ public MesosBatchManager() {
+ }
+ /**
+ * Required to set the driver used to run the job, so "kill"
+ * requests are mapped properly.
+ * @param driver
+ */
+ public void setDriver(SchedulerDriver driver)
+ {
+ this.driver = driver;
+ }
+ /**
+ * Register a new job with a batch manager.
+ * @param jobId - jobId in "resource" manager.
+ * @param task - mesos task.
+ */
+ public void registerExecutedJob(String jobId,TaskID task) {
+ map.put(jobId, task);
+ };
+
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.batchmgr.Batchmgr#executeRemotely(org.apache.oodt.cas.resource.structs.JobSpec,
org.apache.oodt.cas.resource.structs.ResourceNode)
+ */
+ @Override
+ public boolean executeRemotely(JobSpec job, ResourceNode resNode)
+ throws JobExecutionException {
+ throw new NotImplementedException("Execute remotely is not called when
using mesos.");
+ }
+
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.batchmgr.Batchmgr#setMonitor(org.apache.oodt.cas.resource.monitor.Monitor)
+ */
+ @Override
+ public void setMonitor(Monitor monitor) {
+ this.mon = monitor;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.batchmgr.Batchmgr#setJobRepository(org.apache.oodt.cas.resource.jobrepo.JobRepository)
+ */
+ @Override
+ public void setJobRepository(JobRepository repository) {
+ this.repo = repository;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.batchmgr.Batchmgr#killJob(java.lang.String,
org.apache.oodt.cas.resource.structs.ResourceNode)
+ */
+ @Override
+ public boolean killJob(String jobId, ResourceNode node) {
+ TaskID id = (TaskID)map.get(jobId);
+ driver.killTask(id);
+ Status status = driver.killTask(id);
+ if (status != Status.DRIVER_RUNNING)
+ throw new MesosFrameworkException("Mesos Schedule Driver is dead:
"+status.toString());
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.batchmgr.Batchmgr#getExecutionNode(java.lang.String)
+ */
+ @Override
+ public String getExecutionNode(String jobId) {
+ // TODO Make this more meaningful.
+ return "All Your Jobs are belong to Mesos";
+ }
+
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/MesosBatchManagerFactory.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.resource.batchmgr;
+
+
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * A batchmgr factory.
+ * </p>
+ *
+ */
+
+public class MesosBatchManagerFactory implements BatchmgrFactory {
+
+ public MesosBatchManagerFactory(){
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
org.apache.oodt.cas.resource.batchmgr.BatchmgrFactory#createBatchmgr()
+ */
+ public Batchmgr createBatchmgr() {
+ return new MesosBatchManager();
+ }
+
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/ResourceExecutor.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.batchmgr;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.SlaveInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+import org.apache.oodt.cas.resource.util.MesosUtilities;
+
+/**
+ * @author starchmd
+ *
+ * This "Executor" is run by mesos to actually run the job.
+ */
+public class ResourceExecutor implements Executor {
+
+ PrintStream str = null;
+ String id = new
SimpleDateFormat("HH:mm:ss").format(Calendar.getInstance().getTime())+" ";
+ public ResourceExecutor() {
+ try {
+ File tmp = new File("./executor-log.notalog");
+ //tmp.delete(); //With NIO then something must be caught
+ str = new PrintStream(new FileOutputStream(tmp));
+ str.println(id+"Starting up new<<<<<");
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Executor#disconnected(org.apache.mesos.ExecutorDriver)
+ */
+ @Override
+ public void disconnected(ExecutorDriver driver) {
+ str.println(id+"Disconnected!");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.mesos.Executor#error(org.apache.mesos.ExecutorDriver,
java.lang.String)
+ */
+ @Override
+ public void error(ExecutorDriver driver, String error) {
+ str.println(id+"Error: "+error);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Executor#frameworkMessage(org.apache.mesos.ExecutorDriver,
byte[])
+ */
+ @Override
+ public void frameworkMessage(ExecutorDriver arg0, byte[] arg1) {
+ str.println(id+"Message: "+new String(arg1));
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Executor#killTask(org.apache.mesos.ExecutorDriver,
org.apache.mesos.Protos.TaskID)
+ */
+ @Override
+ public void killTask(ExecutorDriver arg0, TaskID arg1) {
+ str.println(id+"Kill");
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Executor#launchTask(org.apache.mesos.ExecutorDriver,
org.apache.mesos.Protos.TaskInfo)
+ */
+ @Override
+ public void launchTask(final ExecutorDriver driver, final TaskInfo info) {
+ str.println(id+"Launch task!");
+ try {
+ JobSpec spec = MesosUtilities.byteStringToJobSpec(info.getData());
+ final JobInstance exec = GenericResourceManagerObjectFactory
+
.getJobInstanceFromClassName(spec.getJob().getJobInstanceClassName());
+ final JobInput in = spec.getIn();
+ Thread tmp = new Thread(new Runnable(){
+ public void run() {
+ TaskStatus status = null;
+ try {
+ exec.execute(in);
+ status =
TaskStatus.newBuilder().setTaskId(info.getTaskId())
+ .setState(TaskState.TASK_FINISHED).build();
+ } catch (JobInputException e) {
+ e.printStackTrace();
+ status =
TaskStatus.newBuilder().setTaskId(info.getTaskId())
+ .setState(TaskState.TASK_FAILED).build();
+ }
+ driver.sendStatusUpdate(status);
+ }
+ });
+
driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(info.getTaskId())
+ .setState(TaskState.TASK_STARTING).build());
+ tmp.start();
+ } catch (ClassNotFoundException e1) {
+ System.out.println("BAD DATA: ");
+ e1.printStackTrace();
+
driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(info.getTaskId())
+ .setState(TaskState.TASK_FAILED).build());
+ } catch (InstantiationException e2) {
+ System.out.println("BAD DATA: ");
+ e2.printStackTrace();
+
driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(info.getTaskId())
+ .setState(TaskState.TASK_FAILED).build());
+ } catch (IllegalAccessException e3) {
+ System.out.println("BAD DATA: ");
+ e3.printStackTrace();
+
driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(info.getTaskId())
+ .setState(TaskState.TASK_FAILED).build());
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Executor#registered(org.apache.mesos.ExecutorDriver,
org.apache.mesos.Protos.ExecutorInfo, org.apache.mesos.Protos.FrameworkInfo,
org.apache.mesos.Protos.SlaveInfo)
+ */
+ @Override
+ public void registered(ExecutorDriver arg0, ExecutorInfo arg1,
+ FrameworkInfo arg2, SlaveInfo arg3) {
+ System.out.println("Do-Wah-Do-Wah");
+ str.println(id+"Registered, Huzzah!");
+
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Executor#reregistered(org.apache.mesos.ExecutorDriver,
org.apache.mesos.Protos.SlaveInfo)
+ */
+ @Override
+ public void reregistered(ExecutorDriver arg0, SlaveInfo arg1) {
+ System.out.println("Do-Wah-Do-Wah GO GO GO!!!!");
+ str.println(id+"Re-regged");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.mesos.Executor#shutdown(org.apache.mesos.ExecutorDriver)
+ */
+ @Override
+ public void shutdown(ExecutorDriver arg0) {
+ System.out.println("Down down down.");
+ str.println(id+"Shutdown");
+ }
+
+ public static void main(String[] args) throws Exception {
+ MesosExecutorDriver driver = new MesosExecutorDriver(new
ResourceExecutor());
+ System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
+ }
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitor.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.monitor;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * A monitor to monitor the mesos-cluster jobs.
+ */
+public class MesosMonitor implements Monitor {
+
+ private static HashMap<String, ResourceNode> nodesMap = new
HashMap<String, ResourceNode>();
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.monitor.Monitor#getLoad(org.apache.oodt.cas.resource.structs.ResourceNode)
+ */
+ @Override
+ public int getLoad(ResourceNode node) throws MonitorException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodes()
+ */
+ @Override
+ public List<ResourceNode> getNodes() throws MonitorException {
+ return new LinkedList<ResourceNode>(nodesMap.values());
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.monitor.Monitor#getNodeById(java.lang.String)
+ */
+ @Override
+ public ResourceNode getNodeById(String nodeId) throws MonitorException {
+ return nodesMap.get(nodeId);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.monitor.Monitor#getNodeByURL(java.net.URL)
+ */
+ @Override
+ public ResourceNode getNodeByURL(URL ipAddr) throws MonitorException {
+ for (ResourceNode node : nodesMap.values())
+ if (node.getIpAddr().equals(ipAddr))
+ return node;
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.monitor.Monitor#reduceLoad(org.apache.oodt.cas.resource.structs.ResourceNode,
int)
+ */
+ @Override
+ public boolean reduceLoad(ResourceNode node, int loadValue)
+ throws MonitorException {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.monitor.Monitor#assignLoad(org.apache.oodt.cas.resource.structs.ResourceNode,
int)
+ */
+ @Override
+ public boolean assignLoad(ResourceNode node, int loadValue)
+ throws MonitorException {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.monitor.Monitor#addNode(org.apache.oodt.cas.resource.structs.ResourceNode)
+ */
+ @Override
+ public void addNode(ResourceNode node) throws MonitorException {
+ nodesMap.put(node.getNodeId(), node);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.oodt.cas.resource.monitor.Monitor#removeNodeById(java.lang.String)
+ */
+ @Override
+ public void removeNodeById(String nodeId) throws MonitorException {
+ nodesMap.remove(nodeId);
+ }
+
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/monitor/MesosMonitorFactory.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.monitor;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * Creates implementations of {@link MesosMonitor}s.
+ * </p>
+ *
+ */
+public class MesosMonitorFactory implements MonitorFactory {
+
+ private static final Logger LOG = Logger
+ .getLogger(MesosMonitorFactory.class.getName());
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * gov.nasa.jpl.oodt.cas.resource.monitor.MonitorFactory#createMonitor()
+ */
+ public MesosMonitor createMonitor() {
+ try {
+ return new MesosMonitor();
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to create Mesos Monitor : " +
e.getMessage(), e);
+ return null;
+ }
+ }
+
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosScheduler.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.scheduler;
+
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
+import org.apache.oodt.cas.resource.batchmgr.MesosBatchManager;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.ResourceNode;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.MesosFrameworkException;
+import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
+import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
+import org.apache.oodt.cas.resource.util.MesosUtilities;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * A scheduler for part of the mesos frame work.
+ */
+public class ResourceMesosScheduler implements Scheduler,
org.apache.oodt.cas.resource.scheduler.Scheduler {
+ SchedulerDriver driver;
+ MesosBatchManager batch;
+ ExecutorInfo executor;
+ JobQueue queue;
+ Monitor mon;
+
+ //Logger
+ private static final Logger LOG =
Logger.getLogger(ResourceMesosScheduler.class.getName());
+ /**
+ * Construct the scheduler
+ * @param batch - batch manager (must be MesosBatchManager)
+ * @param executor - Mesos ExecutorInfo
+ * @param queue Job Queue used
+ * @param mon - monitor used.
+ */
+ public ResourceMesosScheduler(MesosBatchManager batch,ExecutorInfo
executor, JobQueue queue, Monitor mon) {
+ this.batch = batch;
+ this.executor = executor;
+ this.queue = queue;
+ this.mon = mon;
+ LOG.log(Level.INFO,"Creating the resource-mesos scheduler.");
+ }
+
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#disconnected(org.apache.mesos.SchedulerDriver)
+ */
+ @Override
+ public void disconnected(SchedulerDriver schedDriver) {
+ //TODO: Pause scheduler until master comes back online.
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.mesos.Scheduler#error(org.apache.mesos.SchedulerDriver,
java.lang.String)
+ */
+ @Override
+ public void error(SchedulerDriver schedDriver, String error) {
+ LOG.log(Level.SEVERE,"Mesos issued an error: "+error);
+ //TODO: kill something here.
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#executorLost(org.apache.mesos.SchedulerDriver,
org.apache.mesos.Protos.ExecutorID, org.apache.mesos.Protos.SlaveID, int)
+ */
+ @Override
+ public void executorLost(SchedulerDriver schedDriver, ExecutorID
executor,SlaveID slave, int status) {
+ //Tasks will have a "task lost" message automatically q.e.d no action
necessary.
+ //TODO: do we need to restart?
+ LOG.log(Level.SEVERE,"Mesos executor "+executor+" on slave "+slave+"
died with status "+status);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#frameworkMessage(org.apache.mesos.SchedulerDriver,
org.apache.mesos.Protos.ExecutorID, org.apache.mesos.Protos.SlaveID, byte[])
+ */
+ @Override
+ public void frameworkMessage(SchedulerDriver schedDriver, ExecutorID
executor,
+ SlaveID slave, byte[] bytes) {
+ try {
+ LOG.log(Level.INFO,"Mesos framework executor"+executor+" on slave
"+slave+" issued message: "+
+ new String(bytes,"ascii"));
+ } catch (UnsupportedEncodingException e) {
+ LOG.log(Level.WARNING,"Mesos framework message missed due to bad
encoding: ascii. "+e.getMessage());
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#offerRescinded(org.apache.mesos.SchedulerDriver,
org.apache.mesos.Protos.OfferID)
+ */
+ @Override
+ public void offerRescinded(SchedulerDriver schedDriver, OfferID offer) {
+ //TODO: take away resources from batch manager...or stand in.
+ //Unneeded?
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#registered(org.apache.mesos.SchedulerDriver,
org.apache.mesos.Protos.FrameworkID, org.apache.mesos.Protos.MasterInfo)
+ */
+ @Override
+ public void registered(SchedulerDriver schedDriver, FrameworkID framework,
+ MasterInfo masterInfo) {
+ LOG.log(Level.INFO,"Mesos framework registered:
"+framework.getValue()+" with master: "+masterInfo.getId());
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#reregistered(org.apache.mesos.SchedulerDriver,
org.apache.mesos.Protos.MasterInfo)
+ */
+ @Override
+ public void reregistered(SchedulerDriver schedDriver, MasterInfo
masterInfo) {
+ LOG.log(Level.INFO,"Mesos framework re-registered with:
"+masterInfo.getId());
+ //TODO: call start, we are registered.
+
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#resourceOffers(org.apache.mesos.SchedulerDriver,
java.util.List)
+ */
+ @Override
+ public void resourceOffers(SchedulerDriver driver, List<Offer> offers) {
+ LOG.log(Level.INFO,"Offered mesos resources: "+offers.size()+"
offers.");
+ //Log, if possible the offers
+ if (LOG.isLoggable(Level.FINER)) {
+ for (Offer offer : offers) {
+ try {
+ this.mon.addNode(new
ResourceNode(offer.getSlaveId().getValue(),new
URL("http://"+offer.getHostname()),-1));
+ } catch (MalformedURLException e) {
+ LOG.log(Level.WARNING,"Cannot add node to monitor (bad
url). Giving up: "+e.getMessage());
+ } catch ( MonitorException e) {
+ LOG.log(Level.WARNING,"Cannot add node to monitor (unkn).
Giving up: "+e.getMessage());
+ }
+ LOG.log(Level.FINER,"Offer ("+offer.getId().getValue()+"):
"+offer.getHostname()+ "(Slave: "+
+ offer.getSlaveId().getValue()+")
"+MesosUtilities.getResourceMessage(offer.getResourcesList()));
+ }
+ }
+ List<JobSet> assignments = this.getJobAssignmentsJobs(offers);
+ List<OfferID> used = new LinkedList<OfferID>();
+ for (JobSet assignment : assignments) {
+ //Launch tasks requires lists
+ List<OfferID> ids = new LinkedList<OfferID>();
+ List<TaskInfo> tasks = new LinkedList<TaskInfo>();
+ tasks.add(assignment.task);
+ used.add(assignment.offer.getId());
+ ids.add(assignment.offer.getId());
+ //Register locally and launch on mesos
+ batch.registerExecutedJob(assignment.job.getJob().getId(),
assignment.task.getTaskId());
+ Status status = driver.launchTasks(ids,tasks); //Assumed one to
one mapping
+ if (status != Status.DRIVER_RUNNING)
+ throw new MesosFrameworkException("Driver stopped:
"+status.toString());
+ }
+ for (Offer offer : offers) {
+ if (!used.contains(offer.getId())) {
+ LOG.log(Level.INFO,"Rejecting Offer:
"+offer.getId().getValue());
+ driver.declineOffer(offer.getId());
+ }
+ }
+ }
+ /**
+ * Builds a TaskInfo from the given jobspec
+ * @param job - JobSpec to TaskInfo-ify
+ * @param offer - offer add extra data (SlaveId)
+ * @return TaskInfo fully formed
+ */
+ private TaskInfo getTaskInfo(JobSpec job,Offer offer) {
+ TaskID taskId =
TaskID.newBuilder().setValue(job.getJob().getId()).build();
+ TaskInfo info = TaskInfo.newBuilder().setName("task " +
taskId.getValue())
+ .setTaskId(taskId)
+ .setSlaveId(offer.getSlaveId())
+ .addResources(Resource.newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+
.setScalar(Value.Scalar.newBuilder().setValue(job.getJob().getLoadValue()*1.0)))
+ .addResources(Resource.newBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+
.setScalar(Value.Scalar.newBuilder().setValue(job.getJob().getLoadValue()*1024.0)))
+
.setExecutor(ExecutorInfo.newBuilder(executor)).setData(MesosUtilities.jobSpecToByteString(job)).build();
+ return info;
+ }
+ /**
+ * Checks all offers against jobs in order, assigning jobs to offers until
each offer is full,
+ * or all jobs are gone.
+ * @param offers - offers to assign jobs to.
+ * @return List of <JobSpec,TaskInfo,Offer> tuples (assigned to each
other).
+ */
+ private List<JobSet> getJobAssignmentsJobs(List<Offer> offers) {
+ List<JobSet> list = new LinkedList<JobSet>();
+ for (Offer offer : offers)
+ {
+ double cpus = 0.0, mem = 0.0;
+ //Get the resources offered from this offer
+ for (Resource resc : offer.getResourcesList()) {
+ if (resc.getName().equals("cpus"))
+ cpus += resc.getScalar().getValue();
+ if (resc.getName().equals("mem"))
+ mem += resc.getScalar().getValue();
+ }
+ //Search for enough jobs to fill the offer
+ for (int i = 0;i < queue.getSize();i++)
+ {
+ try {
+ JobSpec job = queue.getNextJob();
+ double load = job.getJob().getLoadValue();
+ //Check if enough resources
+ if (cpus < load || mem < load*1024)
+ {
+ queue.requeueJob(job);
+ continue;
+ }
+ cpus -= load;
+ mem -= 1024*load;
+ JobSet tmp = new JobSet(job,getTaskInfo(job,offer),offer);
+ list.add(tmp);
+ //Not enough left, optimise and stop looking for jobs
+ if (cpus < 0.5 || mem <= 512.0)
+ break;
+ } catch (JobQueueException e) {throw new RuntimeException(e);}
+ }
+ //Optimization: break when no jobs
+ if (queue.getSize() == 0)
+ break;
+ }
+ return list;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#slaveLost(org.apache.mesos.SchedulerDriver,
org.apache.mesos.Protos.SlaveID)
+ */
+ @Override
+ public void slaveLost(SchedulerDriver schedDriver, SlaveID slave) {
+ LOG.log(Level.WARNING,"Mesos slave "+slave+" lost, reissuing jobs.");
+ //TODO: reregister jobs
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.mesos.Scheduler#statusUpdate(org.apache.mesos.SchedulerDriver,
org.apache.mesos.Protos.TaskStatus)
+ */
+ @Override
+ public void statusUpdate(SchedulerDriver schedDriver, TaskStatus
taskStatus) {
+ //TODO: deliver messages, some rerun, some finish.
+ LOG.log(Level.INFO,"Status update: "+taskStatus.getMessage());
+ }
+
+
+
+ @Override
+ public void run() {
+ LOG.log(Level.INFO,"Attempting to run framework. Nothing to do.");
+ LOG.log(Level.FINEST, "Paradigm shift enabled.");
+ LOG.log(Level.FINEST, "Spin and poll surplanted by event based
execution.");
+ LOG.log(Level.FINEST, "Mesos-OODT Fusion complete.");
+ //Don't run anything
+ return;
+ }
+
+
+
+ @Override
+ public boolean schedule(JobSpec spec) throws SchedulerException {
+ throw new NotImplementedException("Schedule is not called when using
mesos.");
+ }
+
+
+
+ @Override
+ public ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException {
+ return null;
+ }
+
+
+
+ @Override
+ public Monitor getMonitor() {
+ return mon;
+ }
+
+
+
+ @Override
+ public Batchmgr getBatchmgr() {
+ return batch;
+ }
+
+
+
+ @Override
+ public JobQueue getJobQueue() {
+ // TODO Auto-generated method stub
+ return queue;
+ }
+
+
+
+ @Override
+ public QueueManager getQueueManager() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ //Job set used internally to simplify data transmission
+ private class JobSet {
+ public JobSpec job;
+ public TaskInfo task;
+ public Offer offer;
+ //Build a job set
+ public JobSet(JobSpec job, TaskInfo task, Offer offer) {
+ this.job = job;
+ this.task = task;
+ this.offer = offer;
+ }
+ }
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/ResourceMesosSchedulerFactory.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.scheduler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.oodt.cas.resource.batchmgr.MesosBatchManager;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.monitor.Monitor;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+
+/**
+ * A class to setup the resource manager's mesos framework.
+ * @author starchmd
+ * @version $Revision$
+ *
+ */
+public class ResourceMesosSchedulerFactory implements SchedulerFactory {
+
+ private static final Logger LOG =
Logger.getLogger(ResourceMesosSchedulerFactory.class.getName());
+
+ private Monitor mon = null;
+ private MesosBatchManager batch = null;
+ private JobQueue queue = null;
+
+ public ResourceMesosSchedulerFactory() {}
+
+ public Scheduler construct() {
+ try {
+ String uri =
System.getProperty("org.apache.oodt.cas.resource.mesos.executor.uri","./oodt-executor.in");
+ //Framework info
+ FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder()
+ .setName("OODT Resource Manager Mesos
Framework").setUser("")
+
.setId(FrameworkID.newBuilder().setValue("OODT-Resource Framework").build());
+ FrameworkInfo framework = frameworkBuilder.build();
+ ExecutorInfo executor =
ExecutorInfo.newBuilder().setExecutorId(ExecutorID.newBuilder().setValue("OODT-Resource").build())
+ .setCommand(CommandInfo.newBuilder().setValue(new
File(uri).getCanonicalPath()).build())
+ .setName("OODT Resource Manager Executor").build();
+ SchedulerDriver driver = null;
+
+ //Resource manager properties
+ String batchmgrClassStr =
"org.apache.oodt.cas.resource.batchmgr.MesosBatchManagerFactory";
+ String monitorClassStr =
"org.apache.oodt.cas.resource.monitor.MesosMonitorFactory";
+ String jobQueueClassStr =
System.getProperty("resource.jobqueue.factory","org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory");
+ String ip =
System.getProperty("resource.mesos.master.ip","127.0.0.1:5050");
+
+ batch =
(MesosBatchManager)GenericResourceManagerObjectFactory.getBatchmgrServiceFromFactory(batchmgrClassStr);
+ mon =
GenericResourceManagerObjectFactory.getMonitorServiceFromFactory(monitorClassStr);
+ queue =
GenericResourceManagerObjectFactory.getJobQueueServiceFromFactory(jobQueueClassStr);
+ batch.setMonitor(mon);
+ batch.setDriver(driver);
+ batch.setJobRepository(queue.getJobRepository());
+
+ LOG.log(Level.INFO,"Connecting to Mesos Master at: "+ip);
+ System.out.println("Connecting to Mesos Master at: "+ip);
+ ResourceMesosScheduler scheduler = new
ResourceMesosScheduler(batch, executor, queue, mon);
+
+ final MesosSchedulerDriver mesos = new
MesosSchedulerDriver(scheduler, framework, ip);
+ //Anonymous thread to run
+ new Thread(new Runnable() {
+ public void run() {
+ int status = mesos.run() == Status.DRIVER_STOPPED ? 0 : 1;
+ mesos.stop();
+ }
+ }).start();
+ return scheduler;
+ } catch(IOException ioe) {
+ LOG.log(Level.SEVERE,"Exception detected: "+ioe.getMessage());
+ ioe.printStackTrace();
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public Scheduler createScheduler() {
+ return construct();
+ }
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobSpecSerializer.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.structs;
+
+import java.io.Serializable;
+
+/**
+ * A class used to serialize and de-serialize a job spec
+ * @author starchmd
+ */
+public class JobSpecSerializer implements java.io.Serializable {
+ private static final long serialVersionUID = -8246199042863932667L;
+ //Variables needed to serialize
+ String id;
+ String jobInputClassName;
+ String jobInstanceClassName;
+ Integer loadValue;
+ String name;
+ String queueName;
+ String status;
+ String jobInputId;
+ Serializable jobInput;
+ /**
+ * Set the variables to serialize them.
+ * @param spec - job spec to serialize
+ */
+ public JobSpecSerializer(JobSpec spec) {
+ //Job
+ Job tmp = spec.getJob();
+ id = tmp.getId();
+ jobInputClassName = tmp.getJobInputClassName();
+ jobInstanceClassName = tmp.getJobInstanceClassName();
+ loadValue = tmp.getLoadValue();
+ name = tmp.getName();
+ queueName = tmp.getQueueName();
+ status = tmp.getStatus();
+ //Input of spec
+ JobInput input = spec.getIn();
+ jobInputId = input.getId();
+ jobInput = (Serializable)input.write();
+ }
+ /**
+ * Get the JobSpec back.
+ * @return newly constructed job spec
+ * @throws ClassNotFoundException
+ * @throws IllegalAccessException
+ * @throws InstantiationException
+ */
+ public JobSpec getJobSpec() throws ClassNotFoundException,
InstantiationException, IllegalAccessException {
+ Job tmp = new Job();
+ tmp.setId(id);
+ tmp.setJobInputClassName(jobInputClassName);
+ tmp.setJobInstanceClassName(jobInstanceClassName);
+ tmp.setLoadValue(loadValue);
+ tmp.setName(name);
+ tmp.setQueueName(queueName);
+ tmp.setStatus(status);
+ //Read in job input, using proper class
+ Class<?> clazz = Class.forName(jobInputClassName);
+ JobInput input = ((JobInput)clazz.newInstance());
+ input.read(jobInput);
+ JobSpec spec = new JobSpec();
+ spec.setIn(input);
+ spec.setJob(tmp);
+ return spec;
+ }
+}
\ No newline at end of file
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/MesosFrameworkException.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.structs.exceptions;
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * An exception.
+ */
+public class MesosFrameworkException extends RuntimeException {
+
+ public MesosFrameworkException(String error) {
+ super(error);
+ }
+}
Added:
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java
(added)
+++
oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/MesosUtilities.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.util;
+
+import java.util.Collection;
+
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.Value.Range;
+import org.apache.mesos.Protos.Value.Type;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.JobSpecSerializer;
+import org.apache.mesos.protobuf.ByteString;
+/**
+ * @author starchmd
+ * @version $Revision$
+ */
+public class MesosUtilities {
+ /**
+ * Get a ByteString serialization of a JobSpec to send over the wire to
the mesos-backend.
+ * @param jobSpec - JobSpec to serialize
+ * @return bytestring containing byte[] of jobspec
+ */
+ public static ByteString jobSpecToByteString(JobSpec jobSpec)
+ {
+ return ByteString.copyFrom(SerializationUtils.serialize(new
JobSpecSerializer(jobSpec)));
+ }
+ /**
+ * Build a JobSpec from a ByteString off the wire
+ * @param data - ByteString to deserialize
+ * @return newly minted JobSpec
+ * @throws IllegalAccessException
+ * @throws InstantiationException
+ * @throws ClassNotFoundException
+ */
+ public static JobSpec byteStringToJobSpec(ByteString data) throws
ClassNotFoundException, InstantiationException, IllegalAccessException
+ {
+ return
((JobSpecSerializer)SerializationUtils.deserialize(data.toByteArray())).getJobSpec();
+ }
+
+ /**
+ * Makes a string message from resources list.
+ * @param resources - resource list to make into string.
+ * @return string representing the resource list.
+ */
+ public static String getResourceMessage(Collection<Resource> resources) {
+ String ret = "";
+ for (Resource res : resources)
+ ret += "\n\t"+getResourceMessage(res);
+ return ret;
+ }
+
+ /**
+ * Creates string out an offer in a nice format.
+ * @param resource - mesos resource to make into string.
+ * @return string representing a resource.
+ */
+ public static String getResourceMessage(Resource resource) {
+ Type type = resource.getType();
+ String ret = resource.getName() +" "+resource.getRole()+ ": ";
+ switch (type) {
+ case SCALAR:
+ ret += resource.getScalar().getValue();
+ break;
+ case RANGES:
+ for (Range range : resource.getRanges().getRangeList())
+ ret += range.getBegin() + " - "+range.getEnd()+",";
+ break;
+ case TEXT:
+ ret += " TEXT type...cannot find.";
+ break;
+ case SET:
+ for (String string : resource.getSet().getItemList())
+ ret += string + ",";
+ break;
+ }
+ return ret;
+ }
+}
\ No newline at end of file
Added:
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java?rev=1649884&view=auto
==============================================================================
---
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java
(added)
+++
oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/util/TestMesosUtilities.java
Tue Jan 6 17:39:22 2015
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.resource.util;
+
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.NameValueJobInput;
+
+
+import org.apache.oodt.cas.resource.util.MesosUtilities;
+
+//JUnit imports
+import junit.framework.TestCase;
+
+/**
+ * @author starchmd
+ * @version $Revision$
+ *
+ * <p>
+ * Test Suite for the {@link MesosUtilities} class
+ * </p>.
+ */
+public class TestMesosUtilities extends TestCase {
+
+ public void testSerialization() {
+ JobSpec js = new JobSpec();
+
+ Job job = new Job();
+ job.setId("crazy-id");
+ job.setJobInputClassName(NameValueJobInput.class.getCanonicalName());
+ job.setJobInstanceClassName("Instance Class");
+ job.setLoadValue(new Integer(352));
+ job.setName("A Name");
+ job.setQueueName("Queue");
+ job.setStatus("Status");
+
+ String[] props = {"prop-1","prop-2","prop-3"};
+ NameValueJobInput nvji = new NameValueJobInput();
+ for (String str : props)
+ nvji.setNameValuePair(str, str+"val");
+
+ js.setIn(nvji);
+ js.setJob(job);
+
+ JobSpec ns;
+ try {
+ ns =
MesosUtilities.byteStringToJobSpec(MesosUtilities.jobSpecToByteString(js));
+ TestCase.assertEquals(js.getJob().getId(),ns.getJob().getId());
+
TestCase.assertEquals(js.getJob().getJobInputClassName(),ns.getJob().getJobInputClassName());
+
TestCase.assertEquals(js.getJob().getJobInstanceClassName(),ns.getJob().getJobInstanceClassName());
+
TestCase.assertEquals(js.getJob().getLoadValue(),ns.getJob().getLoadValue());
+ TestCase.assertEquals(js.getJob().getName(),ns.getJob().getName());
+
TestCase.assertEquals(js.getJob().getQueueName(),ns.getJob().getQueueName());
+
TestCase.assertEquals(js.getJob().getStatus(),ns.getJob().getStatus());
+ TestCase.assertEquals(js.getIn().getId(),ns.getIn().getId());
+ for (String str : props)
+
TestCase.assertEquals(nvji.getValue(str),((NameValueJobInput)ns.getIn()).getValue(str));
+ } catch (ClassNotFoundException e) {
+ TestCase.fail("Unexpected exception:"+e.getLocalizedMessage());
+ } catch (InstantiationException e) {
+ TestCase.fail("Unexpected exception:"+e.getLocalizedMessage());
+ } catch (IllegalAccessException e) {
+ TestCase.fail("Unexpected exception:"+e.getLocalizedMessage());
+ }
+
+ }
+
+}