BEWM On Fri, Oct 24, 2014 at 2:39 PM, <[email protected]> wrote:
> > commits Digest 24 Oct 2014 21:39:27 -0000 Issue 947 > > Topics (messages 6955 through 6955) > > svn commit: r1634142 - in /oodt/trunk/resource/src: > main/java/org/apache/oodt/cas/resource/mux/ > main/java/org/apache/oodt/cas/resource/structs/exceptions/ > main/java/org/apache/oodt/cas/resource/util/ main/resources/ > main/resources/examples/ test/org/ap... > 6955 by: starchmd.apache.org > > Administrivia: > > --------------------------------------------------------------------- > To post to the list, e-mail: [email protected] > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > > ---------------------------------------------------------------------- > > > > ---------- Forwarded message ---------- > From: [email protected] > To: [email protected] > Cc: > Date: Fri, 24 Oct 2014 21:38:01 -0000 > Subject: svn commit: r1634142 - in /oodt/trunk/resource/src: > main/java/org/apache/oodt/cas/resource/mux/ > main/java/org/apache/oodt/cas/resource/structs/exceptions/ > main/java/org/apache/oodt/cas/resource/util/ main/resources/ > main/resources/examples/ test/org/ap... > Author: starchmd > Date: Fri Oct 24 21:38:01 2014 > New Revision: 1634142 > > URL: http://svn.apache.org/r1634142 > Log: > Submitting multiplexing backend for resource manager > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/ > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java > > oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/ > > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java > > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/ > > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java > > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java > Modified: > > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java > oodt/trunk/resource/src/main/resources/resource.properties > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendManager.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,67 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.oodt.cas.resource.mux; > + > +import java.util.List; > + > +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; > +import org.apache.oodt.cas.resource.monitor.Monitor; > +import org.apache.oodt.cas.resource.scheduler.Scheduler; > +import > org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; > + > +/** > + * Interface for the backend manager > + * > + * @author starchmd > + */ > +public interface BackendManager { > + > + /** > + * Add in a backend set to this manager. > + * @param queue - queue that maps to the given monitor, batchmgr, and > scheduler > + * @param monitor - monitor used for this set > + * @param batchmgr - batch manager for this set > + * @param scheduler - scheduler for this set > + */ > + public void addSet(String queue,Monitor monitor, Batchmgr batchmgr, > Scheduler scheduler); > + /** > + * Return monitor for the given queue. > + * @param queue - queue to check > + * @return montior > + * @throws QueueManagerException when queue does not exist > + */ > + public Monitor getMonitor(String queue) throws QueueManagerException; > + /** > + * Return batch manager for the given queue. > + * @param queue - queue to check > + * @return batchmgr > + * @throws QueueManagerException when queue does not exist > + */ > + public Batchmgr getBatchmgr(String queue) throws > QueueManagerException; > + /** > + * Return scheduler for the given queue. > + * @param queue - queue to check > + * @return scheduler > + * @throws QueueManagerException when queue does not exist > + */ > + public Scheduler getScheduler(String queue) throws > QueueManagerException; > + /** > + * Return a list of all monitors. > + * @return list of all monitors > + */ > + public List<Monitor> getMonitors(); > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepository.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,33 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.oodt.cas.resource.mux; > + > +import > org.apache.oodt.cas.resource.structs.exceptions.RepositoryException; > + > +/** > + * Interface to handle loading of the configuration for which queues are > associated > + * with which backend. i.e. read BackendManager configuration. > + * > + * @author starchmd > + */ > +public interface BackendRepository { > + /** > + * Load the backend. > + * @return BackendManager all set up and ready to go. > + */ > + public BackendManager load() throws RepositoryException; > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/BackendRepositoryFactory.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,36 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.oodt.cas.resource.mux; > + > +/** > + * > + * @author starchmd > + * @version $Revision$ > + * > + * <p> > + * The Backend Manager Repository Factory interface. > + * </p> > + */ > +public interface BackendRepositoryFactory { > + > + /** > + * Create a backend repository > + * @return the newly minted backend repository > + */ > + public BackendRepository createBackendRepository(); > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxBatchManager.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,129 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.oodt.cas.resource.mux; > + > +import java.util.HashMap; > +import java.util.Map; > +import java.util.logging.Level; > +import java.util.logging.Logger; > + > +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; > +import org.apache.oodt.cas.resource.jobrepo.JobRepository; > +import org.apache.oodt.cas.resource.monitor.Monitor; > +import org.apache.oodt.cas.resource.structs.JobSpec; > +import org.apache.oodt.cas.resource.structs.ResourceNode; > +import > org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; > +import > org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; > + > +/** > + * @author starchmd > + * @version $Revision$ > + * > + * A batch-manager used to execute and control jobs in a mesos-cluster. > + */ > +public class QueueMuxBatchManager implements Batchmgr { > + > + private Logger LOG = > Logger.getLogger(QueueMuxBatchManager.class.getName()); > + > + BackendManager backend; > + Map<String,String> jobIdToQueue = new HashMap<String,String>(); > + JobRepository repo; > + > + /** > + * ctor > + * @param bm - backend manager > + */ > + public QueueMuxBatchManager(BackendManager bm) { > + setBackendManager(bm); > + } > + /** > + * Set the backend manager. > + * @param backend - backend manager effectively mapping queue's to > sets of backends. > + */ > + public void setBackendManager(BackendManager backend) { > + this.backend = backend; > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.batchmgr.Batchmgr#executeRemotely(org.apache.oodt.cas.resource.structs.JobSpec, > org.apache.oodt.cas.resource.structs.ResourceNode) > + */ > + @Override > + public boolean executeRemotely(JobSpec job, ResourceNode resNode) > + throws JobExecutionException { > + try { > + > jobIdToQueue.put(job.getJob().getId(),job.getJob().getQueueName()); > + return > getManagerByQueue(job.getJob().getQueueName()).executeRemotely(job, > resNode); > + } catch (QueueManagerException e) { > + jobIdToQueue.remove(job.getJob().getQueueName()); > + LOG.log(Level.WARNING, "Exception recieved while executing > job: "+e.getLocalizedMessage()+". Job will not execute."); > + throw new JobExecutionException(e); > + } > + } > + > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.batchmgr.Batchmgr#setMonitor(org.apache.oodt.cas.resource.monitor.Monitor) > + */ > + @Override > + public void setMonitor(Monitor monitor) { > + throw new UnsupportedOperationException("Cannot set the monitor > when using the queue-mux batch manager."); > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.batchmgr.Batchmgr#setJobRepository(org.apache.oodt.cas.resource.jobrepo.JobRepository) > + */ > + @Override > + public void setJobRepository(JobRepository repository) { > + this.repo = repository; > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.batchmgr.Batchmgr#killJob(java.lang.String, > org.apache.oodt.cas.resource.structs.ResourceNode) > + */ > + @Override > + public boolean killJob(String jobId, ResourceNode node) { > + try { > + return getManagerByJob(jobId).killJob(jobId,node); > + } catch (QueueManagerException e) { > + LOG.log(Level.SEVERE, "Cannot kill job: > "+e.getLocalizedMessage()); > + } > + return false; > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.batchmgr.Batchmgr#getExecutionNode(java.lang.String) > + */ > + @Override > + public String getExecutionNode(String jobId) { > + try { > + return getManagerByJob(jobId).getExecutionNode(jobId); > + } catch (QueueManagerException e) { > + LOG.log(Level.SEVERE, "Cannot get exectuion node for job: > "+e.getLocalizedMessage()); > + } > + return null; > + } > + > + private Batchmgr getManagerByJob(String jobId) throws > QueueManagerException { > + return getManagerByQueue(jobIdToQueue.get(jobId)); > + } > + > + private Batchmgr getManagerByQueue(String queue) throws > QueueManagerException { > + return this.backend.getBatchmgr(queue); > + } > + > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxMonitor.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,207 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.oodt.cas.resource.mux; > + > +import java.net.URL; > +import java.util.Iterator; > +import java.util.LinkedHashSet; > +import java.util.LinkedList; > +import java.util.List; > +import java.util.Set; > +import java.util.logging.Level; > +import java.util.logging.Logger; > + > +import org.apache.oodt.cas.resource.monitor.Monitor; > +import org.apache.oodt.cas.resource.scheduler.QueueManager; > +import org.apache.oodt.cas.resource.structs.ResourceNode; > +import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; > +import > org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; > + > +/** > + * @author starchmd > + * @version $Revision$ > + * > + * A monitor to monitor the multiple monitors. > + */ > +public class QueueMuxMonitor implements Monitor { > + private static final Logger LOG = > Logger.getLogger(QueueMuxMonitor.class.getName()); > + private BackendManager backend; > + private QueueManager qManager; > + /** > + * ctor > + * @param backend - backend manager > + * @param qManager - queue manager > + */ > + public QueueMuxMonitor(BackendManager backend, QueueManager qManager) > { > + setBackendManager(backend,qManager); > + } > + /** > + * Set the backend manager. > + * @param backend - backend manager effectively mapping queue's to > sets of backends. > + */ > + public void setBackendManager(BackendManager backend, QueueManager > qManager) { > + this.backend = backend; > + this.qManager = qManager; > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.monitor.Monitor#getLoad(org.apache.oodt.cas.resource.structs.ResourceNode) > + */ > + @Override > + public int getLoad(ResourceNode node) throws MonitorException { > + //Unclear what to do here. > + //Assuming we should never be more than "Max" > + List<String> queues = queuesForNode(node); > + int max = 0; > + for (String queue : queues) { > + try { > + max = > Math.max(max,backend.getMonitor(queue).getLoad(node)); > + } catch (QueueManagerException e) { > + LOG.log(Level.WARNING,"Queue '"+queue+"' has > dissappeared."); > + } > + } > + return max; > + } > + > + /* (non-Javadoc) > + * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodes() > + */ > + @Override > + public List<ResourceNode> getNodes() throws MonitorException { > + Set<ResourceNode> set = new LinkedHashSet<ResourceNode>(); > + for (Monitor mon:this.backend.getMonitors()) { > + for (Object res:mon.getNodes()) { > + set.add((ResourceNode)res); > + } > + } > + return new LinkedList<ResourceNode>(set); > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.monitor.Monitor#getNodeById(java.lang.String) > + */ > + @Override > + public ResourceNode getNodeById(String nodeId) throws > MonitorException { > + ResourceNode node = null; > + Iterator<Monitor> imon = this.backend.getMonitors().iterator(); > + while(imon.hasNext() && (node = imon.next().getNodeById(nodeId)) > == null) {} > + return node; > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.monitor.Monitor#getNodeByURL(java.net.URL) > + */ > + @Override > + public ResourceNode getNodeByURL(URL ipAddr) throws MonitorException { > + ResourceNode node = null; > + Iterator<Monitor> imon = this.backend.getMonitors().iterator(); > + while(imon.hasNext() && (node = imon.next().getNodeByURL(ipAddr)) > == null) {} > + return node; > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.monitor.Monitor#reduceLoad(org.apache.oodt.cas.resource.structs.ResourceNode, > int) > + */ > + @Override > + public boolean reduceLoad(ResourceNode node, int loadValue) > + throws MonitorException { > + List<String> queues = queuesForNode(node); > + boolean ret = true; > + for (String queue:queues) { > + try { > + ret &= backend.getMonitor(queue).reduceLoad(node, > loadValue); > + } catch (QueueManagerException e) { > + LOG.log(Level.SEVERE,"Queue '"+queue+"' has > dissappeared."); > + throw new MonitorException(e); > + } > + } > + return ret; > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.monitor.Monitor#assignLoad(org.apache.oodt.cas.resource.structs.ResourceNode, > int) > + */ > + @Override > + public boolean assignLoad(ResourceNode node, int loadValue) > + throws MonitorException { > + List<String> queues = queuesForNode(node); > + boolean ret = true; > + for (String queue:queues) { > + try { > + ret &= backend.getMonitor(queue).assignLoad(node, > loadValue); > + } catch (QueueManagerException e) { > + LOG.log(Level.SEVERE,"Queue '"+queue+"' has > dissappeared."); > + throw new MonitorException(e); > + } > + } > + return ret; > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.monitor.Monitor#addNode(org.apache.oodt.cas.resource.structs.ResourceNode) > + */ > + @Override > + public void addNode(ResourceNode node) throws MonitorException { > + List<String> queues = queuesForNode(node); > + for (String queue:queues) { > + try { > + backend.getMonitor(queue).addNode(node); > + } catch (QueueManagerException e) { > + LOG.log(Level.SEVERE,"Queue '"+queue+"' has > dissappeared."); > + throw new MonitorException(e); > + } > + } > + } > + > + /* (non-Javadoc) > + * @see > org.apache.oodt.cas.resource.monitor.Monitor#removeNodeById(java.lang.String) > + */ > + @Override > + public void removeNodeById(String nodeId) throws MonitorException { > + for (Monitor mon:this.backend.getMonitors()) { > + mon.removeNodeById(nodeId); > + } > + } > + /** > + * Gets the queues that are associated with a particular node. > + * @param node - node which queues are needed for > + * @return list of queue names on that node > + */ > + private List<String> queuesForNode(ResourceNode node) { > + List<String> ret = new LinkedList<String>(); > + //Get list of queues > + List<String> queues = null; > + try > + { > + queues = qManager.getQueues(); > + } catch (QueueManagerException e) { > + LOG.log(Level.SEVERE, "Cannot list queues."); > + } > + //Search each queu to see if it contains given node > + for (String queue : queues) { > + try > + { > + if (qManager.getNodes(queue).contains(node.getNodeId())) { > + ret.add(queue); > + } > + } catch(QueueManagerException e) { > + LOG.log(Level.SEVERE, "Queue '"+queue+"' has > dissappeared."); > + } > + } > + return ret; > + } > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxScheduler.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,181 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > + > +package org.apache.oodt.cas.resource.mux; > + > +//JDKimports > +import java.util.logging.Level; > +import java.util.logging.Logger; > + > + > + > + > + > + > +//OODT imports > +import org.apache.oodt.cas.resource.jobqueue.JobQueue; > +import org.apache.oodt.cas.resource.monitor.Monitor; > +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; > +import org.apache.oodt.cas.resource.scheduler.QueueManager; > +import org.apache.oodt.cas.resource.scheduler.Scheduler; > +import org.apache.oodt.cas.resource.structs.JobSpec; > +import org.apache.oodt.cas.resource.structs.ResourceNode; > +import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; > +import > org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; > +import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException; > + > +/** > + * This scheduler multiplexes between multiple schedulers based on the > "queue" . > + * > + * @author starchmd > + * @version $Revision$ > + */ > +public class QueueMuxScheduler implements Scheduler { > + > + private static final Logger LOG = > Logger.getLogger(QueueMuxScheduler.class.getName()); > + > + private BackendManager backend; > + private JobQueue queue; > + private float waitTime = -1; > + > + //Manages other queue-muxing components > + private QueueMuxBatchManager batch; > + private QueueMuxMonitor mon; > + private QueueManager qManager; > + > + /** > + * ctor > + * @param backend - Backend manager to handle the many different > backends. > + */ > + public QueueMuxScheduler(BackendManager backend, QueueManager qm, > JobQueue jq) { > + String waitStr = > System.getProperty("org.apache.oodt.cas.resource.scheduler.wait.seconds", > "20"); > + waitTime = Float.parseFloat(waitStr); > + this.queue = jq; > + this.qManager = qm; > + this.backend = backend; > + //Required, so make them here > + batch = new QueueMuxBatchManager(backend); > + mon = new QueueMuxMonitor(backend,qm); > + } > + > + /* > + * (non-Javadoc) > + * > + * @see java.lang.Runnable#run() > + */ > + public void run() { > + //Loop forever > + while (true) { > + try { > + Thread.sleep((long) (waitTime * 1000.0)); > + } catch (InterruptedException e) { > + //If the thread will continue, reinterrupt thread > + Thread.currentThread().interrupt(); > + } > + //You have jobs > + if (!queue.isEmpty()) { > + JobSpec job = null; > + try { > + job = queue.getNextJob(); > + LOG.log(Level.INFO, "Scheduling job: ["+ > job.getJob().getId()+ "] for execution"); > + schedule(job); > + } catch (JobQueueException je) { > + LOG.log(Level.WARNING,"Error getting job from queue: " > + + je.getLocalizedMessage()); > + } catch (SchedulerException se) { > + LOG.log(Level.WARNING,"Error occured scheduling job: > "+se.getLocalizedMessage()); > + try { > + queue.requeueJob(job); > + } catch (JobQueueException je) { > + LOG.log(Level.WARNING,"Error requeueing job: > "+je.getLocalizedMessage()); > + LOG.log(Level.WARNING,"Previous error caused by: > "+se.getLocalizedMessage()); > + } > + } > + } > + } > + } > + > + /* > + * (non-Javadoc) > + * > + * @see > gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#schedule(gov.nasa.jpl.oodt.cas.resource.structs.JobSpec) > + */ > + public synchronized boolean schedule(JobSpec spec) > + throws SchedulerException { > + System.out.println("Spec: "+spec+" Job: "+spec.getJob()+" > Backend:"+backend); > + String queue = spec.getJob().getQueueName(); > + try { > + return backend.getScheduler(queue).schedule(spec); > + } catch (QueueManagerException e) { > + LOG.log(Level.WARNING,"Exception occuered: > "+e.getLocalizedMessage()); > + throw new SchedulerException(e); > + } > + } > + > + /* > + * (non-Javadoc) > + * > + * @see > gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getBatchmgr() > + */ > + public Batchmgr getBatchmgr() { > + return batch; > + } > + > + /* > + * (non-Javadoc) > + * > + * @see > gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getMonitor() > + */ > + public Monitor getMonitor() { > + return mon; > + } > + > + /* > + * (non-Javadoc) > + * > + * @see > gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getJobQueue() > + */ > + public JobQueue getJobQueue() { > + return this.queue; > + } > + > + /* > + * (non-Javadoc) > + * > + * @see > gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getQueueManager() > + */ > + public QueueManager getQueueManager() { > + return qManager; > + } > + > + /* > + * (non-Javadoc) > + * > + * @see > gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#nodeAvailable(gov.nasa.jpl.oodt.cas.resource.structs.JobSpec) > + */ > + public synchronized ResourceNode nodeAvailable(JobSpec spec) > + throws SchedulerException { > + String queue = spec.getJob().getQueueName(); > + try { > + return backend.getScheduler(queue).nodeAvailable(spec); > + } catch (QueueManagerException e) { > + LOG.log(Level.WARNING,"Exception occuered: > "+e.getLocalizedMessage()); > + throw new SchedulerException(e); > + } > + } > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,74 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.oodt.cas.resource.mux; > + > +import java.util.logging.Level; > +import java.util.logging.Logger; > + > +import org.apache.oodt.cas.resource.jobqueue.JobQueue; > +import org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory; > +import org.apache.oodt.cas.resource.queuerepo.XmlQueueRepositoryFactory; > +import org.apache.oodt.cas.resource.scheduler.QueueManager; > +import org.apache.oodt.cas.resource.scheduler.Scheduler; > +import org.apache.oodt.cas.resource.scheduler.SchedulerFactory; > +import > org.apache.oodt.cas.resource.structs.exceptions.RepositoryException; > +import > org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; > + > +/** > + * This class acts as a factory for the whole queue-mux > + * set of classes. > + * > + * @author starchmd > + */ > +public class QueueMuxSchedulerFactory implements SchedulerFactory { > + > + private static final Logger LOG = > Logger.getLogger(QueueMuxSchedulerFactory.class.getName()); > + > + BackendManager backend; > + QueueManager qManager; > + JobQueue jobQueue; > + /** > + * ctor > + */ > + public QueueMuxSchedulerFactory() { > + //Load backend manager > + String backRepo = > System.getProperty("resource.backend.mux.repository", > + XmlBackendRepository.class.getCanonicalName()); > + try { > + backend = > GenericResourceManagerObjectFactory.getBackendRepositoryFromFactory(backRepo).load(); > + } catch (RepositoryException e) { > + LOG.log(Level.SEVERE,"Error loading backend repository: > "+e.getMessage(),e); > + backend = null; > + } > + //Load user-specified queue factory > + String qFact = > System.getProperty("org.apache.oodt.cas.resource.queues.repo.factory", > + XmlQueueRepositoryFactory.class.getCanonicalName()); > + qManager = > GenericResourceManagerObjectFactory.getQueueRepositoryFromFactory( > + qFact).loadQueues(); > + //Load job queue > + String jobFact = System.getProperty("resource.jobqueue.factory", > + JobStackJobQueueFactory.class.getCanonicalName()); > + jobQueue = GenericResourceManagerObjectFactory > + .getJobQueueServiceFromFactory(jobFact); > + } > + > + @Override > + public Scheduler createScheduler() { > + return new QueueMuxScheduler(this.backend, this.qManager, > this.jobQueue); > + } > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/StandardBackendManager.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,120 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.oodt.cas.resource.mux; > + > +import java.util.HashMap; > +import java.util.LinkedList; > +import java.util.List; > +import java.util.Map; > + > +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; > +import org.apache.oodt.cas.resource.monitor.Monitor; > +import org.apache.oodt.cas.resource.scheduler.Scheduler; > +import > org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; > + > +/** > + * This manager keeps track of the mux-able backends for the resource > manager. > + * It effectively maps a queue to the backend that this queue feeds. > + * > + * It uses a private BackendSet to keep track of everything. > + * > + * For reference, a backend is a set of the following: > + * 1. Batch manger, responsible for running jobs > + * 2. Scheduler, responsible for scheduling a job to run > + * 3. Monitor, responsible for managing nodes > + * > + * @author starchmd > + */ > +public class StandardBackendManager implements BackendManager { > + Map<String,BackendSet> queueToBackend = new > HashMap<String,BackendSet>(); > + > + /** > + * Add in a backend set to this manager. > + * @param queue - queue that maps to the given monitor, batchmgr, and > scheduler > + * @param monitor - monitor used for this set > + * @param batchmgr - batch manager for this set > + * @param scheduler - scheduler for this set > + */ > + public void addSet(String queue,Monitor monitor, Batchmgr batchmgr, > Scheduler scheduler) { > + queueToBackend.put(queue, new > BackendSet(monitor,batchmgr,scheduler)); > + } > + /** > + * Return monitor for the given queue. > + * @param queue - queue to check > + * @return montior > + * @throws QueueManagerException when queue does not exist > + */ > + public Monitor getMonitor(String queue) throws QueueManagerException { > + BackendSet set = queueToBackend.get(queue); > + if (set == null) > + throw new QueueManagerException("Queue '" + queue + "' does > not exist"); > + return set.monitor; > + } > + /** > + * Return batch manager for the given queue. > + * @param queue - queue to check > + * @return batchmgr > + * @throws QueueManagerException when queue does not exist > + */ > + public Batchmgr getBatchmgr(String queue) throws > QueueManagerException { > + BackendSet set = queueToBackend.get(queue); > + if (set == null) > + throw new QueueManagerException("Queue '" + queue + "' does > not exist"); > + return set.batchmgr; > + } > + /** > + * Return scheduler for the given queue. > + * @param queue - queue to check > + * @return scheduler > + * @throws QueueManagerException when queue does not exist > + */ > + public Scheduler getScheduler(String queue) throws > QueueManagerException { > + BackendSet set = queueToBackend.get(queue); > + if (set == null) > + throw new QueueManagerException("Queue '" + queue + "' does > not exist"); > + return set.scheduler; > + } > + /** > + * Return a list of all monitors. > + * @return list of all monitors > + */ > + public List<Monitor> getMonitors() { > + List<Monitor> monitors = new LinkedList<Monitor>(); > + for (BackendSet set : queueToBackend.values()) { > + monitors.add(set.monitor); > + } > + return monitors; > + } > + /** > + * Class that holds a set of the three backend pieces. > + * Private class, because no accessor/modifiers have been > + * created(public members). Acts like a struct. > + * > + * @author starchmd > + */ > + private class BackendSet { > + public Monitor monitor = null; > + public Batchmgr batchmgr = null; > + public Scheduler scheduler = null; > + > + public BackendSet(Monitor monitor, Batchmgr batchmgr, Scheduler > scheduler) { > + this.monitor = monitor; > + this.batchmgr = batchmgr; > + this.scheduler = scheduler; > + } > + } > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepository.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,186 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.oodt.cas.resource.mux; > + > +import java.io.File; > +import java.io.FileInputStream; > +import java.io.FileNotFoundException; > +import java.net.URI; > +import java.net.URISyntaxException; > +import java.util.logging.Level; > +import java.util.logging.Logger; > + > +import org.apache.oodt.cas.resource.scheduler.Scheduler; > +import > org.apache.oodt.cas.resource.structs.exceptions.RepositoryException; > +import > org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; > +import org.apache.oodt.commons.xml.XMLUtils; > +import org.w3c.dom.Document; > +import org.w3c.dom.Element; > +import org.w3c.dom.NodeList; > + > +/** > + * Class to load BackendManager from XML file. > + * @author starchmd > + */ > +public class XmlBackendRepository implements BackendRepository { > + > + private static final Logger LOG = > Logger.getLogger(XmlBackendRepository.class.getName()); > + private String uri; > + > + //Constants > + private static final String SCHEDULER = "scheduler"; > + private static final String BATCHMGR = "batchmgr"; > + private static final String MONITOR = "monitor"; > + > + private static final String MONITOR_PROPERTY = > "resource.monitor.factory"; > + private static final String BATCHMGR_PROPERTY = > "resource.batchmgr.factory"; > + > + /** > + * Ctor > + * @param uri - uri of XML file containing mapping > + */ > + public XmlBackendRepository(String uri) { > + if (uri == null) > + throw new NullPointerException("URI for queue-to-backend xml > file cannot be null"); > + this.uri = uri; > + } > + /* (non-Javadoc) > + * @see org.apache.oodt.cas.resource.mux.BackendRepository#load() > + */ > + @Override > + public BackendManager load() throws RepositoryException { > + LOG.log(Level.INFO,"Reading backend set manager from: "+this.uri); > + BackendManager bm = new StandardBackendManager(); > + String origMon = System.getProperty(MONITOR_PROPERTY); > + String origBat = System.getProperty(BATCHMGR_PROPERTY); > + try { > + File file = new File(new URI(this.uri)); > + Document root = XMLUtils.getDocumentRoot(new > FileInputStream(file)); > + NodeList list = root.getElementsByTagName("queue"); > + if (list != null && list.getLength() > 0) { > + for (int k = 0; k < list.getLength(); k++) { > + Element node = (Element)list.item(k); > + String queue = node.getAttribute("name"); > + //Set properties for batch and monitor factories > + //So scheduler builds as repository specifies > + try { > + String mfact = getMonitor(queue,node); > + LOG.log(Level.INFO,"Setting monitor factory > property to: "+mfact); > + System.setProperty(MONITOR_PROPERTY, mfact); > + } catch (RepositoryException e) { > + LOG.log(Level.INFO, "No monitor factory for queue > "+queue+", using system property."); > + } > + try { > + String bfact = getBatchmgr(queue,node); > + LOG.log(Level.INFO,"Setting batchmgr factory > property to: "+bfact); > + System.setProperty(BATCHMGR_PROPERTY, bfact); > + } catch (RepositoryException e) { > + LOG.log(Level.INFO, "No batchmgr factory for > queue "+queue+", using system property."); > + } > + //Build scheduler > + Scheduler sch = getScheduler(queue,node); > + bm.addSet(queue, sch.getMonitor(), sch.getBatchmgr(), > sch); > + //Reset Properties for next item > + resetAlteredProperty(MONITOR_PROPERTY,origMon); > + resetAlteredProperty(BATCHMGR_PROPERTY,origBat); > + } > + } > + } catch (URISyntaxException e) { > + LOG.log(Level.SEVERE,"Malformed URI: "+this.uri); > + throw new RepositoryException(e); > + } catch(FileNotFoundException e) { > + LOG.log(Level.SEVERE,"File not found: "+this.uri+" from > working dir: "+new File(".").getAbsolutePath()); > + throw new RepositoryException(e); > + } catch (ClassCastException e) { > + LOG.log(Level.SEVERE,"Queue tag must represent XML element."); > + throw new RepositoryException(e); > + } finally { > + resetAlteredProperty(MONITOR_PROPERTY,origMon); > + resetAlteredProperty(BATCHMGR_PROPERTY,origBat); > + } > + > + return bm; > + } > + /** > + * Resets a property. Allows nulls > + * @param prop - property name to reset > + * @param value - value to reset to, can be null > + */ > + private static void resetAlteredProperty(String prop,String value) { > + if (value == null) { > + System.clearProperty(prop); > + return; > + } > + System.setProperty(prop,value); > + } > + > + /** > + * Get monitor factory from XML > + * @param queue - current queue, for error reporting > + * @param node - node that is being read > + * @return monitor factory string > + * @throws RepositoryException > + */ > + private static String getMonitor(String queue,Element node) throws > RepositoryException { > + return getFactoryAttribute(queue, node, MONITOR); > + } > + /** > + * Get scheduler from XML > + * @param queue - current queue, for error reporting > + * @param node - node that is being read > + * @return newly constructed Scheduler > + * @throws RepositoryException > + */ > + private static Scheduler getScheduler(String queue,Element node) > throws RepositoryException { > + String factory = getFactoryAttribute(queue, node, SCHEDULER); > + LOG.log(Level.INFO,"Loading monitor from: "+factory); > + Scheduler sch = > GenericResourceManagerObjectFactory.getSchedulerServiceFromFactory(factory); > + if (sch != null) > + return sch; > + throw new RepositoryException("Could instantiate from: "+factory); > + } > + /** > + * Get batchmgr factory from XML > + * @param queue - current queue, for error reporting > + * @param node - node that is being read > + * @return batch manager factory name > + * @throws RepositoryException > + */ > + private static String getBatchmgr(String queue,Element node) throws > RepositoryException { > + return getFactoryAttribute(queue, node, BATCHMGR); > + } > + /** > + * Pull out the factory attribute from tag with given name. > + * @param queue - current queue, for error reporting > + * @param elem - element that contains tags as children > + * @param tag - string name of tag looked for. i.e. "monitor" > + * @return name of factory class > + * @throws RepositoryException - thrown if more than one child > matches, no children match, or other error > + */ > + private static String getFactoryAttribute(String queue,Element elem, > String tag) throws RepositoryException { > + NodeList children = elem.getElementsByTagName(tag); > + try { > + String attr = ""; > + if (children.getLength() != 1 || (attr = > ((Element)children.item(0)).getAttribute("factory")) == "") { > + throw new RepositoryException("Could not find exactly one > "+tag+", with factory set, in queue: "+queue); > + } > + return attr; > + } catch (ClassCastException e) { > + throw new RepositoryException("Tag "+tag+" does not represent > XML element in queue: "+queue,e); > + } > + } > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,55 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.oodt.cas.resource.mux; > + > +//OODT imports > +import org.apache.oodt.cas.metadata.util.PathUtils; > + > +//JDK imports > +import java.util.logging.Level; > +import java.util.logging.Logger; > + > +/** > + * > + * @author starchmd > + * @version $Revision$ > + * > + * <p> > + * The XML Backend Repository Factory interface. > + * </p> > + */ > +public class XmlBackendRepositoryFactory implements > BackendRepositoryFactory { > + > + private static final Logger LOG = > Logger.getLogger(XmlBackendRepositoryFactory.class.getName()); > + /** > + * Create the backend repository (xml) > + * @return the newly minted backend repository > + */ > + public XmlBackendRepository createBackendRepository() { > + try { > + String uri = > System.getProperty("resource.backend.mux.xmlrepository.queuetobackend"); > + /* do env var replacement */ > + uri = PathUtils.replaceEnvVariables(uri); > + return new XmlBackendRepository(uri); > + } catch (NullPointerException e) { > + LOG.log( Level.SEVERE,"Failed to create > XmlBackendRepository: "+ e.getMessage(), e); > + return null; > + } > + } > + > +} > > Added: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java > (added) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/exceptions/RepositoryException.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,60 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.oodt.cas.resource.structs.exceptions; > + > +/** > + * @author starchmd > + * @version $Revision$ > + * > + * <p> > + * An exception thrown by the {@link BackendRepository} when an error > occurs. > + * </p> > + */ > +public class RepositoryException extends Exception { > + > + /* serial version UID */ > + private static final long serialVersionUID = 4568261126290589269L; > + > + /** > + * > + */ > + public RepositoryException() {} > + > + /** > + * @param message > + */ > + public RepositoryException(String message) { > + super(message); > + } > + > + /** > + * @param cause > + */ > + public RepositoryException(Throwable cause) { > + super(cause); > + } > + > + /** > + * @param message > + * @param cause > + */ > + public RepositoryException(String message, Throwable cause) { > + super(message, cause); > + } > + > +} > > Modified: > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java?rev=1634142&r1=1634141&r2=1634142&view=diff > > ============================================================================== > --- > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java > (original) > +++ > oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java > Fri Oct 24 21:38:01 2014 > @@ -33,6 +33,8 @@ import org.apache.oodt.cas.resource.moni > import org.apache.oodt.cas.resource.monitor.MonitorFactory; > import > org.apache.oodt.cas.resource.monitor.ganglia.loadcalc.LoadCalculator; > import > org.apache.oodt.cas.resource.monitor.ganglia.loadcalc.LoadCalculatorFactory; > +import org.apache.oodt.cas.resource.mux.BackendRepository; > +import org.apache.oodt.cas.resource.mux.BackendRepositoryFactory; > import org.apache.oodt.cas.resource.noderepo.NodeRepository; > import org.apache.oodt.cas.resource.noderepo.NodeRepositoryFactory; > import org.apache.oodt.cas.resource.queuerepo.QueueRepository; > @@ -162,7 +164,42 @@ public final class GenericResourceManage > > return null; > } > + /** > + * Creates a new {@link BackendRepository} implementation from the given > + * {@link BackendRepositoryFactory} class name. > + * > + * @param backendRepositoryFactory > + * The class name of the {@link BackendRepositoryFactory} to > use to create new > + * {@link BackendRepository}s. > + * @return A new implementation of a {@link BackendRepository}. > + */ > + public static BackendRepository getBackendRepositoryFromFactory(String > backendRepositoryFactory) { > + Class clazz = null; > + BackendRepositoryFactory factory = null; > + > + try { > + clazz = Class.forName(backendRepositoryFactory); > + factory = (BackendRepositoryFactory) clazz.newInstance(); > + return factory.createBackendRepository(); > + } catch (ClassNotFoundException e) { > + e.printStackTrace(); > + LOG.log(Level.WARNING, > + "ClassNotFoundException when loading backend repository factory > class " > + + backendRepositoryFactory + " Message: " + e.getMessage()); > + } catch (InstantiationException e) { > + e.printStackTrace(); > + LOG.log(Level.WARNING, > + "InstantiationException when loading backend repository factory > class " > + + backendRepositoryFactory + " Message: " + e.getMessage()); > + } catch (IllegalAccessException e) { > + e.printStackTrace(); > + LOG.log(Level.WARNING, > + "IllegalAccessException when loading backend repository factory > class " > + + backendRepositoryFactory + " Message: " + e.getMessage()); > + } > > + return null; > + } > /** > * Creates a new {@link NodeRepository} implementation from the given > * {@link QueueRepositoryFactory} class name. > > Added: > oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml > (added) > +++ > oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping.xml > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,24 @@ > +<?xml version='1.0' encoding='UTF-8'?> > +<!-- > +Licensed to the Apache Software Foundation (ASF) under one or more > contributor > +license agreements. See the NOTICE.txt file distributed with this work > for > +additional information regarding copyright ownership. The ASF licenses > this > +file to you under the Apache License, Version 2.0 (the "License"); you > may not > +use this file except in compliance with the License. You may obtain a > copy of > +the License at > + > + http://www.apache.org/licenses/LICENSE-2.0 > + > +Unless required by applicable law or agreed to in writing, software > +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT > +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the > +License for the specific language governing permissions and limitations > under > +the License. > +--> > +<cas:queue-to-backend-mapping xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas > "> > + <queue name="example"> > + <scheduler > factory="org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory"/> > + <monitor > factory="org.apache.oodt.cas.resource.monitor.AssignmentMonitorFactory"/> > + <batchmgr > factory="org.apache.oodt.cas.resource.batchmgr.XmlRpcBatchMgrFactory"/> > + </queue> > +</cas:queue-to-backend-mapping> > > Modified: oodt/trunk/resource/src/main/resources/resource.properties > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/resource.properties?rev=1634142&r1=1634141&r2=1634142&view=diff > > ============================================================================== > --- oodt/trunk/resource/src/main/resources/resource.properties (original) > +++ oodt/trunk/resource/src/main/resources/resource.properties Fri Oct 24 > 21:38:01 2014 > @@ -31,6 +31,11 @@ resource.jobqueue.factory = org.apache.o > # resource job repository factory > resource.jobrepo.factory = > org.apache.oodt.cas.resource.jobrepo.MemoryJobRepositoryFactory > > +# For queue-multiplexing scheduler > +resource.backend.mux.repository = > org.apache.oodt.cas.resource.mux.XmlBackendRepositoryFactory > +resource.backend.mux.xmlrepository.queuetobackend = > file://[HOME]/queue-to-backend.xml > + > + > # node repository factory > org.apache.oodt.cas.resource.nodes.repo.factory = > org.apache.oodt.cas.resource.noderepo.XmlNodeRepositoryFactory > > > Added: > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java > (added) > +++ > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxBatchmgr.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,129 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > + > +package org.apache.oodt.cas.resource.mux; > + > +//OODT imports > +import org.apache.oodt.cas.resource.mux.mocks.MockBatchManager; > +import org.apache.oodt.cas.resource.structs.Job; > +import org.apache.oodt.cas.resource.structs.JobSpec; > +import org.apache.oodt.cas.resource.structs.ResourceNode; > +import > org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; > + > +//JUnit imports > +import junit.framework.TestCase; > + > +/** > + * @author starchmd > + * @version $Revision$ > + * > + * <p> > + * Test Suite for the {@link QueueBatchMonitor} service > + * </p>. > + */ > +public class TestQueueMuxBatchmgr extends TestCase { > + > + private QueueMuxBatchManager queue; > + private MockBatchManager mock1; > + private MockBatchManager mock2; > + > + protected void setUp() { > + BackendManager back = new StandardBackendManager(); > + back.addSet("queue-1", null,(mock1 = new MockBatchManager()), > null); > + back.addSet("queue-2", null,(mock2 = new MockBatchManager()), > null); > + queue = new QueueMuxBatchManager(back); > + } > + > + public void testExecuteRemotely() { > + try { > + > + //Test that the jobs are put in seperate mock-backends based > on queues > + ResourceNode node1 = new ResourceNode(); > + ResourceNode node2 = new ResourceNode(); > + > + JobSpec spec1 = this.getSpecFromQueue("queue-1"); > + queue.executeRemotely(spec1, node1); > + > + JobSpec spec2 = this.getSpecFromQueue("queue-2"); > + queue.executeRemotely(spec2, node2); > + //Yes...use reference equality, as these must be the exact > same object > + TestCase.assertEquals(spec1,mock1.getCurrentJobSpec()); > + TestCase.assertEquals(spec2,mock2.getCurrentJobSpec()); > + TestCase.assertEquals(node1,mock1.getCurrentResourceNode()); > + TestCase.assertEquals(node2,mock2.getCurrentResourceNode()); > + //Throws exception on bad queue > + try { > + > queue.executeRemotely(this.getSpecFromQueue("queue-3"),node1); > + TestCase.fail("Failed to throw JobExecutionException on > unknown queue."); > + } catch(JobExecutionException e) {} > + } catch (JobExecutionException e) { > + TestCase.fail("Unexpected Exception: "+e.getMessage()); > + } > + } > + > + public void testKillJob() { > + try { > + ResourceNode node1 = new ResourceNode(); > + ResourceNode node2 = new ResourceNode(); > + > + JobSpec spec1 = this.getSpecFromQueue("queue-1"); > + queue.executeRemotely(spec1, node1); > + > + JobSpec spec2 = this.getSpecFromQueue("queue-2"); > + queue.executeRemotely(spec2, node2); > + //Make sure that one can kill a job, and the other job is > running > + TestCase.assertTrue(queue.killJob(spec1.getJob().getId(), > node1)); > + TestCase.assertEquals(mock1.getCurrentJobSpec(),null); > + TestCase.assertEquals(mock2.getCurrentJobSpec(),spec2); > + //Make sure kill fails with bad queue > + > TestCase.assertFalse(queue.killJob(this.getSpecFromQueue("queue-3").getJob().getId(), > node1)); > + } catch (JobExecutionException e) { > + TestCase.fail("Unexpected Exception: "+e.getMessage()); > + } > + } > + > + public void testGetExecNode() { > + try { > + ResourceNode node1 = new ResourceNode(); > + ResourceNode node2 = new ResourceNode(); > + node1.setId("Node1-ID"); > + node2.setId("Node2-ID"); > + JobSpec spec1 = this.getSpecFromQueue("queue-1"); > + queue.executeRemotely(spec1, node1); > + > + JobSpec spec2 = this.getSpecFromQueue("queue-2"); > + queue.executeRemotely(spec2, node2); > + //Make that the execution node is same > + > TestCase.assertEquals(node1.getNodeId(),queue.getExecutionNode(spec1.getJob().getId())); > + > TestCase.assertEquals(node2.getNodeId(),queue.getExecutionNode(spec2.getJob().getId())); > + //Returns null, if bad-queue > + > TestCase.assertNull(queue.getExecutionNode(this.getSpecFromQueue("queue-3").getJob().getId())); > + } catch (JobExecutionException e) { > + TestCase.fail("Unexpected Exception: "+e.getMessage()); > + } > + } > + > + private JobSpec getSpecFromQueue(String queue) { > + JobSpec spec1 = new JobSpec(); > + Job job1 = new Job(); > + job1.setId("000000100000011-"+queue); > + job1.setQueueName(queue); > + spec1.setJob(job1); > + return spec1; > + } > +} > > Added: > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java > (added) > +++ > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMuxMonitor.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,204 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.oodt.cas.resource.mux; > + > +//OODT imports > +import java.net.MalformedURLException; > +import java.net.URL; > +import java.util.HashMap; > +import java.util.LinkedList; > +import java.util.List; > +import java.util.Map; > + > +import org.apache.oodt.cas.resource.mux.mocks.MockMonitor; > +import org.apache.oodt.cas.resource.scheduler.QueueManager; > +import org.apache.oodt.cas.resource.structs.ResourceNode; > +import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; > + > +import > org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; > + > +//JUnit imports > +import junit.framework.TestCase; > + > +/** > + * @author starchmd > + * @version $Revision$ > + * > + * <p> > + * Test Suite for the {@link QueueBatchMonitor} service > + * </p>. > + */ > +public class TestQueueMuxMonitor extends TestCase { > + > + private QueueMuxMonitor monitor; > + private MockMonitor mock1; > + private MockMonitor mock2; > + private ResourceNode superfluous; > + private QueueManager qm; > + Map<MockMonitor,List<ResourceNode>> map; > + > + protected void setUp() { > + try { > + //Map monitor to nodes list > + map = new HashMap<MockMonitor,List<ResourceNode>>(); > + List<ResourceNode> nodes1 = getNodesList("mock-1"); > + List<ResourceNode> nodes2 = getNodesList("mock-2"); > + //Backend Manager setup > + BackendManager back = new StandardBackendManager(); > + back.addSet("queue-1",(mock1 = addMonitor(0,map,nodes1)), > null, null); > + back.addSet("queue-2",(mock2 = addMonitor(5,map,nodes2)), > null, null); > + //Make sure the queue manager is setup > + qm = new QueueManager(); > + qm.addQueue("queue-1"); > + qm.addQueue("queue-2"); > + qm.addQueue("queue-3"); > + for (ResourceNode rn : nodes1) > + qm.addNodeToQueue(rn.getNodeId(), "queue-1"); > + for (ResourceNode rn : nodes2) > + qm.addNodeToQueue(rn.getNodeId(), "queue-2"); > + //Add an extra node to test "unknown queue" > + qm.addNodeToQueue((superfluous = new > ResourceNode("superfluous-1",new URL("http://superfluous-1"),-2)).getNodeId(), > "queue-3"); > + monitor = new QueueMuxMonitor(back, qm); > + } catch (QueueManagerException e) { > + TestCase.fail("Unanticipated queue manager exception caught: > "+e.getMessage()); > + } catch (MalformedURLException e) { > + TestCase.fail("Unanticipated URL exception caught: > "+e.getMessage()); > + } > + } > + > + public void testGetLoad() { > + try { > + > TestCase.assertEquals(mock1.load,monitor.getLoad(map.get(mock1).get(0))); > + > TestCase.assertEquals(mock2.load,monitor.getLoad(map.get(mock2).get(0))); > + > + /*try { > + monitor.getLoad(superfluous); > + TestCase.fail("Exception not thrown for unknown queue."); > + } catch (MonitorException e) { > + }*/ > + } catch(MonitorException e) { > + TestCase.fail("Unanticipated monitor exception caught: > "+e.getMessage()); > + } > + } > + > + public void testGetNodes() { > + try { > + List<ResourceNode> nodes = monitor.getNodes(); > + for (ResourceNode rn :map.get(mock1)) > + TestCase.assertTrue("Node: "+rn.getNodeId()+ " not > found.", nodes.contains(rn)); > + for (ResourceNode rn :map.get(mock2)) > + TestCase.assertTrue("Node: "+rn.getNodeId()+ " not > found.", nodes.contains(rn)); > + } catch(MonitorException e) { > + TestCase.fail("Unanticipated monitor exception caught: > "+e.getMessage()); > + } > + } > + > + public void testGetNodeById() { > + try { > + > TestCase.assertEquals(map.get(mock1).get(0),monitor.getNodeById("mock-1-1")); > + > TestCase.assertEquals(map.get(mock2).get(0),monitor.getNodeById("mock-2-1")); > + } catch(MonitorException e) { > + TestCase.fail("Unanticipated monitor exception caught: > "+e.getMessage()); > + } > + } > + public void testGetNodeByURL() { > + try { > + > TestCase.assertEquals(map.get(mock1).get(1),monitor.getNodeByURL(new URL(" > http://mock-1-2"))); > + > TestCase.assertEquals(map.get(mock2).get(1),monitor.getNodeByURL(new URL(" > http://mock-2-2"))); > + } catch(MonitorException e) { > + TestCase.fail("Unanticipated monitor exception caught: > "+e.getMessage()); > + } catch (MalformedURLException e1) { > + TestCase.fail("Unanticipated URL exception caught: > "+e1.getMessage()); > + } > + } > + > + public void testReduceLoad() { > + try { > + TestCase.assertTrue(monitor.reduceLoad(map.get(mock1).get(2), > 5)); > + TestCase.assertTrue(monitor.reduceLoad(map.get(mock2).get(2), > 3)); > + TestCase.assertEquals(map.get(mock1).get(2).getCapacity(),25); > + TestCase.assertEquals(map.get(mock2).get(2).getCapacity(),27); > + try { > + monitor.reduceLoad(superfluous, 2); > + TestCase.fail("Exception not thrown for unknown queue."); > + } catch (MonitorException e) {} > + } catch(MonitorException e) { > + TestCase.fail("Unanticipated monitor exception caught: > "+e.getMessage()); > + } > + } > + > + public void testAssignLoad() { > + try { > + TestCase.assertTrue(monitor.assignLoad(map.get(mock1).get(2), > 5)); > + TestCase.assertTrue(monitor.assignLoad(map.get(mock2).get(2), > 3)); > + TestCase.assertEquals(map.get(mock1).get(2).getCapacity(),5); > + TestCase.assertEquals(map.get(mock2).get(2).getCapacity(),3); > + try { > + monitor.assignLoad(superfluous, 2); > + TestCase.fail("Exception not thrown for unknown queue."); > + } catch (MonitorException e) {} > + } catch(MonitorException e) { > + TestCase.fail("Unanticipated monitor exception caught: > "+e.getMessage()); > + } > + } > + > + public void testAddNode() { > + try { > + ResourceNode node = new ResourceNode("a-new-node",null,2); > + qm.addNodeToQueue(node.getNodeId(), "queue-1"); > + monitor.addNode(node); > + TestCase.assertEquals(node,mock1.getAdded()); > + } catch(MonitorException e) { > + TestCase.fail("Unanticipated monitor exception caught: > "+e.getMessage()); > + } catch (QueueManagerException e1) { > + TestCase.fail("Unanticipated queue manager exception caught: > "+e1.getMessage()); > + } > + } > + public void removeNodeById() { > + try { > + ResourceNode node = new ResourceNode("a-new-node",null,2); > + qm.addNodeToQueue(node.getNodeId(), "queue-1"); > + monitor.addNode(node); > + TestCase.assertEquals(node,mock1.getAdded()); > + monitor.removeNodeById(node.getNodeId()); > + TestCase.assertEquals(null,mock1.getAdded()); > + } catch(MonitorException e) { > + TestCase.fail("Unanticipated monitor exception caught: > "+e.getMessage()); > + } catch (QueueManagerException e1) { > + TestCase.fail("Unanticipated queue manager exception caught: > "+e1.getMessage()); > + } > + } > + > + private MockMonitor addMonitor(int load,Map<MockMonitor, > List<ResourceNode>> map, List<ResourceNode> list) { > + MockMonitor mon = new MockMonitor(load, list, list.get(0), > list.get(1), list.get(2)); > + map.put(mon, list); > + return mon; > + } > + private List<ResourceNode> getNodesList(String prefix) { > + List<ResourceNode> nodes = new LinkedList<ResourceNode>(); > + try { > + nodes.add(new ResourceNode(prefix+"-1",new URL("http:// > "+prefix+"-1"),10)); > + nodes.add(new ResourceNode(prefix+"-2",new URL("http:// > "+prefix+"-2"),20)); > + nodes.add(new ResourceNode(prefix+"-3",new URL("http:// > "+prefix+"-3"),30)); > + nodes.add(new ResourceNode(prefix+"-4",new URL("http:// > "+prefix+"-4"),40)); > + } catch (MalformedURLException e) { > + TestCase.fail("Unanticipated URL exception caught: > "+e.getMessage()); > + } > + return nodes; > + } > +} > > Added: > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java > (added) > +++ > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockBatchManager.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,82 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.oodt.cas.resource.mux.mocks; > + > +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; > +import org.apache.oodt.cas.resource.jobrepo.JobRepository; > +import org.apache.oodt.cas.resource.monitor.Monitor; > +import org.apache.oodt.cas.resource.structs.JobSpec; > +import org.apache.oodt.cas.resource.structs.ResourceNode; > +import > org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; > +/** > + * This is a mock version of the batch manager. It SHOULD NOT, and > + * CAN NOT be used as a normal class. > + * > + * @author starchmd > + */ > +public class MockBatchManager implements Batchmgr { > + > + private JobSpec execJobSpec; > + private ResourceNode execResNode; > + > + @Override > + public boolean executeRemotely(JobSpec job, ResourceNode resNode) > + throws JobExecutionException { > + this.execJobSpec = job; > + this.execResNode = resNode; > + return true; > + } > + > + @Override > + public void setMonitor(Monitor monitor) {} > + > + @Override > + public void setJobRepository(JobRepository repository) {} > + > + @Override > + public boolean killJob(String jobId, ResourceNode node) { > + if (this.execJobSpec.getJob().getId().equals(jobId)) > + { > + this.execJobSpec = null; > + this.execResNode = null; > + return true; > + } > + return false; > + } > + > + @Override > + public String getExecutionNode(String jobId) { > + return execResNode.getNodeId(); > + } > + /***** > + * The following are test methods to report what jobs are here. > + *****/ > + /** > + * Return the current jobspec, for testing purposes > + * @return > + */ > + public JobSpec getCurrentJobSpec() { > + return this.execJobSpec; > + } > + /** > + * Return the current resource node, for testing purposes > + * @return > + */ > + public ResourceNode getCurrentResourceNode() { > + return execResNode; > + } > +} > > Added: > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java > URL: > http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java?rev=1634142&view=auto > > ============================================================================== > --- > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java > (added) > +++ > oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockMonitor.java > Fri Oct 24 21:38:01 2014 > @@ -0,0 +1,91 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.oodt.cas.resource.mux.mocks; > + > +import java.net.URL; > +import java.util.List; > + > +import org.apache.oodt.cas.resource.monitor.Monitor; > +import org.apache.oodt.cas.resource.structs.ResourceNode; > +import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; > + > +public class MockMonitor implements Monitor { > + > + public int load = -1; > + List<ResourceNode> nodes; > + ResourceNode id; > + ResourceNode url; > + ResourceNode add; > + ResourceNode reduce; > + > + public MockMonitor(int load,List<ResourceNode> nodes, ResourceNode > id, ResourceNode url, ResourceNode reduce) { > + this.load = load; > + this.nodes = nodes; > + this.id = id; > + this.url = url; > + this.reduce = reduce; > + } > + > + @Override > + public int getLoad(ResourceNode node) throws MonitorException { > + return load; > + } > + @Override > + public List getNodes() throws MonitorException { > + return nodes; > + } > + > + @Override > + public ResourceNode getNodeById(String nodeId) throws > MonitorException { > + return id.getNodeId().equals(nodeId)?id:null; > + } > + > + @Override > + public ResourceNode getNodeByURL(URL ipAddr) throws MonitorException { > + return url.getIpAddr().equals(ipAddr)?url:null; > + } > + > + @Override > + public boolean reduceLoad(ResourceNode node, int loadValue) > + throws MonitorException { > + reduce.setCapacity(reduce.getCapacity() - loadValue); > + return true; > + } > + > + @Override > + public boolean assignLoad(ResourceNode node, int loadValue) > + throws MonitorException { > + reduce.setCapacity(loadValue); > + return true; > + } > + > + @Override > + public void addNode(ResourceNode node) throws MonitorException { > + this.add = node; > + > + } > + > + @Override > + public void removeNodeById(String nodeId) throws MonitorException { > + if (this.add.getNodeId().equals(nodeId)) > + this.add = null; > + } > + > + public ResourceNode getAdded() { > + return this.add; > + } > +} > > > > > ... > > [Message clipped] -- *Lewis*
