Repository: oodt Updated Branches: refs/heads/master ba6ca1b4d -> 9feacbb01
Stabilized avro rpc resource manager client and server by: 1. Adding 3 more RPC methods to be backward compatible with existing RPC client. 2. Added tests for both client and server by adapting XML version's tests. 3. And they are passing Project: http://git-wip-us.apache.org/repos/asf/oodt/repo Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/1b921b5c Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/1b921b5c Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/1b921b5c Branch: refs/heads/master Commit: 1b921b5c64783d7e929cd435057fbdb2188e47c9 Parents: ba6ca1b Author: Imesha Sudasingha <[email protected]> Authored: Fri Mar 2 23:18:14 2018 +0530 Committer: Imesha Sudasingha <[email protected]> Committed: Fri Mar 2 23:18:14 2018 +0530 ---------------------------------------------------------------------- .../org/apache/oodt/commons/AvroExecServer.java | 11 - .../avro/types/resource_manager_protocol.avdl | 6 + resource/src/main/bin/resmgr | 3 +- .../oodt/cas/resource/scheduler/Scheduler.java | 93 ++++--- .../resource/system/AvroRpcResourceManager.java | 241 +++++++++++++------ .../system/AvroRpcResourceManagerClient.java | 40 ++- .../cas/resource/system/ResourceManager.java | 12 +- .../resource/system/ResourceManagerClient.java | 11 +- .../resource/system/ResourceManagerMain.java | 66 +++++ .../resource/system/XmlRpcResourceManager.java | 86 +++---- .../system/XmlRpcResourceManagerClient.java | 46 ++-- .../system/TestAvroRpcResourceManager.java | 64 ++--- .../TestAvroRpcResourceManagerClient.java | 211 ++++++++++++++++ .../system/TestXmlRpcResourceManager.java | 3 +- .../system/TestXmlRpcResourceManagerClient.java | 238 +++++++++--------- .../TestDistributedAvroRpcResourceManager.java | 85 +++++++ .../TestDistributedXmlRpcResourceManager.java | 1 + 17 files changed, 851 insertions(+), 366 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java b/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java index eb1c786..c779acd 100644 --- a/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java +++ b/commons/src/main/java/org/apache/oodt/commons/AvroExecServer.java @@ -17,16 +17,9 @@ package org.apache.oodt.commons; -import org.apache.avro.Protocol; -import org.apache.avro.ipc.HttpServer; -import org.apache.avro.ipc.Responder; -import org.apache.avro.ipc.Server; -import org.apache.avro.ipc.generic.GenericResponder; -import org.apache.avro.ipc.specific.SpecificResponder; import org.apache.oodt.commons.io.Base64EncodingOutputStream; import org.apache.oodt.commons.util.LogInit; import org.apache.oodt.commons.util.XML; -import org.apache.xmlrpc.XmlRpcServer; import org.w3c.dom.DOMException; import org.w3c.dom.Document; import org.w3c.dom.DocumentType; @@ -69,10 +62,6 @@ public class AvroExecServer { /** The <log> element within the status document. */ private Element logElement; - /** The XML-RPC interface to this server. */ - private HttpServer server; - - /** Status DTD Document Type Definition formal public identifier. */ public static final String STATUS_FPI = "-//JPL//DTD EDA Server Status 1.0"; http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/avro/types/resource_manager_protocol.avdl ---------------------------------------------------------------------- diff --git a/resource/src/main/avro/types/resource_manager_protocol.avdl b/resource/src/main/avro/types/resource_manager_protocol.avdl index 8b3f43f..f5d6725 100644 --- a/resource/src/main/avro/types/resource_manager_protocol.avdl +++ b/resource/src/main/avro/types/resource_manager_protocol.avdl @@ -20,6 +20,12 @@ import schema "AvroResourceNode.avsc"; string getExecutionNode(string jobId); + string getExecReport(); + + string getNodeReport(); + + array<AvroJob> getQueuedJobs(); + string handleJob(AvroJob exec, AvroJobInput into); boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, string hostUrl); http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/bin/resmgr ---------------------------------------------------------------------- diff --git a/resource/src/main/bin/resmgr b/resource/src/main/bin/resmgr index da6ae29..f80cd90 100644 --- a/resource/src/main/bin/resmgr +++ b/resource/src/main/bin/resmgr @@ -51,7 +51,8 @@ case "$1" in $JAVA_HOME/bin/java -Djava.ext.dirs=${CAS_RESMGR_HOME}/lib \ -Djava.util.logging.config.file=${CAS_RESMGR_HOME}/etc/logging.properties \ -Dorg.apache.oodt.cas.resource.properties=${CAS_RESMGR_PROPS} \ - org.apache.oodt.cas.resource.system.XmlRpcResourceManager --portNum $SERVER_PORT & + -Dresmgr.manager=org.apache.oodt.cas.resource.system.AvroRpcResourceManager \ + org.apache.oodt.cas.resource.system.ResourceManagerMain --portNum $SERVER_PORT & echo $! >${RUN_HOME}/cas.resmgr.pid echo "OK" sleep 5 http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java b/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java index 764527e..991722b 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/scheduler/Scheduler.java @@ -19,10 +19,10 @@ package org.apache.oodt.cas.resource.scheduler; //OODT imports + import org.apache.oodt.cas.resource.batchmgr.Batchmgr; import org.apache.oodt.cas.resource.jobqueue.JobQueue; import org.apache.oodt.cas.resource.monitor.Monitor; -import org.apache.oodt.cas.resource.scheduler.QueueManager; import org.apache.oodt.cas.resource.structs.JobSpec; import org.apache.oodt.cas.resource.structs.ResourceNode; import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException; @@ -31,62 +31,57 @@ import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException; * @author woollard * @author bfoster * @version $Revision$ - * + * <p> * <p> * A scheduler interface. * </p> - * */ -public interface Scheduler extends Runnable{ +public interface Scheduler extends Runnable { - /** - * Schedules a job to be executed by a particular batch manager. - * - * @param spec - * The {@link JobSpec} to schedule for execution. - * @return Whether the job was successfully scheduled or not. - * @throws SchedulerException If there was any error scheduling - * the given {@link JobSpec}. - */ + /** + * Schedules a job to be executed by a particular batch manager. + * + * @param spec The {@link JobSpec} to schedule for execution. + * @return Whether the job was successfully scheduled or not. + * @throws SchedulerException If there was any error scheduling + * the given {@link JobSpec}. + */ boolean schedule(JobSpec spec) throws SchedulerException; - /** - * Returns the ResourceNode that is considered to be <quote>most available</quote> - * within our underlying set of resources for the given JobSpec. - * @param spec The JobSpec to find an available node for. - * @return The {@link ResourceNode} best suited to handle this {@link JobSpec} - * @throws SchedulerException If any error occurs. - */ - ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException; + /** + * Returns the ResourceNode that is considered to be <quote>most available</quote> + * within our underlying set of resources for the given JobSpec. + * + * @param spec The JobSpec to find an available node for. + * @return The {@link ResourceNode} best suited to handle this {@link JobSpec} + * @throws SchedulerException If any error occurs. + */ + ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException; + + /** + * @return The underlying {@link Monitor} used by this + * Scheduler. + */ + Monitor getMonitor(); + + /** + * @return The underlying {@link Batchmgr} used by this + * Scheduler. + */ + Batchmgr getBatchmgr(); + + + /** + * @return The underlying {@link JobQueue} used by this + * Scheduler. + */ + JobQueue getJobQueue(); - /** - * - * @return The underlying {@link Monitor} used by this - * Scheduler. - */ - Monitor getMonitor(); - - /** - * - * @return The underlying {@link Batchmgr} used by this - * Scheduler. - */ - Batchmgr getBatchmgr(); - - - /** - * - * @return The underlying {@link JobQueue} used by this - * Scheduler. - */ - JobQueue getJobQueue(); + /** + * @return The underlying {@link QueueManager} used by this + * Scheduler. + */ + QueueManager getQueueManager(); - /** - * - * @return The underlying {@link QueueManager} used by this - * Scheduler. - */ - QueueManager getQueueManager(); - } http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java index d224cf5..fc3e2ae 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java @@ -22,72 +22,91 @@ import org.apache.avro.ipc.NettyServer; import org.apache.avro.ipc.Server; import org.apache.avro.ipc.specific.SpecificResponder; import org.apache.oodt.cas.resource.scheduler.Scheduler; -import org.apache.oodt.cas.resource.structs.*; -import org.apache.oodt.cas.resource.structs.avrotypes.*; -import org.apache.oodt.cas.resource.structs.exceptions.*; +import org.apache.oodt.cas.resource.structs.AvroTypeFactory; +import org.apache.oodt.cas.resource.structs.Job; +import org.apache.oodt.cas.resource.structs.JobInput; +import org.apache.oodt.cas.resource.structs.JobSpec; +import org.apache.oodt.cas.resource.structs.ResourceNode; +import org.apache.oodt.cas.resource.structs.avrotypes.AvroJob; +import org.apache.oodt.cas.resource.structs.avrotypes.AvroJobInput; +import org.apache.oodt.cas.resource.structs.avrotypes.AvroResourceNode; +import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; +import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; +import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; +import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; +import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; +import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException; import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; -import org.apache.oodt.cas.resource.util.XmlRpcStructFactory; -import org.apache.xmlrpc.WebServer; - -import java.io.File; -import java.io.FileInputStream; +import org.apache.oodt.cas.resource.util.ResourceNodeComparator; +import org.apache.oodt.config.Component; +import org.apache.oodt.config.ConfigurationManager; +import org.apache.oodt.config.ConfigurationManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; -import java.util.Hashtable; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Vector; -import java.util.logging.Level; -import java.util.logging.Logger; - -public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager{ +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; - private int port = 2000; +public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager { - private Logger LOG = Logger - .getLogger(XmlRpcResourceManager.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(AvroRpcResourceManager.class); + private int port = 2000; private Server server; + /** our scheduler */ + private Scheduler scheduler; + /** Configuration Manager instance of this instance */ + private ConfigurationManager configurationManager; + private ExecutorService executorService; - /* our scheduler */ - private Scheduler scheduler = null; + public AvroRpcResourceManager(int port) { + this.port = port; - public AvroRpcResourceManager(int port) throws Exception{ - // load properties from workflow manager properties file, if specified + List<String> propertiesFiles = new ArrayList<>(); + // set up the configuration, if there is any if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) { - String configFile = System - .getProperty("org.apache.oodt.cas.resource.properties"); - LOG.log(Level.INFO, - "Loading Resource Manager Configuration Properties from: [" - + configFile + "]"); - System.getProperties().load( - new FileInputStream(new File(configFile))); + propertiesFiles.add(System.getProperty("org.apache.oodt.cas.resource.properties")); } - String schedulerClassStr = System.getProperty( - "resource.scheduler.factory", - "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory"); + configurationManager = ConfigurationManagerFactory + .getConfigurationManager(Component.RESOURCE_MANAGER, propertiesFiles); + } - scheduler = GenericResourceManagerObjectFactory - .getSchedulerServiceFromFactory(schedulerClassStr); + @Override + public void startUp() throws Exception { + try { + configurationManager.loadConfiguration(); + } catch (Exception e) { + logger.error("Unable to load configuration", e); + throw new IOException("Unable to load configuration", e); + } - // start up the scheduler - new Thread(scheduler).start(); + String schedulerClassStr = System.getProperty("resource.scheduler.factory", + "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory"); + scheduler = GenericResourceManagerObjectFactory.getSchedulerServiceFromFactory(schedulerClassStr); - this.port = port; + // start up the scheduler + executorService = Executors.newSingleThreadExecutor(); + executorService.submit(scheduler); // start up the web server - server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class,this), + server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class, this), new InetSocketAddress(this.port)); server.start(); - LOG.log(Level.INFO, "Resource Manager started by " - + System.getProperty("user.name", "unknown")); - + logger.info("Resource Manager started by {}", System.getProperty("user.name", "unknown")); } @Override - public boolean isAlive() throws AvroRemoteException { + public boolean isAlive() { return true; } @@ -95,7 +114,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru public int getJobQueueSize() throws AvroRemoteException { try { return this.scheduler.getJobQueue().getSize(); - }catch (Exception e) { + } catch (Exception e) { throw new AvroRemoteException(new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e)); } } @@ -105,7 +124,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru public int getJobQueueCapacity() throws AvroRemoteException { try { return this.scheduler.getJobQueue().getCapacity(); - }catch (Exception e) { + } catch (Exception e) { throw new AvroRemoteException(new JobRepositoryException("Failed to get capacity of JobQueue : " + e.getMessage(), e)); } } @@ -117,10 +136,11 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru jobId); return scheduler.getJobQueue().getJobRepository().jobFinished(spec); - } catch(JobRepositoryException e ){ + } catch (JobRepositoryException e) { throw new AvroRemoteException(e); } } + @Override public AvroJob getJobInfo(String jobId) throws AvroRemoteException { JobSpec spec = null; @@ -129,15 +149,12 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru spec = scheduler.getJobQueue().getJobRepository() .getJobById(jobId); } catch (JobRepositoryException e) { - LOG.log(Level.WARNING, - "Exception communicating with job repository for job: [" - + jobId + "]: Message: " + e.getMessage()); + logger.warn("Exception communicating with job repository for job: [{}]: Message: {}", jobId, e.getMessage()); throw new AvroRemoteException(new JobRepositoryException("Unable to get job: [" + jobId + "] from repository!")); } return AvroTypeFactory.getAvroJob(spec.getJob()); - } @Override @@ -152,7 +169,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru @Override public boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, String hostUrl) throws AvroRemoteException { try { - return genericHandleJob(exec,in,hostUrl); + return genericHandleJob(exec, in, hostUrl); } catch (JobExecutionException e) { throw new AvroRemoteException(e); } @@ -186,9 +203,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru public boolean killJob(String jobId) throws AvroRemoteException { String resNodeId = scheduler.getBatchmgr().getExecutionNode(jobId); if (resNodeId == null) { - LOG.log(Level.WARNING, "Attempt to kill job: [" + jobId - + "]: cannot find execution node" - + " (has the job already finished?)"); + logger.warn("Attempt to kill job: [{}]: cannot find execution node (has the job already finished?)", jobId); return false; } ResourceNode node = null; @@ -205,14 +220,101 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru public String getExecutionNode(String jobId) throws AvroRemoteException { String execNode = scheduler.getBatchmgr().getExecutionNode(jobId); if (execNode == null) { - LOG.log(Level.WARNING, "Job: [" + jobId - + "] not currently executing on any known node"); + logger.warn("Job: [{}] not currently executing on any known node", jobId); return ""; } else return execNode; } @Override + public String getNodeReport() { + StringBuilder report = new StringBuilder(); + + try { + + // get a sorted list of nodes + List nodes = scheduler.getMonitor().getNodes(); + Collections.sort(nodes, new ResourceNodeComparator()); + + // formulate the report string + for (Object node1 : nodes) { + ResourceNode node = (ResourceNode) node1; + String nodeId = node.getNodeId(); + report.append(nodeId); + report.append(" (").append(getNodeLoad(nodeId)).append("/").append(node.getCapacity()).append(")"); + List<String> nodeQueues = getQueuesWithNode(nodeId); + if (nodeQueues != null && nodeQueues.size() > 0) { + report.append(" -- ").append(nodeQueues.get(0)); + for (int j = 1; j < nodeQueues.size(); j++) { + report.append(", ").append(nodeQueues.get(j)); + } + } + report.append("\n"); + } + } catch (Exception e) { + return null; + } + + return report.toString(); + } + + public List<AvroJob> getQueuedJobs() { + List<AvroJob> jobs = new ArrayList<>(); + List jobSpecs = this.scheduler.getJobQueue().getQueuedJobs(); + + if (jobSpecs != null && jobSpecs.size() > 0) { + for (Object jobSpec : jobSpecs) { + Job job = ((JobSpec) jobSpec).getJob(); + jobs.add(AvroTypeFactory.getAvroJob(job)); + } + } + + return jobs; + } + + @Override + public String getExecReport() { + StringBuilder report = new StringBuilder(); + + try { + + // get a sorted list of all nodes, since the report should be + // alphabetically sorted by node + List resNodes = scheduler.getMonitor().getNodes(); + if (resNodes.size() == 0) { + throw new MonitorException( + "No jobs can be executing, as there are no nodes in the Monitor"); + } + Vector<String> nodeIds = new Vector<String>(); + for (Object resNode : resNodes) { + nodeIds.add(((ResourceNode) resNode).getNodeId()); + } + Collections.sort(nodeIds); + + // generate the report string + for (String nodeId : nodeIds) { + List execJobIds = this.scheduler.getBatchmgr().getJobsOnNode(nodeId); + if (execJobIds != null && execJobIds.size() > 0) { + for (Object execJobId : execJobIds) { + String jobId = (String) execJobId; + Job job = scheduler.getJobQueue().getJobRepository() + .getJobById(jobId).getJob(); + report.append("job id=").append(jobId); + report.append(", load=").append(job.getLoadValue()); + report.append(", node=").append(nodeId); + report.append(", queue=").append(job.getQueueName()).append("\n"); + } + } + } + + } catch (Exception e) { + return null; + } + + return report.toString(); + } + + @Override public List<String> getQueues() throws AvroRemoteException { try { return this.scheduler.getQueueManager().getQueues(); @@ -255,12 +357,12 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru @Override public boolean removeNode(String nodeId) throws AvroRemoteException { - try{ - for(String queueName: this.getQueuesWithNode(nodeId)){ + try { + for (String queueName : this.getQueuesWithNode(nodeId)) { this.removeNodeFromQueue(nodeId, queueName); } this.scheduler.getMonitor().removeNodeById(nodeId); - }catch(Exception e){ + } catch (Exception e) { throw new AvroRemoteException(new MonitorException(e.getMessage(), e)); } @@ -307,13 +409,18 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru } } - public boolean shutdown(){ + @Override + public boolean shutdown() { + configurationManager.clearConfiguration(); + executorService.shutdownNow(); + if (this.server != null) { this.server.close(); this.server = null; return true; - } else + } else { return false; + } } @Override @@ -346,7 +453,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru AvroRpcResourceManager manager = new AvroRpcResourceManager(portNum); - for (;;) + for (; ; ) try { Thread.currentThread().join(); } catch (InterruptedException ignore) { @@ -356,15 +463,13 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru @Override public boolean setNodeCapacity(String nodeId, int capacity) throws AvroRemoteException { - try{ + try { this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity); - }catch (MonitorException e){ - LOG.log(Level.WARNING, "Exception setting capacity on node " - + nodeId + ": " + e.getMessage()); + } catch (MonitorException e) { + logger.warn("Exception setting capacity on node {}: ", nodeId, e.getMessage()); return false; } return true; - } @@ -381,15 +486,14 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru try { jobId = scheduler.getJobQueue().addJob(spec); } catch (JobQueueException e) { - LOG.log(Level.WARNING, "JobQueue exception adding job: Message: " - + e.getMessage()); + logger.warn("JobQueue exception adding job: Message: {}", e.getMessage()); throw new SchedulerException(e.getMessage()); } return jobId; } private boolean genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput, - String urlStr) throws JobExecutionException { + String urlStr) throws JobExecutionException { Job exec = AvroTypeFactory.getJob(avroJob); JobInput in = AvroTypeFactory.getJobInput(avroJobInput); @@ -415,8 +519,7 @@ public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.stru try { url = new URL(urlStr); } catch (MalformedURLException e) { - LOG.log(Level.WARNING, "Error converting string: [" + urlStr - + "] to URL object: Message: " + e.getMessage()); + logger.warn("Error converting string: [{}] to URL object: Message: {}", urlStr, e.getMessage()); } return url; http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java index fa0e84b..4dd0f33 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java @@ -26,11 +26,12 @@ import org.apache.oodt.cas.resource.structs.AvroTypeFactory; import org.apache.oodt.cas.resource.structs.Job; import org.apache.oodt.cas.resource.structs.JobInput; import org.apache.oodt.cas.resource.structs.ResourceNode; +import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager; import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; +import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; -import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager; import java.io.File; import java.io.FileInputStream; @@ -155,9 +156,29 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient { } @Override + public String getNodeReport() throws MonitorException { + try { + return proxy.getNodeReport(); + } catch (AvroRemoteException e) { + LOG.log(Level.SEVERE, "Server error!"); + } + return null; + } + + @Override + public String getExecReport() throws JobRepositoryException { + try { + return proxy.getExecReport(); + } catch (AvroRemoteException e) { + LOG.log(Level.SEVERE, "Server error!"); + } + return null; + } + + @Override public String submitJob(Job exec, JobInput in) throws JobExecutionException { try { - return proxy.handleJob(AvroTypeFactory.getAvroJob(exec),AvroTypeFactory.getAvroJobInput(in)); + return proxy.handleJob(AvroTypeFactory.getAvroJob(exec), AvroTypeFactory.getAvroJobInput(in)); } catch (AvroRemoteException e) { LOG.log(Level.SEVERE, "Server error!"); @@ -243,7 +264,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient { @Override public void setNodeCapacity(String nodeId, int capacity) throws MonitorException { try { - proxy.setNodeCapacity(nodeId,capacity); + proxy.setNodeCapacity(nodeId, capacity); } catch (AvroRemoteException e) { throw new MonitorException(e); } @@ -252,7 +273,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient { @Override public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException { try { - proxy.addNodeToQueue(nodeId,queueName); + proxy.addNodeToQueue(nodeId, queueName); } catch (AvroRemoteException e) { throw new QueueManagerException(e); } @@ -261,7 +282,7 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient { @Override public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException { try { - proxy.removeNodeFromQueue(nodeId,queueName); + proxy.removeNodeFromQueue(nodeId, queueName); } catch (AvroRemoteException e) { throw new QueueManagerException(e); } @@ -302,4 +323,13 @@ public class AvroRpcResourceManagerClient implements ResourceManagerClient { throw new MonitorException(e); } } + + @Override + public List getQueuedJobs() throws JobQueueException { + try { + return proxy.getQueuedJobs(); + } catch (AvroRemoteException e) { + throw new JobQueueException(e); + } + } } http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java index 5cbf6d3..c09b299 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java @@ -17,15 +17,11 @@ package org.apache.oodt.cas.resource.system; -import org.apache.oodt.cas.resource.structs.exceptions.*; - -import java.util.Date; -import java.util.Hashtable; -import java.util.List; -import java.util.Vector; - public interface ResourceManager { - boolean shutdown(); + void startUp() throws Exception; + + boolean isAlive(); + boolean shutdown(); } http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java index dd4444b..d5cacbe 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java @@ -21,6 +21,7 @@ import org.apache.oodt.cas.resource.structs.Job; import org.apache.oodt.cas.resource.structs.JobInput; import org.apache.oodt.cas.resource.structs.ResourceNode; import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; +import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; @@ -29,6 +30,7 @@ import java.net.URL; import java.util.List; public interface ResourceManagerClient { + boolean isJobComplete(String jobId) throws JobRepositoryException; Job getJobInfo(String jobId) throws JobRepositoryException; @@ -43,10 +45,13 @@ public interface ResourceManagerClient { String getExecutionNode(String jobId); + String getNodeReport() throws MonitorException; + + String getExecReport() throws JobRepositoryException; + String submitJob(Job exec, JobInput in) throws JobExecutionException; - boolean submitJob(Job exec, JobInput in, URL hostUrl) - throws JobExecutionException; + boolean submitJob(Job exec, JobInput in, URL hostUrl) throws JobExecutionException; List getNodes() throws MonitorException; @@ -77,4 +82,6 @@ public interface ResourceManagerClient { List<String> getQueuesWithNode(String nodeId) throws QueueManagerException; String getNodeLoad(String nodeId) throws MonitorException; + + List getQueuedJobs() throws JobQueueException; } http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java new file mode 100644 index 0000000..0880da1 --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerMain.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.system; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; + +public class ResourceManagerMain { + + private static final Logger logger = LoggerFactory.getLogger(ResourceManagerMain.class); + + public static void main(String[] args) throws Exception { + int portNum = -1; + String usage = "AvroRpcResourceManager --portNum <port number for xml rpc service>\n"; + + for (int i = 0; i < args.length; i++) { + if (args[i].equals("--portNum")) { + portNum = Integer.parseInt(args[++i]); + } + } + + if (portNum == -1) { + System.err.println(usage); + System.exit(1); + } + + String resourceManagerClass = System.getProperty("resmgr.manager", + "org.apache.oodt.cas.resource.system.AvroRpcResourceManager"); + + logger.info("Starting resource manager {} at port: {}", resourceManagerClass, portNum); + + Constructor<?> constructor = Class.forName(resourceManagerClass).getConstructor(Integer.TYPE); + final ResourceManager manager = (ResourceManager) constructor.newInstance(portNum); + manager.startUp(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + manager.shutdown(); + } + }); + + for (; ; ) + try { + Thread.currentThread().join(); + } catch (InterruptedException ignore) { + } + } +} http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java index dd698cb..9ee48c8 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java @@ -57,25 +57,24 @@ import java.util.logging.Logger; * <p> * An XML RPC-based Resource manager. * </p> - * + * */ @Deprecated -public class XmlRpcResourceManager { - - /* our log stream */ - private Logger LOG = Logger - .getLogger(XmlRpcResourceManager.class.getName()); - - /* our xml rpc web server */ - private WebServer webServer = null; +public class XmlRpcResourceManager implements ResourceManager{ - /* our scheduler */ - private Scheduler scheduler = null; + /** our log stream */ + private Logger LOG = Logger.getLogger(XmlRpcResourceManager.class.getName()); + private int port; + /** our xml rpc web server */ + private WebServer webServer; + /** our scheduler */ + private Scheduler scheduler; /** Configuration Manager instance of this instance */ private ConfigurationManager configurationManager; public XmlRpcResourceManager(int port) throws IOException { + this.port = port; List<String> propertiesFiles = new ArrayList<>(); // set up the configuration, if there is any if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) { @@ -83,6 +82,10 @@ public class XmlRpcResourceManager { } configurationManager = ConfigurationManagerFactory.getConfigurationManager(Component.RESOURCE_MANAGER, propertiesFiles); + } + + @Override + public void startUp() throws Exception{ try { configurationManager.loadConfiguration(); } catch (Exception e) { @@ -100,8 +103,6 @@ public class XmlRpcResourceManager { // start up the scheduler new Thread(scheduler).start(); - - // start up the web server webServer = new WebServer(port); webServer.addHandler("resourcemgr", this); @@ -109,13 +110,12 @@ public class XmlRpcResourceManager { LOG.log(Level.INFO, "Resource Manager started by " + System.getProperty("user.name", "unknown")); - } public boolean isAlive() { return true; } - + /** * Gets the number of Jobs in JobQueue * @return Number of Jobs in JobQueue @@ -128,7 +128,7 @@ public class XmlRpcResourceManager { throw new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e); } } - + /** * Gets the max number of Jobs allowed in JobQueue * @return Max number of Jobs @@ -283,12 +283,12 @@ public class XmlRpcResourceManager { public List<String> getQueues() { return new Vector<String>(this.scheduler.getQueueManager().getQueues()); } - + public boolean addQueue(String queueName) { this.scheduler.getQueueManager().addQueue(queueName); return true; } - + public boolean removeQueue(String queueName) { this.scheduler.getQueueManager().removeQueue(queueName); return true; @@ -301,7 +301,7 @@ public class XmlRpcResourceManager { this.scheduler.getMonitor().addNode(XmlRpcStructFactory.getResourceNodeFromXmlRpc(hashNode)); return true; } - + public boolean removeNode(String nodeId) throws MonitorException { try{ for(String queueName: this.getQueuesWithNode(nodeId)){ @@ -311,31 +311,31 @@ public class XmlRpcResourceManager { }catch(Exception e){ throw new MonitorException(e.getMessage(), e); } - + return true; } - + public boolean addNodeToQueue(String nodeId, String queueName) throws QueueManagerException { this.scheduler.getQueueManager().addNodeToQueue(nodeId, queueName); return true; } - + public boolean removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException { this.scheduler.getQueueManager().removeNodeFromQueue(nodeId, queueName); return true; } - + public List<String> getNodesInQueue(String queueName) throws QueueManagerException { return new Vector<String>(this.scheduler.getQueueManager().getNodes(queueName)); } - + public List<String> getQueuesWithNode(String nodeId) { return new Vector<String>(this.scheduler.getQueueManager().getQueues(nodeId)); } + @Override public boolean shutdown() { configurationManager.clearConfiguration(); - if (this.webServer != null) { this.webServer.shutdown(); this.webServer = null; @@ -344,37 +344,37 @@ public class XmlRpcResourceManager { return false; } } - + public String getNodeLoad(String nodeId) throws MonitorException{ ResourceNode node = this.scheduler.getMonitor().getNodeById(nodeId); int capacity = node.getCapacity(); int load = (this.scheduler.getMonitor().getLoad(node)) * -1 + capacity; return load + "/" + capacity; } - + public List getQueuedJobs() { Vector jobs = new Vector(); List jobSpecs = this.scheduler.getJobQueue().getQueuedJobs(); - + if(jobSpecs != null && jobSpecs.size() > 0){ for (Object jobSpec : jobSpecs) { Job job = ((JobSpec) jobSpec).getJob(); jobs.add(job); } } - + return XmlRpcStructFactory.getXmlRpcJobList(jobs); } - + public String getNodeReport() throws MonitorException{ StringBuilder report = new StringBuilder(); - + try{ - + // get a sorted list of nodes List nodes = scheduler.getMonitor().getNodes(); Collections.sort(nodes, new ResourceNodeComparator()); - + // formulate the report string for (Object node1 : nodes) { ResourceNode node = (ResourceNode) node1; @@ -390,19 +390,19 @@ public class XmlRpcResourceManager { } report.append("\n"); } - + }catch(Exception e){ throw new MonitorException(e.getMessage(), e); } - + return report.toString(); } - + public String getExecutionReport() throws JobRepositoryException{ StringBuilder report = new StringBuilder(); - + try{ - + // get a sorted list of all nodes, since the report should be // alphabetically sorted by node List resNodes = scheduler.getMonitor().getNodes(); @@ -415,7 +415,7 @@ public class XmlRpcResourceManager { nodeIds.add(((ResourceNode) resNode).getNodeId()); } Collections.sort(nodeIds); - + // generate the report string for(String nodeId: nodeIds){ List execJobIds = this.scheduler.getBatchmgr().getJobsOnNode(nodeId); @@ -431,14 +431,14 @@ public class XmlRpcResourceManager { } } } - + }catch(Exception e){ throw new JobRepositoryException(e.getMessage(), e); } - + return report.toString(); } - + public static void main(String[] args) throws IOException { int portNum = -1; String usage = "XmlRpcResourceManager --portNum <port number for xml rpc service>\n"; @@ -463,7 +463,7 @@ public class XmlRpcResourceManager { } } } - + public boolean setNodeCapacity(String nodeId, int capacity){ try{ this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity); http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java index 1fb4f84..a0ed618 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java @@ -52,7 +52,7 @@ import java.util.logging.Logger; * <p> * The XML RPC based resource manager client. * </p> - * + * */ @Deprecated public class XmlRpcResourceManagerClient implements ResourceManagerClient { @@ -76,7 +76,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { * Constructs a new XmlRpcResourceManagerClient with the given * <code>url</code>. * </p> - * + * * @param url * The url pointer to the xml rpc resource manager service. */ @@ -188,7 +188,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new JobRepositoryException("Failed to get JobQueue from server : " + e.getMessage(), e); } } - + /** * Gets the max number of Jobs allowed in JobQueue * @return Max number of Jobs @@ -203,7 +203,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new JobRepositoryException("Failed to get JobQueue capacity from server : " + e.getMessage(), e); } } - + @Override public boolean killJob(String jobId) { Vector argList = new Vector(); @@ -348,7 +348,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new QueueManagerException(e.getMessage(), e); } } - + /** * Removes the queue with the given name * @param queueName The name of the queue to be removed @@ -364,7 +364,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new QueueManagerException(e.getMessage(), e); } } - + /** * Adds a node * @param node The node to be added @@ -380,7 +380,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new MonitorException(e.getMessage(), e); } } - + /** * Removes the node with the given id * @param nodeId The id of the node to be removed @@ -396,7 +396,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new MonitorException(e.getMessage(), e); } } - + @Override public void setNodeCapacity(String nodeId, int capacity) throws MonitorException{ try{ @@ -408,7 +408,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new MonitorException(e.getMessage(), e); } } - + /** * Addes the node with given id to the queue with the given name * @param nodeId The id of the node to be added to the given queueName @@ -426,7 +426,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new QueueManagerException(e.getMessage(), e); } } - + /** * Remove the node with the given id from the queue with the given name * @param nodeId The id of the node to be remove from the given queueName @@ -444,7 +444,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new QueueManagerException(e.getMessage(), e); } } - + /** * Gets a list of currently supported queue names * @return A list of currently supported queue names @@ -459,7 +459,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new QueueManagerException(e.getMessage(), e); } } - + /** * Gets a list of ids of the nodes in the given queue * @param queueName The name of the queue to get node ids from @@ -476,7 +476,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new QueueManagerException(e.getMessage(), e); } } - + /** * Gets a list of queues which contain the node with the given nodeId * @param nodeId The id of the node to get queues it belongs to @@ -493,7 +493,7 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { throw new QueueManagerException(e.getMessage(), e); } } - + /** * Report on the load of the requested node * @param nodeId The id of the node to be polled @@ -511,42 +511,44 @@ public class XmlRpcResourceManagerClient implements ResourceManagerClient { } } + @Override public List getQueuedJobs() throws JobQueueException{ Vector queuedJobs; - + try{ queuedJobs = (Vector)client.execute("resourcemgr.getQueuedJobs", new Vector<Object>()); }catch(Exception e){ throw new JobQueueException(e.getMessage(), e); } - + return XmlRpcStructFactory.getJobListFromXmlRpc(queuedJobs); - } + } + @Override public String getNodeReport() throws MonitorException{ String report; - + try{ report = (String)client.execute("resourcemgr.getNodeReport", new Vector<Object>()); }catch(Exception e){ throw new MonitorException(e.getMessage(), e); } - + return report; } public String getExecReport() throws JobRepositoryException{ String report; - + try{ report = (String)client.execute("resourcemgr.getExecutionReport", new Vector<Object>()); }catch(Exception e){ throw new JobRepositoryException(e.getMessage(), e); } - + return report; - } + } public static String getReadableJobStatus(String status) { if (status.equals(JobStatus.SUCCESS)) { http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java index b5cf5eb..1033fad 100644 --- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java +++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManager.java @@ -17,10 +17,12 @@ package org.apache.oodt.cas.resource.system; -import junit.framework.TestCase; import org.apache.commons.io.FileUtils; import org.apache.oodt.cas.resource.structs.NameValueJobInput; import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import java.io.File; import java.io.FileFilter; @@ -29,7 +31,11 @@ import java.io.IOException; import java.net.URL; import java.util.Properties; -public class TestAvroRpcResourceManager extends TestCase { +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class TestAvroRpcResourceManager { private File tmpPolicyDir; @@ -37,21 +43,27 @@ public class TestAvroRpcResourceManager extends TestCase { private static final int RM_PORT = 50001; - public void testFake() { - + @Before + public void setUp() throws Exception { + try { + System.out.println(NameValueJobInput.class.getCanonicalName()); + generateTestConfiguration(); + rm = new AvroRpcResourceManager(RM_PORT); + rm.startUp(); + } catch (Exception e) { + e.printStackTrace(); + } } - + /** * @since OODT-182 */ - //Disabled until API impl can be finished - public void XtestDynSetNodeCapacity() { + @Test + public void testDynSetNodeCapacity() { AvroRpcResourceManagerClient rmc = null; try { - rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:" - + RM_PORT)); + rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT)); } catch (Exception e) { - System.out.println("radu1"); e.printStackTrace(); fail(e.getMessage()); } @@ -60,7 +72,6 @@ public class TestAvroRpcResourceManager extends TestCase { try { rmc.setNodeCapacity("localhost", 8); } catch (MonitorException e) { - System.out.println("radu2"); e.printStackTrace(); fail(e.getMessage()); } @@ -68,9 +79,7 @@ public class TestAvroRpcResourceManager extends TestCase { int setCapacity = -1; try { setCapacity = rmc.getNodeById("localhost").getCapacity(); - } catch (Exception e) { - System.out.println("radu3"); e.printStackTrace(); fail(e.getMessage()); } @@ -78,31 +87,11 @@ public class TestAvroRpcResourceManager extends TestCase { } - /* - * (non-Javadoc) - * - * @see junit.framework.TestCase#setUp() - */ - @Override - protected void setUp() throws Exception { - try { - System.out.println(NameValueJobInput.class.getCanonicalName()); - generateTestConfiguration(); - this.rm = new AvroRpcResourceManager(RM_PORT); - } - catch (Exception e){ - e.printStackTrace(); + @After + public void tearDown() { + if (this.rm != null) { + this.rm.shutdown(); } - } - - /* - * (non-Javadoc) - * - * @see junit.framework.TestCase#tearDown() - */ - @Override - protected void tearDown() throws Exception { - if (this.rm != null) this.rm.shutdown(); deleteAllFiles(this.tmpPolicyDir.getAbsolutePath()); } @@ -117,7 +106,6 @@ public class TestAvroRpcResourceManager extends TestCase { } startDirFile.delete(); - } private void generateTestConfiguration() throws IOException { http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java new file mode 100644 index 0000000..d088bab --- /dev/null +++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestAvroRpcResourceManagerClient.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.system; + +import org.apache.commons.io.FileUtils; +import org.apache.oodt.cas.resource.structs.ResourceNode; +import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; +import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; +import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; +import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URL; +import java.util.Hashtable; +import java.util.List; +import java.util.Properties; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.IsNot.not; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * Tests for the XmlRpcResourceManagerClient to ensure communications between client and server operate correctly. + */ +public class TestAvroRpcResourceManagerClient { + + private static final int RM_PORT = 50001; + + private static AvroRpcResourceManagerClient rmc; + private static AvroRpcResourceManager rm; + + @BeforeClass + public static void setUp() throws Exception { + generateTestConfiguration(); + rm = new AvroRpcResourceManager(RM_PORT); + rm.startUp(); + rmc = new AvroRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT)); + } + + private static void generateTestConfiguration() throws IOException { + Properties config = new Properties(); + + String propertiesFile = "." + File.separator + "src" + File.separator + + "test" + File.separator + "resources" + File.separator + "test.resource.properties"; + System.getProperties().load(new FileInputStream(new File(propertiesFile))); + + // stage policy + File tmpPolicyDir = null; + try { + tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile(); + } catch (Exception e) { + fail(e.getMessage()); + } + for (File policyFile : new File("./src/test/resources/policy") + .listFiles(new FileFilter() { + + @Override + public boolean accept(File pathname) { + return pathname.isFile() && pathname.getName().endsWith(".xml"); + } + })) { + try { + FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir + .toURI().toString()); + config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs", + tmpPolicyDir.toURI().toString()); + + System.getProperties().putAll(config); + + } + + @Test + public void testGetNodes() throws MonitorException { + List<Hashtable> nodes = rmc.getNodes(); + + assertThat(nodes, is(not(nullValue()))); + assertThat(nodes, hasSize(1)); + + } + + @Test + public void testGetExecutionReport() throws JobRepositoryException { + String execreport = rmc.getExecReport(); + assertThat(execreport, is(not(nullValue()))); + //TODO make it return more than an empty string; + } + + + @Test + public void testJobQueueCapacity() throws JobRepositoryException { + int capacity = rmc.getJobQueueCapacity(); + assertThat(capacity, equalTo(1000)); + } + + @Test + public void testGetJobQueueSize() throws JobRepositoryException { + int size = rmc.getJobQueueSize(); + assertThat(size, equalTo(0)); + //TODO Make it change queue size + } + + @Test + public void testGetNodeById() throws MonitorException { + List<ResourceNode> nodelist = rmc.getNodes(); + + ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId()); + + assertThat(node, is(not(nullValue()))); + + assertThat(node.getNodeId(), equalTo("localhost")); + } + + + @Test + public void testGetNodeLoad() throws MonitorException { + + List<ResourceNode> nodelist = rmc.getNodes(); + + String node = rmc.getNodeLoad(nodelist.get(0).getNodeId()); + + assertNotNull(node); + + assertThat(node, equalTo("0/8")); + + } + + @Test + public void testNodeReport() throws MonitorException { + String report = rmc.getNodeReport(); + + assertThat(report, is(not(nullValue()))); + } + + @Test + public void testGetNodesInQueue() throws QueueManagerException { + List<String> nodes = rmc.getNodesInQueue("long"); + + assertThat(nodes, is(not(nullValue()))); + + assertThat(nodes, hasSize(1)); + + } + + + @Test + public void testQueuedJobs() throws JobQueueException { + List jobs = rmc.getQueuedJobs(); + + assertThat(jobs, is(not(nullValue()))); + + //TODO queue a job + } + + @Test + public void testQueuesWithNode() throws MonitorException, QueueManagerException { + List<ResourceNode> nodelist = rmc.getNodes(); + + + List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId()); + assertThat(queues, hasSize(3)); + + assertThat(queues, containsInAnyOrder("high", "quick", "long")); + } + + @Test + public void testQueues() throws QueueManagerException { + List<String> queues = rmc.getQueues(); + + assertThat(queues, hasSize(3)); + + assertThat(queues, containsInAnyOrder("high", "quick", "long")); + } + + @AfterClass + public static void tearDown() { + rm.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java index db5464b..b9b3860 100644 --- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java +++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java @@ -87,11 +87,10 @@ public class TestXmlRpcResourceManager extends TestCase { */ @Override protected void setUp() throws Exception { - System.out.println(NameValueJobInput.class.getCanonicalName()); - generateTestConfiguration(); this.rm = new XmlRpcResourceManager(RM_PORT); + rm.startUp(); } /* http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java index cef79e4..8b1df40 100644 --- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java +++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManagerClient.java @@ -19,7 +19,7 @@ import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException; import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; - +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -33,7 +33,11 @@ import java.util.List; import java.util.Properties; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -43,164 +47,166 @@ import static org.junit.Assert.fail; */ public class TestXmlRpcResourceManagerClient { - private static final int RM_PORT = 50001; - - private static XmlRpcResourceManagerClient rmc; - - @BeforeClass - public static void setUp() throws Exception { - generateTestConfiguration(); - XmlRpcResourceManager rm = new XmlRpcResourceManager(RM_PORT); - rmc = new XmlRpcResourceManagerClient(new URL("http://localhost:" +RM_PORT)); + private static final int RM_PORT = 50001; - } + private static ResourceManagerClient rmc; + private static ResourceManager rm; - private static void generateTestConfiguration() throws IOException { - Properties config = new Properties(); - - String propertiesFile = "." + File.separator + "src" + File.separator + - "test" + File.separator + "resources" + File.separator + "test.resource.properties"; - System.getProperties().load(new FileInputStream(new File(propertiesFile))); - - // stage policy - File tmpPolicyDir = null; - try { - tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile(); - } catch (Exception e) { - fail(e.getMessage()); - } - for (File policyFile : new File("./src/test/resources/policy") - .listFiles(new FileFilter() { - - @Override - public boolean accept(File pathname) { - return pathname.isFile() && pathname.getName().endsWith(".xml"); - } - })) { - try { - FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir); - } catch (Exception e) { - fail(e.getMessage()); - } + @BeforeClass + public static void setUp() throws Exception { + generateTestConfiguration(); + rm = new XmlRpcResourceManager(RM_PORT); + rm.startUp(); + rmc = new XmlRpcResourceManagerClient(new URL("http://localhost:" + RM_PORT)); } - config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir - .toURI().toString()); - config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs", - tmpPolicyDir.toURI().toString()); + private static void generateTestConfiguration() throws IOException { + Properties config = new Properties(); + + String propertiesFile = "." + File.separator + "src" + File.separator + + "test" + File.separator + "resources" + File.separator + "test.resource.properties"; + System.getProperties().load(new FileInputStream(new File(propertiesFile))); + + // stage policy + File tmpPolicyDir = null; + try { + tmpPolicyDir = File.createTempFile("test", "ignore").getParentFile(); + } catch (Exception e) { + fail(e.getMessage()); + } + for (File policyFile : new File("./src/test/resources/policy") + .listFiles(new FileFilter() { + + @Override + public boolean accept(File pathname) { + return pathname.isFile() && pathname.getName().endsWith(".xml"); + } + })) { + try { + FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir + .toURI().toString()); + config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs", + tmpPolicyDir.toURI().toString()); + + System.getProperties().putAll(config); - System.getProperties().putAll(config); - - } - - @Test - public void testGetNodes() throws MonitorException { - List<Hashtable> nodes = rmc.getNodes(); - - assertThat(nodes, is(not(nullValue()))); - assertThat(nodes, hasSize(1)); - - } + } - @Test - public void testGetExecutionReport() throws JobRepositoryException { + @Test + public void testGetNodes() throws MonitorException { + List<Hashtable> nodes = rmc.getNodes(); - String execreport = rmc.getExecReport(); + assertThat(nodes, is(not(nullValue()))); + assertThat(nodes, hasSize(1)); + } - assertThat(execreport, is(not(nullValue()))); - //TODO make it return more than an empty string; - } + @Test + public void testGetExecutionReport() throws JobRepositoryException { + String execreport = rmc.getExecReport(); + assertThat(execreport, is(not(nullValue()))); + //TODO make it return more than an empty string; + } - @Test - public void testJobQueueCapacity() throws JobRepositoryException { - int capacity = rmc.getJobQueueCapacity(); - assertThat(capacity, equalTo(1000)); + @Test + public void testJobQueueCapacity() throws JobRepositoryException { + int capacity = rmc.getJobQueueCapacity(); - } + assertThat(capacity, equalTo(1000)); - @Test - public void testGetJobQueueSize() throws JobRepositoryException { - int size = rmc.getJobQueueSize(); + } - assertThat(size, equalTo(0)); + @Test + public void testGetJobQueueSize() throws JobRepositoryException { + int size = rmc.getJobQueueSize(); - //TODO Make it change queue size + assertThat(size, equalTo(0)); - } + //TODO Make it change queue size - @Test - public void testGetNodeById() throws MonitorException { - List<ResourceNode> nodelist = rmc.getNodes(); + } - ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId()); + @Test + public void testGetNodeById() throws MonitorException { + List<ResourceNode> nodelist = rmc.getNodes(); - assertThat(node, is(not(nullValue()))); + ResourceNode node = rmc.getNodeById(nodelist.get(0).getNodeId()); - assertThat(node.getNodeId(), equalTo("localhost")); - } + assertThat(node, is(not(nullValue()))); + assertThat(node.getNodeId(), equalTo("localhost")); + } - @Test - public void testGetNodeLoad() throws MonitorException { - List<ResourceNode> nodelist = rmc.getNodes(); + @Test + public void testGetNodeLoad() throws MonitorException { - String node = rmc.getNodeLoad(nodelist.get(0).getNodeId()); + List<ResourceNode> nodelist = rmc.getNodes(); - assertNotNull(node); + String node = rmc.getNodeLoad(nodelist.get(0).getNodeId()); - assertThat(node, equalTo("0/8")); + assertNotNull(node); - } + assertThat(node, equalTo("0/8")); - @Test - public void testNodeReport() throws MonitorException { - String report = rmc.getNodeReport(); + } - assertThat(report, is(not(nullValue()))); - } + @Test + public void testNodeReport() throws MonitorException { + String report = rmc.getNodeReport(); - @Test - public void testGetNodesInQueue() throws QueueManagerException { - List<String> nodes = rmc.getNodesInQueue("long"); + assertThat(report, is(not(nullValue()))); + } - assertThat(nodes, is(not(nullValue()))); + @Test + public void testGetNodesInQueue() throws QueueManagerException { + List<String> nodes = rmc.getNodesInQueue("long"); - assertThat(nodes, hasSize(1)); + assertThat(nodes, is(not(nullValue()))); - } + assertThat(nodes, hasSize(1)); + } - @Test - public void testQueuedJobs() throws JobQueueException { - List jobs = rmc.getQueuedJobs(); + @Test + public void testQueuedJobs() throws JobQueueException { + List jobs = rmc.getQueuedJobs(); - assertThat(jobs, is(not(nullValue()))); + assertThat(jobs, is(not(nullValue()))); - //TODO queue a job - } + //TODO queue a job + } - @Test - public void testQueuesWithNode() throws MonitorException, QueueManagerException { - List<ResourceNode> nodelist = rmc.getNodes(); + @Test + public void testQueuesWithNode() throws MonitorException, QueueManagerException { + List<ResourceNode> nodelist = rmc.getNodes(); - List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId()); - assertThat(queues, hasSize(3)); + List<String> queues = rmc.getQueuesWithNode(nodelist.get(0).getNodeId()); + assertThat(queues, hasSize(3)); - assertThat(queues, containsInAnyOrder("high", "quick", "long")); - } + assertThat(queues, containsInAnyOrder("high", "quick", "long")); + } - @Test - public void testQueues() throws QueueManagerException { - List<String> queues = rmc.getQueues(); + @Test + public void testQueues() throws QueueManagerException { + List<String> queues = rmc.getQueues(); - assertThat(queues, hasSize(3)); + assertThat(queues, hasSize(3)); - assertThat(queues, containsInAnyOrder("high", "quick", "long")); - } + assertThat(queues, containsInAnyOrder("high", "quick", "long")); + } + @AfterClass + public static void tearDown() { + rm.shutdown(); + } } http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java new file mode 100644 index 0000000..25fd2ee --- /dev/null +++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedAvroRpcResourceManager.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.system.distributed; + +import org.apache.oodt.cas.resource.system.AvroRpcResourceManager; +import org.apache.oodt.cas.resource.system.ResourceManager; +import org.apache.oodt.cas.resource.system.TestAvroRpcResourceManager; +import org.apache.oodt.config.distributed.cli.ConfigPublisher; +import org.apache.oodt.config.test.AbstractDistributedConfigurationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.oodt.config.Constants.Properties.ENABLE_DISTRIBUTED_CONFIGURATION; +import static org.junit.Assert.fail; + +/** + * Test the operation of Resource Manager under distributed configuration management enabled + * + * @author Imesha Sudasingha + */ +public class TestDistributedAvroRpcResourceManager extends AbstractDistributedConfigurationTest { + + private static final int RM_PORT = 50001; + private static final String CONF_PUBLISHER_XML = "config/distributed/config-publisher.xml"; + + private ResourceManager resourceManager; + + @Before + public void setUpTest() throws Exception { + System.setProperty("org.apache.oodt.cas.cli.action.spring.config", "../config/src/main/resources/cmd-line-actions.xml"); + System.setProperty("org.apache.oodt.cas.cli.option.spring.config", "../config/src/main/resources/cmd-line-options.xml"); + System.setProperty(ENABLE_DISTRIBUTED_CONFIGURATION, "true"); + + ConfigPublisher.main(new String[]{ + "-connectString", zookeeper.getConnectString(), + "-config", CONF_PUBLISHER_XML, + "-a", "publish" + }); + + try { + resourceManager = new AvroRpcResourceManager(RM_PORT); + resourceManager.startUp(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testDynSetNodeCapacity() { + new TestAvroRpcResourceManager().testDynSetNodeCapacity(); + } + + @After + public void tearDownTest() throws Exception { + if (resourceManager != null) { + resourceManager.shutdown(); + } + + ConfigPublisher.main(new String[]{ + "-connectString", zookeeper.getConnectString(), + "-config", CONF_PUBLISHER_XML, + "-a", "clear" + }); + + System.clearProperty("org.apache.oodt.cas.cli.action.spring.config"); + System.clearProperty("org.apache.oodt.cas.cli.option.spring.config"); + System.clearProperty(ENABLE_DISTRIBUTED_CONFIGURATION); + } +} http://git-wip-us.apache.org/repos/asf/oodt/blob/1b921b5c/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java index 0649b44..16a2b6f 100644 --- a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java +++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java @@ -54,6 +54,7 @@ public class TestDistributedXmlRpcResourceManager extends AbstractDistributedCon try { resourceManager = new XmlRpcResourceManager(RM_PORT); + resourceManager.startUp(); } catch (Exception e) { fail(e.getMessage()); }
