+BEWM ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Chris Mattmann, Ph.D. Chief Architect Instrument Software and Science Data Systems Section (398) NASA Jet Propulsion Laboratory Pasadena, CA 91109 USA Office: 168-519, Mailstop: 168-527 Email: [email protected] WWW: http://sunset.usc.edu/~mattmann/ ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Adjunct Associate Professor, Computer Science Department University of Southern California, Los Angeles, CA 90089 USA ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-----Original Message----- From: Lewis John Mcgibbney <[email protected]> Reply-To: "[email protected]" <[email protected]> Date: Friday, October 24, 2014 at 2:51 PM To: "[email protected]" <[email protected]> Subject: Re: commits Digest 24 Oct 2014 21:39:27 -0000 Issue 947 >BEWM > >On Fri, Oct 24, 2014 at 2:39 PM, <[email protected]> >wrote: > >> >> commits Digest 24 Oct 2014 21:39:27 -0000 Issue 947 >> >> Topics (messages 6955 through 6955) >> >> svn commit: r1634142 - in /oodt/trunk/resource/src: >> main/java/org/apache/oodt/cas/resource/mux/ >> main/java/org/apache/oodt/cas/resource/structs/exceptions/ >> main/java/org/apache/oodt/cas/resource/util/ main/resources/ >> main/resources/examples/ test/org/ap... >> 6955 by: starchmd.apache.org >> >> Administrivia: >> >> --------------------------------------------------------------------- >> To post to the list, e-mail: [email protected] >> To unsubscribe, e-mail: [email protected] >> For additional commands, e-mail: [email protected] >> >> ---------------------------------------------------------------------- >> >> >> >> ---------- Forwarded message ---------- >> From: [email protected] >> To: [email protected] >> Cc: >> Date: Fri, 24 Oct 2014 21:38:01 -0000 >> Subject: svn commit: r1634142 - in /oodt/trunk/resource/src: >> main/java/org/apache/oodt/cas/resource/mux/ >> main/java/org/apache/oodt/cas/resource/structs/exceptions/ >> main/java/org/apache/oodt/cas/resource/util/ main/resources/ >> main/resources/examples/ test/org/ap... 
>> Author: starchmd >> Date: Fri Oct 24 21:38:01 2014 >> New Revision: 1634142 >> >> URL: http://svn.apache.org/r1634142 >> Log: >> Submitting multiplexing backend for resource manager >> >> Added: >> oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/ >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dManager.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dRepository.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dRepositoryFactory.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxBatchManager.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxMonitor.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxScheduler.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxSchedulerFactory.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Standa >>rdBackendManager.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBac >>kendRepository.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBac >>kendRepositoryFactory.java >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/ex >>ceptions/RepositoryException.java >> >> >>oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping. 
>>xml >> oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/ >> >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMu >>xBatchmgr.java >> >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMu >>xMonitor.java >> oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/ >> >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockB >>atchManager.java >> >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockM >>onitor.java >> Modified: >> >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/Gener >>icResourceManagerObjectFactory.java >> oodt/trunk/resource/src/main/resources/resource.properties >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dManager.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/BackendManager.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dManager.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dManager.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,67 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. 
You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.oodt.cas.resource.mux; >> + >> +import java.util.List; >> + >> +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; >> +import org.apache.oodt.cas.resource.monitor.Monitor; >> +import org.apache.oodt.cas.resource.scheduler.Scheduler; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; >> + >> +/** >> + * Interface for the backend manager >> + * >> + * @author starchmd >> + */ >> +public interface BackendManager { >> + >> + /** >> + * Add in a backend set to this manager. >> + * @param queue - queue that maps to the given monitor, batchmgr, >>and >> scheduler >> + * @param monitor - monitor used for this set >> + * @param batchmgr - batch manager for this set >> + * @param scheduler - scheduler for this set >> + */ >> + public void addSet(String queue,Monitor monitor, Batchmgr batchmgr, >> Scheduler scheduler); >> + /** >> + * Return monitor for the given queue. >> + * @param queue - queue to check >> + * @return montior >> + * @throws QueueManagerException when queue does not exist >> + */ >> + public Monitor getMonitor(String queue) throws >>QueueManagerException; >> + /** >> + * Return batch manager for the given queue. >> + * @param queue - queue to check >> + * @return batchmgr >> + * @throws QueueManagerException when queue does not exist >> + */ >> + public Batchmgr getBatchmgr(String queue) throws >> QueueManagerException; >> + /** >> + * Return scheduler for the given queue. 
>> + * @param queue - queue to check >> + * @return scheduler >> + * @throws QueueManagerException when queue does not exist >> + */ >> + public Scheduler getScheduler(String queue) throws >> QueueManagerException; >> + /** >> + * Return a list of all monitors. >> + * @return list of all monitors >> + */ >> + public List<Monitor> getMonitors(); >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dRepository.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/BackendRepository.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dRepository.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dRepository.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,33 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> +package org.apache.oodt.cas.resource.mux; >> + >> +import >> org.apache.oodt.cas.resource.structs.exceptions.RepositoryException; >> + >> +/** >> + * Interface to handle loading of the configuration for which queues >>are >> associated >> + * with which backend. i.e. read BackendManager configuration. >> + * >> + * @author starchmd >> + */ >> +public interface BackendRepository { >> + /** >> + * Load the backend. >> + * @return BackendManager all set up and ready to go. >> + */ >> + public BackendManager load() throws RepositoryException; >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dRepositoryFactory.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/BackendRepositoryFactory.java?rev=1634142&view=aut >>o >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dRepositoryFactory.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Backen >>dRepositoryFactory.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,36 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. 
>> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> + >> +package org.apache.oodt.cas.resource.mux; >> + >> +/** >> + * >> + * @author starchmd >> + * @version $Revision$ >> + * >> + * <p> >> + * The Backend Manager Repository Factory interface. >> + * </p> >> + */ >> +public interface BackendRepositoryFactory { >> + >> + /** >> + * Create a backend repository >> + * @return the newly minted backend repository >> + */ >> + public BackendRepository createBackendRepository(); >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxBatchManager.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/QueueMuxBatchManager.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxBatchManager.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxBatchManager.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,129 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. 
>> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> + >> +package org.apache.oodt.cas.resource.mux; >> + >> +import java.util.HashMap; >> +import java.util.Map; >> +import java.util.logging.Level; >> +import java.util.logging.Logger; >> + >> +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; >> +import org.apache.oodt.cas.resource.jobrepo.JobRepository; >> +import org.apache.oodt.cas.resource.monitor.Monitor; >> +import org.apache.oodt.cas.resource.structs.JobSpec; >> +import org.apache.oodt.cas.resource.structs.ResourceNode; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; >> + >> +/** >> + * @author starchmd >> + * @version $Revision$ >> + * >> + * A batch-manager used to execute and control jobs in a mesos-cluster. >> + */ >> +public class QueueMuxBatchManager implements Batchmgr { >> + >> + private Logger LOG = >> Logger.getLogger(QueueMuxBatchManager.class.getName()); >> + >> + BackendManager backend; >> + Map<String,String> jobIdToQueue = new HashMap<String,String>(); >> + JobRepository repo; >> + >> + /** >> + * ctor >> + * @param bm - backend manager >> + */ >> + public QueueMuxBatchManager(BackendManager bm) { >> + setBackendManager(bm); >> + } >> + /** >> + * Set the backend manager. >> + * @param backend - backend manager effectively mapping queue's to >> sets of backends. 
>> + */ >> + public void setBackendManager(BackendManager backend) { >> + this.backend = backend; >> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.batchmgr.Batchmgr#executeRemotely(org.apache >>.oodt.cas.resource.structs.JobSpec, >> org.apache.oodt.cas.resource.structs.ResourceNode) >> + */ >> + @Override >> + public boolean executeRemotely(JobSpec job, ResourceNode resNode) >> + throws JobExecutionException { >> + try { >> + >> jobIdToQueue.put(job.getJob().getId(),job.getJob().getQueueName()); >> + return >> getManagerByQueue(job.getJob().getQueueName()).executeRemotely(job, >> resNode); >> + } catch (QueueManagerException e) { >> + jobIdToQueue.remove(job.getJob().getQueueName()); >> + LOG.log(Level.WARNING, "Exception recieved while executing >> job: "+e.getLocalizedMessage()+". Job will not execute."); >> + throw new JobExecutionException(e); >> + } >> + } >> + >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.batchmgr.Batchmgr#setMonitor(org.apache.oodt >>.cas.resource.monitor.Monitor) >> + */ >> + @Override >> + public void setMonitor(Monitor monitor) { >> + throw new UnsupportedOperationException("Cannot set the monitor >> when using the queue-mux batch manager."); >> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.batchmgr.Batchmgr#setJobRepository(org.apach >>e.oodt.cas.resource.jobrepo.JobRepository) >> + */ >> + @Override >> + public void setJobRepository(JobRepository repository) { >> + this.repo = repository; >> + } >> + >> + /* (non-Javadoc) >> + * @see >> org.apache.oodt.cas.resource.batchmgr.Batchmgr#killJob(java.lang.String, >> org.apache.oodt.cas.resource.structs.ResourceNode) >> + */ >> + @Override >> + public boolean killJob(String jobId, ResourceNode node) { >> + try { >> + return getManagerByJob(jobId).killJob(jobId,node); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.SEVERE, "Cannot kill job: >> "+e.getLocalizedMessage()); >> + } >> + return false; 
>> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.batchmgr.Batchmgr#getExecutionNode(java.lang >>.String) >> + */ >> + @Override >> + public String getExecutionNode(String jobId) { >> + try { >> + return getManagerByJob(jobId).getExecutionNode(jobId); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.SEVERE, "Cannot get exectuion node for job: >> "+e.getLocalizedMessage()); >> + } >> + return null; >> + } >> + >> + private Batchmgr getManagerByJob(String jobId) throws >> QueueManagerException { >> + return getManagerByQueue(jobIdToQueue.get(jobId)); >> + } >> + >> + private Batchmgr getManagerByQueue(String queue) throws >> QueueManagerException { >> + return this.backend.getBatchmgr(queue); >> + } >> + >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxMonitor.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/QueueMuxMonitor.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxMonitor.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxMonitor.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,207 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. 
You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.oodt.cas.resource.mux; >> + >> +import java.net.URL; >> +import java.util.Iterator; >> +import java.util.LinkedHashSet; >> +import java.util.LinkedList; >> +import java.util.List; >> +import java.util.Set; >> +import java.util.logging.Level; >> +import java.util.logging.Logger; >> + >> +import org.apache.oodt.cas.resource.monitor.Monitor; >> +import org.apache.oodt.cas.resource.scheduler.QueueManager; >> +import org.apache.oodt.cas.resource.structs.ResourceNode; >> +import >>org.apache.oodt.cas.resource.structs.exceptions.MonitorException; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; >> + >> +/** >> + * @author starchmd >> + * @version $Revision$ >> + * >> + * A monitor to monitor the multiple monitors. >> + */ >> +public class QueueMuxMonitor implements Monitor { >> + private static final Logger LOG = >> Logger.getLogger(QueueMuxMonitor.class.getName()); >> + private BackendManager backend; >> + private QueueManager qManager; >> + /** >> + * ctor >> + * @param backend - backend manager >> + * @param qManager - queue manager >> + */ >> + public QueueMuxMonitor(BackendManager backend, QueueManager >>qManager) >> { >> + setBackendManager(backend,qManager); >> + } >> + /** >> + * Set the backend manager. >> + * @param backend - backend manager effectively mapping queue's to >> sets of backends. 
>> + */ >> + public void setBackendManager(BackendManager backend, QueueManager >> qManager) { >> + this.backend = backend; >> + this.qManager = qManager; >> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.monitor.Monitor#getLoad(org.apache.oodt.cas. >>resource.structs.ResourceNode) >> + */ >> + @Override >> + public int getLoad(ResourceNode node) throws MonitorException { >> + //Unclear what to do here. >> + //Assuming we should never be more than "Max" >> + List<String> queues = queuesForNode(node); >> + int max = 0; >> + for (String queue : queues) { >> + try { >> + max = >> Math.max(max,backend.getMonitor(queue).getLoad(node)); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.WARNING,"Queue '"+queue+"' has >> dissappeared."); >> + } >> + } >> + return max; >> + } >> + >> + /* (non-Javadoc) >> + * @see org.apache.oodt.cas.resource.monitor.Monitor#getNodes() >> + */ >> + @Override >> + public List<ResourceNode> getNodes() throws MonitorException { >> + Set<ResourceNode> set = new LinkedHashSet<ResourceNode>(); >> + for (Monitor mon:this.backend.getMonitors()) { >> + for (Object res:mon.getNodes()) { >> + set.add((ResourceNode)res); >> + } >> + } >> + return new LinkedList<ResourceNode>(set); >> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.monitor.Monitor#getNodeById(java.lang.String >>) >> + */ >> + @Override >> + public ResourceNode getNodeById(String nodeId) throws >> MonitorException { >> + ResourceNode node = null; >> + Iterator<Monitor> imon = this.backend.getMonitors().iterator(); >> + while(imon.hasNext() && (node = >>imon.next().getNodeById(nodeId)) >> == null) {} >> + return node; >> + } >> + >> + /* (non-Javadoc) >> + * @see >> org.apache.oodt.cas.resource.monitor.Monitor#getNodeByURL(java.net.URL) >> + */ >> + @Override >> + public ResourceNode getNodeByURL(URL ipAddr) throws >>MonitorException { >> + ResourceNode node = null; >> + Iterator<Monitor> imon = 
this.backend.getMonitors().iterator(); >> + while(imon.hasNext() && (node = >>imon.next().getNodeByURL(ipAddr)) >> == null) {} >> + return node; >> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.monitor.Monitor#reduceLoad(org.apache.oodt.c >>as.resource.structs.ResourceNode, >> int) >> + */ >> + @Override >> + public boolean reduceLoad(ResourceNode node, int loadValue) >> + throws MonitorException { >> + List<String> queues = queuesForNode(node); >> + boolean ret = true; >> + for (String queue:queues) { >> + try { >> + ret &= backend.getMonitor(queue).reduceLoad(node, >> loadValue); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.SEVERE,"Queue '"+queue+"' has >> dissappeared."); >> + throw new MonitorException(e); >> + } >> + } >> + return ret; >> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.monitor.Monitor#assignLoad(org.apache.oodt.c >>as.resource.structs.ResourceNode, >> int) >> + */ >> + @Override >> + public boolean assignLoad(ResourceNode node, int loadValue) >> + throws MonitorException { >> + List<String> queues = queuesForNode(node); >> + boolean ret = true; >> + for (String queue:queues) { >> + try { >> + ret &= backend.getMonitor(queue).assignLoad(node, >> loadValue); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.SEVERE,"Queue '"+queue+"' has >> dissappeared."); >> + throw new MonitorException(e); >> + } >> + } >> + return ret; >> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.monitor.Monitor#addNode(org.apache.oodt.cas. 
>>resource.structs.ResourceNode) >> + */ >> + @Override >> + public void addNode(ResourceNode node) throws MonitorException { >> + List<String> queues = queuesForNode(node); >> + for (String queue:queues) { >> + try { >> + backend.getMonitor(queue).addNode(node); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.SEVERE,"Queue '"+queue+"' has >> dissappeared."); >> + throw new MonitorException(e); >> + } >> + } >> + } >> + >> + /* (non-Javadoc) >> + * @see >> >>org.apache.oodt.cas.resource.monitor.Monitor#removeNodeById(java.lang.Str >>ing) >> + */ >> + @Override >> + public void removeNodeById(String nodeId) throws MonitorException { >> + for (Monitor mon:this.backend.getMonitors()) { >> + mon.removeNodeById(nodeId); >> + } >> + } >> + /** >> + * Gets the queues that are associated with a particular node. >> + * @param node - node which queues are needed for >> + * @return list of queue names on that node >> + */ >> + private List<String> queuesForNode(ResourceNode node) { >> + List<String> ret = new LinkedList<String>(); >> + //Get list of queues >> + List<String> queues = null; >> + try >> + { >> + queues = qManager.getQueues(); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.SEVERE, "Cannot list queues."); >> + } >> + //Search each queue to see if it contains given node >> + for (String queue : queues) { >> + try >> + { >> + if >>(qManager.getNodes(queue).contains(node.getNodeId())) { >> + ret.add(queue); >> + } >> + } catch(QueueManagerException e) { >> + LOG.log(Level.SEVERE, "Queue '"+queue+"' has >> dissappeared."); >> + } >> + } >> + return ret; >> + } >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxScheduler.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/QueueMuxScheduler.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> 
>>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxScheduler.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxScheduler.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,181 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> + >> + >> +package org.apache.oodt.cas.resource.mux; >> + >> +//JDK imports >> +import java.util.logging.Level; >> +import java.util.logging.Logger; >> + >> + >> + >> + >> + >> + >> +//OODT imports >> +import org.apache.oodt.cas.resource.jobqueue.JobQueue; >> +import org.apache.oodt.cas.resource.monitor.Monitor; >> +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; >> +import org.apache.oodt.cas.resource.scheduler.QueueManager; >> +import org.apache.oodt.cas.resource.scheduler.Scheduler; >> +import org.apache.oodt.cas.resource.structs.JobSpec; >> +import org.apache.oodt.cas.resource.structs.ResourceNode; >> +import >>org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; >> +import >>org.apache.oodt.cas.resource.structs.exceptions.SchedulerException; >> + >> +/** >> + * This scheduler multiplexes between multiple schedulers based on the >> "queue" . >> + * >> + * @author starchmd >> + * @version $Revision$ >> + */ >> +public class QueueMuxScheduler implements Scheduler { >> + >> + private static final Logger LOG = >> Logger.getLogger(QueueMuxScheduler.class.getName()); >> + >> + private BackendManager backend; >> + private JobQueue queue; >> + private float waitTime = -1; >> + >> + //Manages other queue-muxing components >> + private QueueMuxBatchManager batch; >> + private QueueMuxMonitor mon; >> + private QueueManager qManager; >> + >> + /** >> + * ctor >> + * @param backend - Backend manager to handle the many different >> backends. 
>> + */ >> + public QueueMuxScheduler(BackendManager backend, QueueManager qm, >> JobQueue jq) { >> + String waitStr = >> >>System.getProperty("org.apache.oodt.cas.resource.scheduler.wait.seconds", >> "20"); >> + waitTime = Float.parseFloat(waitStr); >> + this.queue = jq; >> + this.qManager = qm; >> + this.backend = backend; >> + //Required, so make them here >> + batch = new QueueMuxBatchManager(backend); >> + mon = new QueueMuxMonitor(backend,qm); >> + } >> + >> + /* >> + * (non-Javadoc) >> + * >> + * @see java.lang.Runnable#run() >> + */ >> + public void run() { >> + //Loop forever >> + while (true) { >> + try { >> + Thread.sleep((long) (waitTime * 1000.0)); >> + } catch (InterruptedException e) { >> + //If the thread will continue, reinterrupt thread >> + Thread.currentThread().interrupt(); >> + } >> + //You have jobs >> + if (!queue.isEmpty()) { >> + JobSpec job = null; >> + try { >> + job = queue.getNextJob(); >> + LOG.log(Level.INFO, "Scheduling job: ["+ >> job.getJob().getId()+ "] for execution"); >> + schedule(job); >> + } catch (JobQueueException je) { >> + LOG.log(Level.WARNING,"Error getting job from >>queue: " >> + + je.getLocalizedMessage()); >> + } catch (SchedulerException se) { >> + LOG.log(Level.WARNING,"Error occured scheduling >>job: >> "+se.getLocalizedMessage()); >> + try { >> + queue.requeueJob(job); >> + } catch (JobQueueException je) { >> + LOG.log(Level.WARNING,"Error requeueing job: >> "+je.getLocalizedMessage()); >> + LOG.log(Level.WARNING,"Previous error caused >>by: >> "+se.getLocalizedMessage()); >> + } >> + } >> + } >> + } >> + } >> + >> + /* >> + * (non-Javadoc) >> + * >> + * @see >> >>gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#schedule(gov.nasa.jpl. 
>>oodt.cas.resource.structs.JobSpec) >> + */ >> + public synchronized boolean schedule(JobSpec spec) >> + throws SchedulerException { >> + System.out.println("Spec: "+spec+" Job: "+spec.getJob()+" >> Backend:"+backend); >> + String queue = spec.getJob().getQueueName(); >> + try { >> + return backend.getScheduler(queue).schedule(spec); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.WARNING,"Exception occuered: >> "+e.getLocalizedMessage()); >> + throw new SchedulerException(e); >> + } >> + } >> + >> + /* >> + * (non-Javadoc) >> + * >> + * @see >> gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getBatchmgr() >> + */ >> + public Batchmgr getBatchmgr() { >> + return batch; >> + } >> + >> + /* >> + * (non-Javadoc) >> + * >> + * @see >> gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getMonitor() >> + */ >> + public Monitor getMonitor() { >> + return mon; >> + } >> + >> + /* >> + * (non-Javadoc) >> + * >> + * @see >> gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getJobQueue() >> + */ >> + public JobQueue getJobQueue() { >> + return this.queue; >> + } >> + >> + /* >> + * (non-Javadoc) >> + * >> + * @see >> gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#getQueueManager() >> + */ >> + public QueueManager getQueueManager() { >> + return qManager; >> + } >> + >> + /* >> + * (non-Javadoc) >> + * >> + * @see >> >>gov.nasa.jpl.oodt.cas.resource.scheduler.Scheduler#nodeAvailable(gov.nasa >>.jpl.oodt.cas.resource.structs.JobSpec) >> + */ >> + public synchronized ResourceNode nodeAvailable(JobSpec spec) >> + throws SchedulerException { >> + String queue = spec.getJob().getQueueName(); >> + try { >> + return backend.getScheduler(queue).nodeAvailable(spec); >> + } catch (QueueManagerException e) { >> + LOG.log(Level.WARNING,"Exception occuered: >> "+e.getLocalizedMessage()); >> + throw new SchedulerException(e); >> + } >> + } >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxSchedulerFactory.java >> URL: 
>> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/QueueMuxSchedulerFactory.java?rev=1634142&view=aut >>o >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxSchedulerFactory.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/QueueM >>uxSchedulerFactory.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,74 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> + >> +package org.apache.oodt.cas.resource.mux; >> + >> +import java.util.logging.Level; >> +import java.util.logging.Logger; >> + >> +import org.apache.oodt.cas.resource.jobqueue.JobQueue; >> +import org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory; >> +import >>org.apache.oodt.cas.resource.queuerepo.XmlQueueRepositoryFactory; >> +import org.apache.oodt.cas.resource.scheduler.QueueManager; >> +import org.apache.oodt.cas.resource.scheduler.Scheduler; >> +import org.apache.oodt.cas.resource.scheduler.SchedulerFactory; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.RepositoryException; >> +import >> org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; >> + >> +/** >> + * This class acts as a factory for the whole queue-mux >> + * set of classes. >> + * >> + * @author starchmd >> + */ >> +public class QueueMuxSchedulerFactory implements SchedulerFactory { >> + >> + private static final Logger LOG = >> Logger.getLogger(QueueMuxSchedulerFactory.class.getName()); >> + >> + BackendManager backend; >> + QueueManager qManager; >> + JobQueue jobQueue; >> + /** >> + * ctor >> + */ >> + public QueueMuxSchedulerFactory() { >> + //Load backend manager >> + String backRepo = >> System.getProperty("resource.backend.mux.repository", >> + XmlBackendRepository.class.getCanonicalName()); >> + try { >> + backend = >> >>GenericResourceManagerObjectFactory.getBackendRepositoryFromFactory(backR >>epo).load(); >> + } catch (RepositoryException e) { >> + LOG.log(Level.SEVERE,"Error loading backend repository: >> "+e.getMessage(),e); >> + backend = null; >> + } >> + //Load user-specified queue factory >> + String qFact = >> System.getProperty("org.apache.oodt.cas.resource.queues.repo.factory", >> + XmlQueueRepositoryFactory.class.getCanonicalName()); >> + qManager = >> GenericResourceManagerObjectFactory.getQueueRepositoryFromFactory( >> + qFact).loadQueues(); >> + //Load job queue >> + String jobFact = 
>>System.getProperty("resource.jobqueue.factory", >> + >>JobStackJobQueueFactory.class.getCanonicalName()); >> + jobQueue = GenericResourceManagerObjectFactory >> + .getJobQueueServiceFromFactory(jobFact); >> + } >> + >> + @Override >> + public Scheduler createScheduler() { >> + return new QueueMuxScheduler(this.backend, this.qManager, >> this.jobQueue); >> + } >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Standa >>rdBackendManager.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/StandardBackendManager.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Standa >>rdBackendManager.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/Standa >>rdBackendManager.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,120 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> +package org.apache.oodt.cas.resource.mux; >> + >> +import java.util.HashMap; >> +import java.util.LinkedList; >> +import java.util.List; >> +import java.util.Map; >> + >> +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; >> +import org.apache.oodt.cas.resource.monitor.Monitor; >> +import org.apache.oodt.cas.resource.scheduler.Scheduler; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; >> + >> +/** >> + * This manager keeps track of the mux-able backends for the resource >> manager. >> + * It effectively maps a queue to the backend that this queue feeds. >> + * >> + * It uses a private BackendSet to keep track of everything. >> + * >> + * For reference, a backend is a set of the following: >> + * 1. Batch manger, responsible for running jobs >> + * 2. Scheduler, responsible for scheduling a job to run >> + * 3. Monitor, responsible for managing nodes >> + * >> + * @author starchmd >> + */ >> +public class StandardBackendManager implements BackendManager { >> + Map<String,BackendSet> queueToBackend = new >> HashMap<String,BackendSet>(); >> + >> + /** >> + * Add in a backend set to this manager. >> + * @param queue - queue that maps to the given monitor, batchmgr, >>and >> scheduler >> + * @param monitor - monitor used for this set >> + * @param batchmgr - batch manager for this set >> + * @param scheduler - scheduler for this set >> + */ >> + public void addSet(String queue,Monitor monitor, Batchmgr batchmgr, >> Scheduler scheduler) { >> + queueToBackend.put(queue, new >> BackendSet(monitor,batchmgr,scheduler)); >> + } >> + /** >> + * Return monitor for the given queue. 
>> + * @param queue - queue to check >> + * @return montior >> + * @throws QueueManagerException when queue does not exist >> + */ >> + public Monitor getMonitor(String queue) throws >>QueueManagerException { >> + BackendSet set = queueToBackend.get(queue); >> + if (set == null) >> + throw new QueueManagerException("Queue '" + queue + "' does >> not exist"); >> + return set.monitor; >> + } >> + /** >> + * Return batch manager for the given queue. >> + * @param queue - queue to check >> + * @return batchmgr >> + * @throws QueueManagerException when queue does not exist >> + */ >> + public Batchmgr getBatchmgr(String queue) throws >> QueueManagerException { >> + BackendSet set = queueToBackend.get(queue); >> + if (set == null) >> + throw new QueueManagerException("Queue '" + queue + "' does >> not exist"); >> + return set.batchmgr; >> + } >> + /** >> + * Return scheduler for the given queue. >> + * @param queue - queue to check >> + * @return scheduler >> + * @throws QueueManagerException when queue does not exist >> + */ >> + public Scheduler getScheduler(String queue) throws >> QueueManagerException { >> + BackendSet set = queueToBackend.get(queue); >> + if (set == null) >> + throw new QueueManagerException("Queue '" + queue + "' does >> not exist"); >> + return set.scheduler; >> + } >> + /** >> + * Return a list of all monitors. >> + * @return list of all monitors >> + */ >> + public List<Monitor> getMonitors() { >> + List<Monitor> monitors = new LinkedList<Monitor>(); >> + for (BackendSet set : queueToBackend.values()) { >> + monitors.add(set.monitor); >> + } >> + return monitors; >> + } >> + /** >> + * Class that holds a set of the three backend pieces. >> + * Private class, because no accessor/modifiers have been >> + * created(public members). Acts like a struct. 
>> + * >> + * @author starchmd >> + */ >> + private class BackendSet { >> + public Monitor monitor = null; >> + public Batchmgr batchmgr = null; >> + public Scheduler scheduler = null; >> + >> + public BackendSet(Monitor monitor, Batchmgr batchmgr, Scheduler >> scheduler) { >> + this.monitor = monitor; >> + this.batchmgr = batchmgr; >> + this.scheduler = scheduler; >> + } >> + } >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBac >>kendRepository.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/XmlBackendRepository.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBac >>kendRepository.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBac >>kendRepository.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,186 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> +package org.apache.oodt.cas.resource.mux; >> + >> +import java.io.File; >> +import java.io.FileInputStream; >> +import java.io.FileNotFoundException; >> +import java.net.URI; >> +import java.net.URISyntaxException; >> +import java.util.logging.Level; >> +import java.util.logging.Logger; >> + >> +import org.apache.oodt.cas.resource.scheduler.Scheduler; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.RepositoryException; >> +import >> org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; >> +import org.apache.oodt.commons.xml.XMLUtils; >> +import org.w3c.dom.Document; >> +import org.w3c.dom.Element; >> +import org.w3c.dom.NodeList; >> + >> +/** >> + * Class to load BackendManager from XML file. >> + * @author starchmd >> + */ >> +public class XmlBackendRepository implements BackendRepository { >> + >> + private static final Logger LOG = >> Logger.getLogger(XmlBackendRepository.class.getName()); >> + private String uri; >> + >> + //Constants >> + private static final String SCHEDULER = "scheduler"; >> + private static final String BATCHMGR = "batchmgr"; >> + private static final String MONITOR = "monitor"; >> + >> + private static final String MONITOR_PROPERTY = >> "resource.monitor.factory"; >> + private static final String BATCHMGR_PROPERTY = >> "resource.batchmgr.factory"; >> + >> + /** >> + * Ctor >> + * @param uri - uri of XML file containing mapping >> + */ >> + public XmlBackendRepository(String uri) { >> + if (uri == null) >> + throw new NullPointerException("URI for queue-to-backend >>xml >> file cannot be null"); >> + this.uri = uri; >> + } >> + /* (non-Javadoc) >> + * @see org.apache.oodt.cas.resource.mux.BackendRepository#load() >> + */ >> + @Override >> + public BackendManager load() throws RepositoryException { >> + LOG.log(Level.INFO,"Reading backend set manager from: >>"+this.uri); >> + BackendManager bm = new StandardBackendManager(); >> + String origMon = System.getProperty(MONITOR_PROPERTY); >> + String 
origBat = System.getProperty(BATCHMGR_PROPERTY); >> + try { >> + File file = new File(new URI(this.uri)); >> + Document root = XMLUtils.getDocumentRoot(new >> FileInputStream(file)); >> + NodeList list = root.getElementsByTagName("queue"); >> + if (list != null && list.getLength() > 0) { >> + for (int k = 0; k < list.getLength(); k++) { >> + Element node = (Element)list.item(k); >> + String queue = node.getAttribute("name"); >> + //Set properties for batch and monitor factories >> + //So scheduler builds as repository specifies >> + try { >> + String mfact = getMonitor(queue,node); >> + LOG.log(Level.INFO,"Setting monitor factory >> property to: "+mfact); >> + System.setProperty(MONITOR_PROPERTY, mfact); >> + } catch (RepositoryException e) { >> + LOG.log(Level.INFO, "No monitor factory for >>queue >> "+queue+", using system property."); >> + } >> + try { >> + String bfact = getBatchmgr(queue,node); >> + LOG.log(Level.INFO,"Setting batchmgr factory >> property to: "+bfact); >> + System.setProperty(BATCHMGR_PROPERTY, bfact); >> + } catch (RepositoryException e) { >> + LOG.log(Level.INFO, "No batchmgr factory for >> queue "+queue+", using system property."); >> + } >> + //Build scheduler >> + Scheduler sch = getScheduler(queue,node); >> + bm.addSet(queue, sch.getMonitor(), >>sch.getBatchmgr(), >> sch); >> + //Reset Properties for next item >> + resetAlteredProperty(MONITOR_PROPERTY,origMon); >> + resetAlteredProperty(BATCHMGR_PROPERTY,origBat); >> + } >> + } >> + } catch (URISyntaxException e) { >> + LOG.log(Level.SEVERE,"Malformed URI: "+this.uri); >> + throw new RepositoryException(e); >> + } catch(FileNotFoundException e) { >> + LOG.log(Level.SEVERE,"File not found: "+this.uri+" from >> working dir: "+new File(".").getAbsolutePath()); >> + throw new RepositoryException(e); >> + } catch (ClassCastException e) { >> + LOG.log(Level.SEVERE,"Queue tag must represent XML >>element."); >> + throw new RepositoryException(e); >> + } finally { >> + 
resetAlteredProperty(MONITOR_PROPERTY,origMon); >> + resetAlteredProperty(BATCHMGR_PROPERTY,origBat); >> + } >> + >> + return bm; >> + } >> + /** >> + * Resets a property. Allows nulls >> + * @param prop - property name to reset >> + * @param value - value to reset to, can be null >> + */ >> + private static void resetAlteredProperty(String prop,String value) >>{ >> + if (value == null) { >> + System.clearProperty(prop); >> + return; >> + } >> + System.setProperty(prop,value); >> + } >> + >> + /** >> + * Get monitor factory from XML >> + * @param queue - current queue, for error reporting >> + * @param node - node that is being read >> + * @return monitor factory string >> + * @throws RepositoryException >> + */ >> + private static String getMonitor(String queue,Element node) throws >> RepositoryException { >> + return getFactoryAttribute(queue, node, MONITOR); >> + } >> + /** >> + * Get scheduler from XML >> + * @param queue - current queue, for error reporting >> + * @param node - node that is being read >> + * @return newly constructed Scheduler >> + * @throws RepositoryException >> + */ >> + private static Scheduler getScheduler(String queue,Element node) >> throws RepositoryException { >> + String factory = getFactoryAttribute(queue, node, SCHEDULER); >> + LOG.log(Level.INFO,"Loading monitor from: "+factory); >> + Scheduler sch = >> >>GenericResourceManagerObjectFactory.getSchedulerServiceFromFactory(factor >>y); >> + if (sch != null) >> + return sch; >> + throw new RepositoryException("Could instantiate from: >>"+factory); >> + } >> + /** >> + * Get batchmgr factory from XML >> + * @param queue - current queue, for error reporting >> + * @param node - node that is being read >> + * @return batch manager factory name >> + * @throws RepositoryException >> + */ >> + private static String getBatchmgr(String queue,Element node) throws >> RepositoryException { >> + return getFactoryAttribute(queue, node, BATCHMGR); >> + } >> + /** >> + * Pull out the factory 
attribute from tag with given name. >> + * @param queue - current queue, for error reporting >> + * @param elem - element that contains tags as children >> + * @param tag - string name of tag looked for. i.e. "monitor" >> + * @return name of factory class >> + * @throws RepositoryException - thrown if more than one child >> matches, no children match, or other error >> + */ >> + private static String getFactoryAttribute(String queue,Element >>elem, >> String tag) throws RepositoryException { >> + NodeList children = elem.getElementsByTagName(tag); >> + try { >> + String attr = ""; >> + if (children.getLength() != 1 || (attr = >> ((Element)children.item(0)).getAttribute("factory")) == "") { >> + throw new RepositoryException("Could not find exactly >>one >> "+tag+", with factory set, in queue: "+queue); >> + } >> + return attr; >> + } catch (ClassCastException e) { >> + throw new RepositoryException("Tag "+tag+" does not >>represent >> XML element in queue: "+queue,e); >> + } >> + } >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBac >>kendRepositoryFactory.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/mux/XmlBackendRepositoryFactory.java?rev=1634142&view= >>auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBac >>kendRepositoryFactory.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/mux/XmlBac >>kendRepositoryFactory.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,55 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. 
>> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> + >> +package org.apache.oodt.cas.resource.mux; >> + >> +//OODT imports >> +import org.apache.oodt.cas.metadata.util.PathUtils; >> + >> +//JDK imports >> +import java.util.logging.Level; >> +import java.util.logging.Logger; >> + >> +/** >> + * >> + * @author starchmd >> + * @version $Revision$ >> + * >> + * <p> >> + * The XML Backend Repository Factory interface. 
>> + * </p> >> + */ >> +public class XmlBackendRepositoryFactory implements >> BackendRepositoryFactory { >> + >> + private static final Logger LOG = >> Logger.getLogger(XmlBackendRepositoryFactory.class.getName()); >> + /** >> + * Create the backend repository (xml) >> + * @return the newly minted backend repository >> + */ >> + public XmlBackendRepository createBackendRepository() { >> + try { >> + String uri = >> System.getProperty("resource.backend.mux.xmlrepository.queuetobackend"); >> + /* do env var replacement */ >> + uri = PathUtils.replaceEnvVariables(uri); >> + return new XmlBackendRepository(uri); >> + } catch (NullPointerException e) { >> + LOG.log( Level.SEVERE,"Failed to create >> XmlBackendRepository: "+ e.getMessage(), e); >> + return null; >> + } >> + } >> + >> +} >> >> Added: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/ex >>ceptions/RepositoryException.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/structs/exceptions/RepositoryException.java?rev=163414 >>2&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/ex >>ceptions/RepositoryException.java >> (added) >> +++ >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/structs/ex >>ceptions/RepositoryException.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,60 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. 
You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> + >> +package org.apache.oodt.cas.resource.structs.exceptions; >> + >> +/** >> + * @author starchmd >> + * @version $Revision$ >> + * >> + * <p> >> + * An exception thrown by the {@link BackendRepository} when an error >> occurs. >> + * </p> >> + */ >> +public class RepositoryException extends Exception { >> + >> + /* serial version UID */ >> + private static final long serialVersionUID = 4568261126290589269L; >> + >> + /** >> + * >> + */ >> + public RepositoryException() {} >> + >> + /** >> + * @param message >> + */ >> + public RepositoryException(String message) { >> + super(message); >> + } >> + >> + /** >> + * @param cause >> + */ >> + public RepositoryException(Throwable cause) { >> + super(cause); >> + } >> + >> + /** >> + * @param message >> + * @param cause >> + */ >> + public RepositoryException(String message, Throwable cause) { >> + super(message, cause); >> + } >> + >> +} >> >> Modified: >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/Gener >>icResourceManagerObjectFactory.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/java/org/apache >>/oodt/cas/resource/util/GenericResourceManagerObjectFactory.java?rev=1634 >>142&r1=1634141&r2=1634142&view=diff >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/Gener >>icResourceManagerObjectFactory.java >> (original) >> +++ >> 
>>oodt/trunk/resource/src/main/java/org/apache/oodt/cas/resource/util/Gener >>icResourceManagerObjectFactory.java >> Fri Oct 24 21:38:01 2014 >> @@ -33,6 +33,8 @@ import org.apache.oodt.cas.resource.moni >> import org.apache.oodt.cas.resource.monitor.MonitorFactory; >> import >> org.apache.oodt.cas.resource.monitor.ganglia.loadcalc.LoadCalculator; >> import >> >>org.apache.oodt.cas.resource.monitor.ganglia.loadcalc.LoadCalculatorFacto >>ry; >> +import org.apache.oodt.cas.resource.mux.BackendRepository; >> +import org.apache.oodt.cas.resource.mux.BackendRepositoryFactory; >> import org.apache.oodt.cas.resource.noderepo.NodeRepository; >> import org.apache.oodt.cas.resource.noderepo.NodeRepositoryFactory; >> import org.apache.oodt.cas.resource.queuerepo.QueueRepository; >> @@ -162,7 +164,42 @@ public final class GenericResourceManage >> >> return null; >> } >> + /** >> + * Creates a new {@link BackendRepository} implementation from the >>given >> + * {@link BackendRepositoryFactory} class name. >> + * >> + * @param backendRepositoryFactory >> + * The class name of the {@link BackendRepositoryFactory} to >> use to create new >> + * {@link BackendRepository}s. >> + * @return A new implementation of a {@link BackendRepository}. 
>> + */ >> + public static BackendRepository >>getBackendRepositoryFromFactory(String >> backendRepositoryFactory) { >> + Class clazz = null; >> + BackendRepositoryFactory factory = null; >> + >> + try { >> + clazz = Class.forName(backendRepositoryFactory); >> + factory = (BackendRepositoryFactory) clazz.newInstance(); >> + return factory.createBackendRepository(); >> + } catch (ClassNotFoundException e) { >> + e.printStackTrace(); >> + LOG.log(Level.WARNING, >> + "ClassNotFoundException when loading backend repository >>factory >> class " >> + + backendRepositoryFactory + " Message: " + >>e.getMessage()); >> + } catch (InstantiationException e) { >> + e.printStackTrace(); >> + LOG.log(Level.WARNING, >> + "InstantiationException when loading backend repository >>factory >> class " >> + + backendRepositoryFactory + " Message: " + >>e.getMessage()); >> + } catch (IllegalAccessException e) { >> + e.printStackTrace(); >> + LOG.log(Level.WARNING, >> + "IllegalAccessException when loading backend repository >>factory >> class " >> + + backendRepositoryFactory + " Message: " + >>e.getMessage()); >> + } >> >> + return null; >> + } >> /** >> * Creates a new {@link NodeRepository} implementation from the given >> * {@link QueueRepositoryFactory} class name. >> >> Added: >> >>oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping. >>xml >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/examp >>les/queue-to-backend-mapping.xml?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping. >>xml >> (added) >> +++ >> >>oodt/trunk/resource/src/main/resources/examples/queue-to-backend-mapping. 
>>xml >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,24 @@ >> +<?xml version='1.0' encoding='UTF-8'?> >> +<!-- >> +Licensed to the Apache Software Foundation (ASF) under one or more >> contributor >> +license agreements. See the NOTICE.txt file distributed with this work >> for >> +additional information regarding copyright ownership. The ASF licenses >> this >> +file to you under the Apache License, Version 2.0 (the "License"); you >> may not >> +use this file except in compliance with the License. You may obtain a >> copy of >> +the License at >> + >> + http://www.apache.org/licenses/LICENSE-2.0 >> + >> +Unless required by applicable law or agreed to in writing, software >> +distributed under the License is distributed on an "AS IS" BASIS, >>WITHOUT >> +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See >>the >> +License for the specific language governing permissions and limitations >> under >> +the License. >> +--> >> +<cas:queue-to-backend-mapping >>xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas >> "> >> + <queue name="example"> >> + <scheduler >> factory="org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory"/> >> + <monitor >> >>factory="org.apache.oodt.cas.resource.monitor.AssignmentMonitorFactory"/> >> + <batchmgr >> factory="org.apache.oodt.cas.resource.batchmgr.XmlRpcBatchMgrFactory"/> >> + </queue> >> +</cas:queue-to-backend-mapping> >> >> Modified: oodt/trunk/resource/src/main/resources/resource.properties >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/main/resources/resou >>rce.properties?rev=1634142&r1=1634141&r2=1634142&view=diff >> >> >>========================================================================= >>===== >> --- oodt/trunk/resource/src/main/resources/resource.properties >>(original) >> +++ oodt/trunk/resource/src/main/resources/resource.properties Fri Oct >>24 >> 21:38:01 2014 >> @@ -31,6 +31,11 @@ resource.jobqueue.factory = org.apache.o >> # resource job repository factory >> resource.jobrepo.factory 
= >> org.apache.oodt.cas.resource.jobrepo.MemoryJobRepositoryFactory >> >> +# For queue-multiplexing scheduler >> +resource.backend.mux.repository = >> org.apache.oodt.cas.resource.mux.XmlBackendRepositoryFactory >> +resource.backend.mux.xmlrepository.queuetobackend = >> file://[HOME]/queue-to-backend.xml >> + >> + >> # node repository factory >> org.apache.oodt.cas.resource.nodes.repo.factory = >> org.apache.oodt.cas.resource.noderepo.XmlNodeRepositoryFactory >> >> >> Added: >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMu >>xBatchmgr.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt >>/cas/resource/mux/TestQueueMuxBatchmgr.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMu >>xBatchmgr.java >> (added) >> +++ >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMu >>xBatchmgr.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,129 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> + >> + >> +package org.apache.oodt.cas.resource.mux; >> + >> +//OODT imports >> +import org.apache.oodt.cas.resource.mux.mocks.MockBatchManager; >> +import org.apache.oodt.cas.resource.structs.Job; >> +import org.apache.oodt.cas.resource.structs.JobSpec; >> +import org.apache.oodt.cas.resource.structs.ResourceNode; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; >> + >> +//JUnit imports >> +import junit.framework.TestCase; >> + >> +/** >> + * @author starchmd >> + * @version $Revision$ >> + * >> + * <p> >> + * Test Suite for the {@link QueueBatchMonitor} service >> + * </p>. >> + */ >> +public class TestQueueMuxBatchmgr extends TestCase { >> + >> + private QueueMuxBatchManager queue; >> + private MockBatchManager mock1; >> + private MockBatchManager mock2; >> + >> + protected void setUp() { >> + BackendManager back = new StandardBackendManager(); >> + back.addSet("queue-1", null,(mock1 = new MockBatchManager()), >> null); >> + back.addSet("queue-2", null,(mock2 = new MockBatchManager()), >> null); >> + queue = new QueueMuxBatchManager(back); >> + } >> + >> + public void testExecuteRemotely() { >> + try { >> + >> + //Test that the jobs are put in seperate mock-backends >>based >> on queues >> + ResourceNode node1 = new ResourceNode(); >> + ResourceNode node2 = new ResourceNode(); >> + >> + JobSpec spec1 = this.getSpecFromQueue("queue-1"); >> + queue.executeRemotely(spec1, node1); >> + >> + JobSpec spec2 = this.getSpecFromQueue("queue-2"); >> + queue.executeRemotely(spec2, node2); >> + //Yes...use reference equality, as these must be the exact >> same object >> + TestCase.assertEquals(spec1,mock1.getCurrentJobSpec()); >> + TestCase.assertEquals(spec2,mock2.getCurrentJobSpec()); >> + >>TestCase.assertEquals(node1,mock1.getCurrentResourceNode()); >> + >>TestCase.assertEquals(node2,mock2.getCurrentResourceNode()); >> + //Throws exception on bad queue >> + try { >> + >> 
queue.executeRemotely(this.getSpecFromQueue("queue-3"),node1); >> + TestCase.fail("Failed to throw JobExecutionException on >> unknown queue."); >> + } catch(JobExecutionException e) {} >> + } catch (JobExecutionException e) { >> + TestCase.fail("Unexpected Exception: "+e.getMessage()); >> + } >> + } >> + >> + public void testKillJob() { >> + try { >> + ResourceNode node1 = new ResourceNode(); >> + ResourceNode node2 = new ResourceNode(); >> + >> + JobSpec spec1 = this.getSpecFromQueue("queue-1"); >> + queue.executeRemotely(spec1, node1); >> + >> + JobSpec spec2 = this.getSpecFromQueue("queue-2"); >> + queue.executeRemotely(spec2, node2); >> + //Make sure that one can kill a job, and the other job is >> running >> + TestCase.assertTrue(queue.killJob(spec1.getJob().getId(), >> node1)); >> + TestCase.assertEquals(mock1.getCurrentJobSpec(),null); >> + TestCase.assertEquals(mock2.getCurrentJobSpec(),spec2); >> + //Make sure kill fails with bad queue >> + >> >>TestCase.assertFalse(queue.killJob(this.getSpecFromQueue("queue-3").getJo >>b().getId(), >> node1)); >> + } catch (JobExecutionException e) { >> + TestCase.fail("Unexpected Exception: "+e.getMessage()); >> + } >> + } >> + >> + public void testGetExecNode() { >> + try { >> + ResourceNode node1 = new ResourceNode(); >> + ResourceNode node2 = new ResourceNode(); >> + node1.setId("Node1-ID"); >> + node2.setId("Node2-ID"); >> + JobSpec spec1 = this.getSpecFromQueue("queue-1"); >> + queue.executeRemotely(spec1, node1); >> + >> + JobSpec spec2 = this.getSpecFromQueue("queue-2"); >> + queue.executeRemotely(spec2, node2); >> + //Make that the execution node is same >> + >> >>TestCase.assertEquals(node1.getNodeId(),queue.getExecutionNode(spec1.getJ >>ob().getId())); >> + >> >>TestCase.assertEquals(node2.getNodeId(),queue.getExecutionNode(spec2.getJ >>ob().getId())); >> + //Returns null, if bad-queue >> + >> >>TestCase.assertNull(queue.getExecutionNode(this.getSpecFromQueue("queue-3 >>").getJob().getId())); >> + } catch 
(JobExecutionException e) { >> + TestCase.fail("Unexpected Exception: "+e.getMessage()); >> + } >> + } >> + >> + private JobSpec getSpecFromQueue(String queue) { >> + JobSpec spec1 = new JobSpec(); >> + Job job1 = new Job(); >> + job1.setId("000000100000011-"+queue); >> + job1.setQueueName(queue); >> + spec1.setJob(job1); >> + return spec1; >> + } >> +} >> >> Added: >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMu >>xMonitor.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt >>/cas/resource/mux/TestQueueMuxMonitor.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMu >>xMonitor.java >> (added) >> +++ >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/TestQueueMu >>xMonitor.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,204 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> + >> +package org.apache.oodt.cas.resource.mux; >> + >> +//OODT imports >> +import java.net.MalformedURLException; >> +import java.net.URL; >> +import java.util.HashMap; >> +import java.util.LinkedList; >> +import java.util.List; >> +import java.util.Map; >> + >> +import org.apache.oodt.cas.resource.mux.mocks.MockMonitor; >> +import org.apache.oodt.cas.resource.scheduler.QueueManager; >> +import org.apache.oodt.cas.resource.structs.ResourceNode; >> +import >>org.apache.oodt.cas.resource.structs.exceptions.MonitorException; >> + >> +import >> org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; >> + >> +//JUnit imports >> +import junit.framework.TestCase; >> + >> +/** >> + * @author starchmd >> + * @version $Revision$ >> + * >> + * <p> >> + * Test Suite for the {@link QueueBatchMonitor} service >> + * </p>. >> + */ >> +public class TestQueueMuxMonitor extends TestCase { >> + >> + private QueueMuxMonitor monitor; >> + private MockMonitor mock1; >> + private MockMonitor mock2; >> + private ResourceNode superfluous; >> + private QueueManager qm; >> + Map<MockMonitor,List<ResourceNode>> map; >> + >> + protected void setUp() { >> + try { >> + //Map monitor to nodes list >> + map = new HashMap<MockMonitor,List<ResourceNode>>(); >> + List<ResourceNode> nodes1 = getNodesList("mock-1"); >> + List<ResourceNode> nodes2 = getNodesList("mock-2"); >> + //Backend Manager setup >> + BackendManager back = new StandardBackendManager(); >> + back.addSet("queue-1",(mock1 = addMonitor(0,map,nodes1)), >> null, null); >> + back.addSet("queue-2",(mock2 = addMonitor(5,map,nodes2)), >> null, null); >> + //Make sure the queue manager is setup >> + qm = new QueueManager(); >> + qm.addQueue("queue-1"); >> + qm.addQueue("queue-2"); >> + qm.addQueue("queue-3"); >> + for (ResourceNode rn : nodes1) >> + qm.addNodeToQueue(rn.getNodeId(), "queue-1"); >> + for (ResourceNode rn : nodes2) >> + qm.addNodeToQueue(rn.getNodeId(), "queue-2"); >> + //Add an extra node to test 
"unknown queue" >> + qm.addNodeToQueue((superfluous = new >> ResourceNode("superfluous-1",new >>URL("http://superfluous-1"),-2)).getNodeId(), >> "queue-3"); >> + monitor = new QueueMuxMonitor(back, qm); >> + } catch (QueueManagerException e) { >> + TestCase.fail("Unanticipated queue manager exception >>caught: >> "+e.getMessage()); >> + } catch (MalformedURLException e) { >> + TestCase.fail("Unanticipated URL exception caught: >> "+e.getMessage()); >> + } >> + } >> + >> + public void testGetLoad() { >> + try { >> + >> >>TestCase.assertEquals(mock1.load,monitor.getLoad(map.get(mock1).get(0))); >> + >> >>TestCase.assertEquals(mock2.load,monitor.getLoad(map.get(mock2).get(0))); >> + >> + /*try { >> + monitor.getLoad(superfluous); >> + TestCase.fail("Exception not thrown for unknown >>queue."); >> + } catch (MonitorException e) { >> + }*/ >> + } catch(MonitorException e) { >> + TestCase.fail("Unanticipated monitor exception caught: >> "+e.getMessage()); >> + } >> + } >> + >> + public void testGetNodes() { >> + try { >> + List<ResourceNode> nodes = monitor.getNodes(); >> + for (ResourceNode rn :map.get(mock1)) >> + TestCase.assertTrue("Node: "+rn.getNodeId()+ " not >> found.", nodes.contains(rn)); >> + for (ResourceNode rn :map.get(mock2)) >> + TestCase.assertTrue("Node: "+rn.getNodeId()+ " not >> found.", nodes.contains(rn)); >> + } catch(MonitorException e) { >> + TestCase.fail("Unanticipated monitor exception caught: >> "+e.getMessage()); >> + } >> + } >> + >> + public void testGetNodeById() { >> + try { >> + >> >>TestCase.assertEquals(map.get(mock1).get(0),monitor.getNodeById("mock-1-1 >>")); >> + >> >>TestCase.assertEquals(map.get(mock2).get(0),monitor.getNodeById("mock-2-1 >>")); >> + } catch(MonitorException e) { >> + TestCase.fail("Unanticipated monitor exception caught: >> "+e.getMessage()); >> + } >> + } >> + public void testGetNodeByURL() { >> + try { >> + >> TestCase.assertEquals(map.get(mock1).get(1),monitor.getNodeByURL(new >>URL(" >> http://mock-1-2"))); 
>> + >> TestCase.assertEquals(map.get(mock2).get(1),monitor.getNodeByURL(new >>URL(" >> http://mock-2-2"))); >> + } catch(MonitorException e) { >> + TestCase.fail("Unanticipated monitor exception caught: >> "+e.getMessage()); >> + } catch (MalformedURLException e1) { >> + TestCase.fail("Unanticipated URL exception caught: >> "+e1.getMessage()); >> + } >> + } >> + >> + public void testReduceLoad() { >> + try { >> + >>TestCase.assertTrue(monitor.reduceLoad(map.get(mock1).get(2), >> 5)); >> + >>TestCase.assertTrue(monitor.reduceLoad(map.get(mock2).get(2), >> 3)); >> + >>TestCase.assertEquals(map.get(mock1).get(2).getCapacity(),25); >> + >>TestCase.assertEquals(map.get(mock2).get(2).getCapacity(),27); >> + try { >> + monitor.reduceLoad(superfluous, 2); >> + TestCase.fail("Exception not thrown for unknown >>queue."); >> + } catch (MonitorException e) {} >> + } catch(MonitorException e) { >> + TestCase.fail("Unanticipated monitor exception caught: >> "+e.getMessage()); >> + } >> + } >> + >> + public void testAssignLoad() { >> + try { >> + >>TestCase.assertTrue(monitor.assignLoad(map.get(mock1).get(2), >> 5)); >> + >>TestCase.assertTrue(monitor.assignLoad(map.get(mock2).get(2), >> 3)); >> + >>TestCase.assertEquals(map.get(mock1).get(2).getCapacity(),5); >> + >>TestCase.assertEquals(map.get(mock2).get(2).getCapacity(),3); >> + try { >> + monitor.assignLoad(superfluous, 2); >> + TestCase.fail("Exception not thrown for unknown >>queue."); >> + } catch (MonitorException e) {} >> + } catch(MonitorException e) { >> + TestCase.fail("Unanticipated monitor exception caught: >> "+e.getMessage()); >> + } >> + } >> + >> + public void testAddNode() { >> + try { >> + ResourceNode node = new ResourceNode("a-new-node",null,2); >> + qm.addNodeToQueue(node.getNodeId(), "queue-1"); >> + monitor.addNode(node); >> + TestCase.assertEquals(node,mock1.getAdded()); >> + } catch(MonitorException e) { >> + TestCase.fail("Unanticipated monitor exception caught: >> "+e.getMessage()); >> + } catch 
(QueueManagerException e1) { >> + TestCase.fail("Unanticipated queue manager exception >>caught: >> "+e1.getMessage()); >> + } >> + } >> + public void removeNodeById() { >> + try { >> + ResourceNode node = new ResourceNode("a-new-node",null,2); >> + qm.addNodeToQueue(node.getNodeId(), "queue-1"); >> + monitor.addNode(node); >> + TestCase.assertEquals(node,mock1.getAdded()); >> + monitor.removeNodeById(node.getNodeId()); >> + TestCase.assertEquals(null,mock1.getAdded()); >> + } catch(MonitorException e) { >> + TestCase.fail("Unanticipated monitor exception caught: >> "+e.getMessage()); >> + } catch (QueueManagerException e1) { >> + TestCase.fail("Unanticipated queue manager exception >>caught: >> "+e1.getMessage()); >> + } >> + } >> + >> + private MockMonitor addMonitor(int load,Map<MockMonitor, >> List<ResourceNode>> map, List<ResourceNode> list) { >> + MockMonitor mon = new MockMonitor(load, list, list.get(0), >> list.get(1), list.get(2)); >> + map.put(mon, list); >> + return mon; >> + } >> + private List<ResourceNode> getNodesList(String prefix) { >> + List<ResourceNode> nodes = new LinkedList<ResourceNode>(); >> + try { >> + nodes.add(new ResourceNode(prefix+"-1",new URL("http:// >> "+prefix+"-1"),10)); >> + nodes.add(new ResourceNode(prefix+"-2",new URL("http:// >> "+prefix+"-2"),20)); >> + nodes.add(new ResourceNode(prefix+"-3",new URL("http:// >> "+prefix+"-3"),30)); >> + nodes.add(new ResourceNode(prefix+"-4",new URL("http:// >> "+prefix+"-4"),40)); >> + } catch (MalformedURLException e) { >> + TestCase.fail("Unanticipated URL exception caught: >> "+e.getMessage()); >> + } >> + return nodes; >> + } >> +} >> >> Added: >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockB >>atchManager.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt >>/cas/resource/mux/mocks/MockBatchManager.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> 
--- >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockB >>atchManager.java >> (added) >> +++ >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockB >>atchManager.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,82 @@ >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.oodt.cas.resource.mux.mocks; >> + >> +import org.apache.oodt.cas.resource.batchmgr.Batchmgr; >> +import org.apache.oodt.cas.resource.jobrepo.JobRepository; >> +import org.apache.oodt.cas.resource.monitor.Monitor; >> +import org.apache.oodt.cas.resource.structs.JobSpec; >> +import org.apache.oodt.cas.resource.structs.ResourceNode; >> +import >> org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; >> +/** >> + * This is a mock version of the batch manager. It SHOULD NOT, and >> + * CAN NOT be used as a normal class. 
>> + * >> + * @author starchmd >> + */ >> +public class MockBatchManager implements Batchmgr { >> + >> + private JobSpec execJobSpec; >> + private ResourceNode execResNode; >> + >> + @Override >> + public boolean executeRemotely(JobSpec job, ResourceNode resNode) >> + throws JobExecutionException { >> + this.execJobSpec = job; >> + this.execResNode = resNode; >> + return true; >> + } >> + >> + @Override >> + public void setMonitor(Monitor monitor) {} >> + >> + @Override >> + public void setJobRepository(JobRepository repository) {} >> + >> + @Override >> + public boolean killJob(String jobId, ResourceNode node) { >> + if (this.execJobSpec.getJob().getId().equals(jobId)) >> + { >> + this.execJobSpec = null; >> + this.execResNode = null; >> + return true; >> + } >> + return false; >> + } >> + >> + @Override >> + public String getExecutionNode(String jobId) { >> + return execResNode.getNodeId(); >> + } >> + /***** >> + * The following are test methods to report what jobs are here. >> + *****/ >> + /** >> + * Return the current jobspec, for testing purposes >> + * @return >> + */ >> + public JobSpec getCurrentJobSpec() { >> + return this.execJobSpec; >> + } >> + /** >> + * Return the current resource node, for testing purposes >> + * @return >> + */ >> + public ResourceNode getCurrentResourceNode() { >> + return execResNode; >> + } >> +} >> >> Added: >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockM >>onitor.java >> URL: >> >>http://svn.apache.org/viewvc/oodt/trunk/resource/src/test/org/apache/oodt >>/cas/resource/mux/mocks/MockMonitor.java?rev=1634142&view=auto >> >> >>========================================================================= >>===== >> --- >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockM >>onitor.java >> (added) >> +++ >> >>oodt/trunk/resource/src/test/org/apache/oodt/cas/resource/mux/mocks/MockM >>onitor.java >> Fri Oct 24 21:38:01 2014 >> @@ -0,0 +1,91 @@ >> +/* >> + * Licensed to the Apache 
Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed >>with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >>2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.oodt.cas.resource.mux.mocks; >> + >> +import java.net.URL; >> +import java.util.List; >> + >> +import org.apache.oodt.cas.resource.monitor.Monitor; >> +import org.apache.oodt.cas.resource.structs.ResourceNode; >> +import >>org.apache.oodt.cas.resource.structs.exceptions.MonitorException; >> + >> +public class MockMonitor implements Monitor { >> + >> + public int load = -1; >> + List<ResourceNode> nodes; >> + ResourceNode id; >> + ResourceNode url; >> + ResourceNode add; >> + ResourceNode reduce; >> + >> + public MockMonitor(int load,List<ResourceNode> nodes, ResourceNode >> id, ResourceNode url, ResourceNode reduce) { >> + this.load = load; >> + this.nodes = nodes; >> + this.id = id; >> + this.url = url; >> + this.reduce = reduce; >> + } >> + >> + @Override >> + public int getLoad(ResourceNode node) throws MonitorException { >> + return load; >> + } >> + @Override >> + public List getNodes() throws MonitorException { >> + return nodes; >> + } >> + >> + @Override >> + public ResourceNode getNodeById(String nodeId) throws >> MonitorException { >> + return id.getNodeId().equals(nodeId)?id:null; >> + } >> + >> + @Override >> + 
public ResourceNode getNodeByURL(URL ipAddr) throws >>MonitorException { >> + return url.getIpAddr().equals(ipAddr)?url:null; >> + } >> + >> + @Override >> + public boolean reduceLoad(ResourceNode node, int loadValue) >> + throws MonitorException { >> + reduce.setCapacity(reduce.getCapacity() - loadValue); >> + return true; >> + } >> + >> + @Override >> + public boolean assignLoad(ResourceNode node, int loadValue) >> + throws MonitorException { >> + reduce.setCapacity(loadValue); >> + return true; >> + } >> + >> + @Override >> + public void addNode(ResourceNode node) throws MonitorException { >> + this.add = node; >> + >> + } >> + >> + @Override >> + public void removeNodeById(String nodeId) throws MonitorException { >> + if (this.add.getNodeId().equals(nodeId)) >> + this.add = null; >> + } >> + >> + public ResourceNode getAdded() { >> + return this.add; >> + } >> +} >> >> >> >> >> ... >> >> [Message clipped] > > > > >-- >*Lewis*
