Repository: airavata Updated Branches: refs/heads/master 4ee0852d0 -> 24babee47
fixing issues with job monitor - AIRAVATA-1027 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/24babee4 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/24babee4 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/24babee4 Branch: refs/heads/master Commit: 24babee475123141141d21b98e71da7cdc4bc66b Parents: 4ee0852 Author: lahiru <[email protected]> Authored: Mon Mar 10 14:55:48 2014 -0400 Committer: lahiru <[email protected]> Committed: Mon Mar 10 14:55:48 2014 -0400 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 23 +++--- .../airavata/client/tools/DocumentCreator.java | 1 + .../job/monitor/AiravataJobStatusUpdator.java | 2 +- .../apache/airavata/job/monitor/MonitorID.java | 25 ++++++ .../airavata/job/monitor/MonitorManager.java | 85 ++++++++++++------- .../airavata/job/monitor/core/Monitor.java | 3 +- .../job/monitor/event/MonitorPublisher.java | 1 + .../monitor/impl/pull/qstat/QstatMonitor.java | 24 ++---- .../job/monitor/impl/push/amqp/AMQPMonitor.java | 50 ++++++++--- .../airavata/job/monitor/state/JobStatus.java | 17 +--- .../airavata/job/monitor/AMQPMonitorTest.java | 3 +- .../airavata/job/monitor/QstatMonitorTest.java | 2 +- .../src/test/resources/monitor.properties | 1 + .../main/resources/schemas/HostDescription.xsd | 1 + .../apache/airavata/common/utils/Constants.java | 2 + .../resources/conf/airavata-server.properties | 5 +- .../server/OrchestratorServerHandler.java | 87 +++++++++----------- 17 files changed, 192 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 4f473f1..6c11091 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -61,9 +61,9 @@ public class CreateLaunchExperiment { final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); System.out.println("API version is " + airavata.GetAPIVersion()); addDescriptors(); - final String expId = createExperimentForTrestles(airavata); +// final String expId = createExperimentForTrestles(airavata); // final String expId = createUS3ExperimentForTrestles(airavata); -// final String expId = createExperimentForStampede(airavata); + final String expId = createExperimentForStampede(airavata); // final String expId = createUS3ExperimentForStampede(airavata); System.out.println("Experiment ID : " + expId); launchExperiment(airavata, expId); @@ -109,16 +109,17 @@ public class CreateLaunchExperiment { // Experiment experiment = airavata.getExperiment(expId); // System.out.println("retrieved exp id : " + experiment.getExperimentID()); - } catch (TException e) { + } catch (Exception e) { logger.error("Error while connecting with server", e.getMessage()); e.printStackTrace(); - } catch (ApplicationSettingsException e) { - logger.error("Error while creating airavata API object", e.getMessage()); - e.printStackTrace(); - } catch (AiravataAPIInvocationException e) { - logger.error("Error while creating airavata API object", e.getMessage()); - e.printStackTrace(); } +// } catch (ApplicationSettingsException e) { +// logger.error("Error while creating airavata API object", e.getMessage()); +// e.printStackTrace(); +// } catch (AiravataAPIInvocationException e) { +// logger.error("Error while creating airavata API object", e.getMessage()); +// e.printStackTrace(); +// } } public static void addDescriptors() throws AiravataAPIInvocationException,ApplicationSettingsException { @@ -129,8 +130,8 @@ public class CreateLaunchExperiment { // documentCreator.createPBSDocs(); // documentCreator.createPBSDocsForOGCE(); // documentCreator.createMPIPBSDocsTrestles(); -// documentCreator.createSlurmDocs(); - documentCreator.createMPIPBSDocsStampede(); + documentCreator.createSlurmDocs(); +// documentCreator.createMPIPBSDocsStampede(); } catch (AiravataAPIInvocationException e) { logger.error("Unable to create airavata API", e.getMessage()); throw new AiravataAPIInvocationException(e); http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java ---------------------------------------------------------------------- diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java index f6b953e..eb3a0ca 100644 --- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java +++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java @@ -638,6 +638,7 @@ public class DocumentCreator { ((GsisshHostType) host.getType()).setJobManager("slurm"); ((GsisshHostType) host.getType()).setInstalledPath("/usr/bin/"); ((GsisshHostType) host.getType()).setPort(2222); +// ((GsisshHostType) host.getType()).setMo(2222); try { http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java index 7407a10..6dc007c 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java @@ -81,7 +81,7 @@ public class AiravataJobStatusUpdator{ break; case UNKNOWN: logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN"); - logger.info("Unknown job status came, if the old job status is RUNNING or something active, we have to make it complete"); + jobsToMonitor.remove(jobStatus.getMonitorID()); //todo implement this logic break; case QUEUED: http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java index 100c141..945362b 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java @@ -23,6 +23,9 @@ package org.apache.airavata.job.monitor; import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; +import org.apache.airavata.job.monitor.state.JobStatus; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.omg.PortableInterceptor.ACTIVE; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +62,7 @@ public class MonitorID { private int failedCount = 0; + private JobState state; public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName) { this.host = host; @@ -171,4 +175,25 @@ public class MonitorID { public void setFailedCount(int failedCount) { this.failedCount = failedCount; } + + public JobState getStatus() { + return state; + } + + public void setStatus(JobState status) { + if (this.state != null && status.equals(JobState.UNKNOWN)) { + switch (this.state) { + case ACTIVE: + this.state = JobState.COMPLETE; + break; + case QUEUED: + this.state = JobState.COMPLETE; + break; + + } + }else{ + // normal scenario + this.state = status; + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java index aad7d41..a0927a5 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java @@ -21,6 +21,7 @@ package org.apache.airavata.job.monitor; import com.google.common.eventbus.EventBus; +import org.apache.airavata.common.utils.Constants; import org.apache.airavata.job.monitor.core.PullMonitor; import org.apache.airavata.job.monitor.core.PushMonitor; import org.apache.airavata.job.monitor.event.MonitorPublisher; @@ -29,9 +30,11 @@ import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor; import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor; import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread; import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl; +import org.apache.airavata.schemas.gfac.GsisshHostType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.validation.constraints.Null; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -50,7 +53,11 @@ public class MonitorManager { private List<PushMonitor> pushMonitors; //todo we need to support multiple monitors dynamically - private BlockingQueue<MonitorID> runningQueue; + private BlockingQueue<MonitorID> pullQueue; + + private BlockingQueue<MonitorID> pushQueue; + + private BlockingQueue<MonitorID> localJobQueue; private BlockingQueue<MonitorID> finishQueue; @@ -62,7 +69,8 @@ public class MonitorManager { public MonitorManager() { pullMonitors = new ArrayList<PullMonitor>(); pushMonitors = new ArrayList<PushMonitor>(); - runningQueue = new LinkedBlockingDeque<MonitorID>(); + pullQueue = new LinkedBlockingDeque<MonitorID>(); + pushQueue = new LinkedBlockingDeque<MonitorID>(); finishQueue = new LinkedBlockingDeque<MonitorID>(); monitorPublisher = new MonitorPublisher(new EventBus()); registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue)); @@ -77,7 +85,7 @@ public class MonitorManager { public void addAMQPMonitor(AMQPMonitor monitor) { monitor.setPublisher(this.getMonitorPublisher()); monitor.setFinishQueue(this.getFinishQueue()); - monitor.setRunningQueue(this.getRunningQueue()); + monitor.setRunningQueue(this.getPushQueue()); addPushMonitor(monitor); } @@ -89,7 +97,7 @@ public class MonitorManager { */ public void addQstatMonitor(QstatMonitor qstatMonitor) { qstatMonitor.setPublisher(this.getMonitorPublisher()); - qstatMonitor.setQueue(this.getRunningQueue()); + qstatMonitor.setQueue(this.getPullQueue()); addPullMonitor(qstatMonitor); } @@ -127,13 +135,19 @@ public class MonitorManager { * This is going to be useful during the startup of the launching process * * @param monitorID + * @throws AiravataMonitorException */ public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException { - try { - runningQueue.put(monitorID); - } catch (InterruptedException e) { - String error = "Error while putting the job: " + monitorID.getJobID() + " the monitor queue"; - throw new AiravataMonitorException(error, e); + if (monitorID.getHost().getType() instanceof GsisshHostType) { + GsisshHostType host = (GsisshHostType) monitorID.getHost().getType(); + if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null + || Constants.PULL.equals(host.getMonitorMode())) { + pullQueue.add(monitorID); + } else if (Constants.PUSH.equals(host.getMonitorMode())) { + pushQueue.add(monitorID); + } + } else { + logger.error("We only support Gsissh host types currently"); } } @@ -148,26 +162,19 @@ public class MonitorManager { * @throws AiravataMonitorException */ public void launchMonitor() throws AiravataMonitorException { - if (pushMonitors.isEmpty()) { - if (pullMonitors.isEmpty()) { - logger.error("Before launching MonitorManager should have atleast one Monitor"); - return; - } else { - //no push monitor is configured so we launch pull monitor - QstatMonitor pullMonitor = (QstatMonitor)pullMonitors.get(0); - (new Thread(pullMonitor)).start(); - } - } else { - // there is a push monitor configured, so we schedule the push monitor - // We currently support dealing with one type of monitor - AMQPMonitor pushMonitor = (AMQPMonitor) pushMonitors.get(0); - (new Thread(pushMonitor)).start(); - - UnRegisterThread unRegisterThread = new - UnRegisterThread(pushMonitor.getFinishQueue(), pushMonitor.getAvailableChannels()); - unRegisterThread.start(); + //no push monitor is configured so we launch pull monitor + for (PullMonitor monitor : pullMonitors) { + (new Thread(monitor)).start(); } + for (PushMonitor monitor : pushMonitors) { + (new Thread(monitor)).start(); + if (monitor instanceof AMQPMonitor) { + UnRegisterThread unRegisterThread = new + UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels()); + unRegisterThread.start(); + } + } } /* getter setters for the private variables */ @@ -188,12 +195,12 @@ public class MonitorManager { this.pushMonitors = pushMonitors; } - public BlockingQueue<MonitorID> getRunningQueue() { - return runningQueue; + public BlockingQueue<MonitorID> getPullQueue() { + return pullQueue; } - public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) { - this.runningQueue = runningQueue; + public void setPullQueue(BlockingQueue<MonitorID> pullQueue) { + this.pullQueue = pullQueue; } public MonitorPublisher getMonitorPublisher() { @@ -211,4 +218,20 @@ public class MonitorManager { public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) { this.finishQueue = finishQueue; } + + public BlockingQueue<MonitorID> getPushQueue() { + return pushQueue; + } + + public void setPushQueue(BlockingQueue<MonitorID> pushQueue) { + this.pushQueue = pushQueue; + } + + public BlockingQueue<MonitorID> getLocalJobQueue() { + return localJobQueue; + } + + public void setLocalJobQueue(BlockingQueue<MonitorID> localJobQueue) { + this.localJobQueue = localJobQueue; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java index ce8cf22..9627bbc 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java @@ -20,12 +20,11 @@ */ package org.apache.airavata.job.monitor.core; -import org.apache.airavata.job.monitor.event.MonitorPublisher; /** * This is the primary interface for Monitors, * This can be used to implement different methods of monitoring */ -public interface Monitor { +public interface Monitor extends Runnable { } http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java index 12c27fa..95b64ab 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java @@ -21,6 +21,7 @@ package org.apache.airavata.job.monitor.event; import com.google.common.eventbus.EventBus; +import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.state.JobStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java index a2c85ed..1978ad8 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java @@ -20,7 +20,7 @@ */ package org.apache.airavata.job.monitor.impl.pull.qstat; -import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.common.utils.Constants; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.PullMonitor; @@ -29,7 +29,6 @@ import org.apache.airavata.job.monitor.exception.AiravataMonitorException; import org.apache.airavata.job.monitor.state.JobStatus; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.schemas.gfac.GsisshHostType; -import org.apache.airavata.schemas.gfac.HostDescriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +43,7 @@ import java.util.concurrent.BlockingQueue; * This monitor is based on qstat command which can be run * in grid resources and retrieve the job status. */ -public class QstatMonitor extends PullMonitor implements Runnable { +public class QstatMonitor extends PullMonitor { private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class); // I think this should use DelayedBlocking Queue to do the monitoring*/ @@ -95,16 +94,9 @@ public class QstatMonitor extends PullMonitor implements Runnable { // at the tail of the queue MonitorID take = null; JobStatus jobStatus = new JobStatus(); - while (!this.queue.isEmpty()) { - try { - Iterator<MonitorID> iterator = this.queue.iterator(); - // no need to check iterator.hasNext because its already checked - MonitorID next = iterator.next(); - // we check whether the job is type of gsissh otherwise we return the job back to the queue - // Here we use iterator because it not fair to take the object from the queue unless its - // the correct host type,so if its not the right type it will remain in the queue - if(next.getHost().getType() instanceof GsisshHostType){ - take = this.queue.take(); + try { + take = this.queue.take(); + if((take.getHost().getType() instanceof GsisshHostType)){ long monitorDiff = 0; long startedDiff = 0; if (take.getLastMonitored() != null) { @@ -129,8 +121,10 @@ public class QstatMonitor extends PullMonitor implements Runnable { connection = new ResourceConnection(take, gsisshHostType.getInstalledPath()); connections.put(hostName, connection); } + take.setStatus(connection.getJobStatus(take)); jobStatus.setMonitorID(take); - jobStatus.setState(connection.getJobStatus(take)); + jobStatus.setState(take.getStatus()); + // we have this JobStatus class to handle amqp monitoring publisher.publish(jobStatus); // if the job is completed we do not have to put the job to the queue again if (!jobStatus.getState().equals(JobState.COMPLETE)) { @@ -186,7 +180,7 @@ public class QstatMonitor extends PullMonitor implements Runnable { } throw new AiravataMonitorException("Error retrieving the job status", e); } - } + return true; http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java index 6bf5f01..b5b6e8f 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java @@ -22,17 +22,21 @@ package org.apache.airavata.job.monitor.impl.push.amqp; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; +import org.apache.airavata.common.utils.Constants; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.PushMonitor; import org.apache.airavata.job.monitor.event.MonitorPublisher; import org.apache.airavata.job.monitor.exception.AiravataMonitorException; import org.apache.airavata.job.monitor.util.AMQPConnectionUtil; import org.apache.airavata.job.monitor.util.CommonUtils; +import org.apache.airavata.schemas.gfac.GsisshHostType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -41,7 +45,7 @@ import java.util.concurrent.BlockingQueue; * rabbitmq client to recieve AMQP based monitoring data from * mostly excede resources. */ -public class AMQPMonitor extends PushMonitor implements Runnable { +public class AMQPMonitor extends PushMonitor { private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class); @@ -57,24 +61,36 @@ public class AMQPMonitor extends PushMonitor implements Runnable { private BlockingQueue<MonitorID> finishQueue; + private String connectionName; + + private String proxyPath; + + private List<String> amqpHosts; + public AMQPMonitor(){ } - public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue) { + public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue, + String proxyPath,String connectionName,List<String> hosts) { this.publisher = publisher; - this.runningQueue = runningQueue; - this.finishQueue = finishQueue; + this.runningQueue = runningQueue; // these will be initialized by the MonitorManager + this.finishQueue = finishQueue; // these will be initialized by the MonitorManager availableChannels = new HashMap<String, Channel>(); -// UnRegisterThread unRegisterThread = new UnRegisterThread(finishQueue,availableChannels); -// unRegisterThread.run(); - System.out.println("Testing"); + this.connectionName = connectionName; + this.proxyPath = proxyPath; + this.amqpHosts = hosts; + } + + public void initialize(String proxyPath,String connectionName,List<String> hosts){ + this.connectionName = connectionName; + this.proxyPath = proxyPath; + this.amqpHosts = hosts; } public void run() { try { // before going to the while true mode we start unregister thread while (true) { - // we got a new job to do the monitoring MonitorID take = runningQueue.take(); this.registerListener(take); } @@ -99,7 +115,7 @@ public class AMQPMonitor extends PushMonitor implements Runnable { // if we already have a channel we do not create one if (availableChannels.get(channelID) == null) { //todo need to fix this rather getting it from a file - Connection connection = AMQPConnectionUtil.connect("xsede_private", "/Users/lahirugunathilake/Downloads/x509up_u503876"); + Connection connection = AMQPConnectionUtil.connect(connectionName, proxyPath); Channel channel = null; try { channel = connection.createChannel(); @@ -185,9 +201,19 @@ public class AMQPMonitor extends PushMonitor implements Runnable { this.finishQueue = finishQueue; } - /** - * implementing a logic to handle the finished job and unsubscribe - */ + public String getProxyPath() { + return proxyPath; + } + + public void setProxyPath(String proxyPath) { + this.proxyPath = proxyPath; + } + public List<String> getAmqpHosts() { + return amqpHosts; + } + public void setAmqpHosts(List<String> amqpHosts) { + this.amqpHosts = amqpHosts; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java index 9ee6ce8..fe623fb 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java @@ -43,22 +43,7 @@ public class JobStatus { } public void setState(JobState state) { - // this is to handle quick change of status and getStatus returns unknown values - // because job is already finished and information is removed in the resource - if (this.state != null && state.equals(JobState.UNKNOWN)) { - switch (this.state) { - case ACTIVE: - this.state = JobState.COMPLETE; - break; - case QUEUED: - this.state = JobState.COMPLETE; - break; - - } - }else{ - // normal scenario - this.state = state; - } + this.state = state; } public MonitorID getMonitorID() { http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java index a6ef7ea..6327050 100644 --- a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java +++ b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java @@ -31,7 +31,6 @@ import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo import org.apache.airavata.job.monitor.exception.AiravataMonitorException; import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor; import org.apache.airavata.schemas.gfac.GsisshHostType; -import org.apache.axiom.om.util.CommonUtils; import org.junit.Before; import org.junit.Test; @@ -69,7 +68,7 @@ public class AMQPMonitorTest { monitorManager = new MonitorManager(); AMQPMonitor amqpMonitor = new AMQPMonitor(monitorManager.getMonitorPublisher(), - monitorManager.getRunningQueue(), monitorManager.getFinishQueue()); + monitorManager.getPullQueue(), monitorManager.getFinishQueue()); try { monitorManager.addPushMonitor(amqpMonitor); monitorManager.launchMonitor(); http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java index aabc0d7..126b8ae 100644 --- a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java +++ b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java @@ -68,7 +68,7 @@ public class QstatMonitorTest { monitorManager = new MonitorManager(); QstatMonitor qstatMonitor = new - QstatMonitor(monitorManager.getRunningQueue(), monitorManager.getMonitorPublisher()); + QstatMonitor(monitorManager.getPullQueue(), monitorManager.getMonitorPublisher()); try { monitorManager.addPullMonitor(qstatMonitor); monitorManager.launchMonitor(); http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/resources/monitor.properties ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/test/resources/monitor.properties b/modules/airavata-job-monitor/src/test/resources/monitor.properties index 0b0b5f4..a4d68cf 100644 --- a/modules/airavata-job-monitor/src/test/resources/monitor.properties +++ b/modules/airavata-job-monitor/src/test/resources/monitor.properties @@ -1,2 +1,3 @@ amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org +proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876 connection.name=xsede_private \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd ---------------------------------------------------------------------- diff --git a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd index 4ec3a4a..c4052d9 100644 --- a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd +++ b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd @@ -112,6 +112,7 @@ <element name="postJobCommands" type="xsd:string" minOccurs="0" maxOccurs="unbounded"/> <element name="installedPath" type="xsd:string" minOccurs="0" maxOccurs="1" default="/opt/torque/bin"/> <element name="jobManager" type="xsd:string" minOccurs="0" maxOccurs="1"/> + <element name="monitorMode" type="xsd:string" minOccurs="0" maxOccurs="1"/> </sequence> </extension> </complexContent> http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java index effc4bb..bac5913 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java @@ -29,4 +29,6 @@ public final class Constants { public static final String USER_IN_SESSION = "userName"; public static final String GATEWAY_NAME = "gateway_id"; public static final String GFAC_CONFIG_XML = "gfac-config.xml"; + public static final String PUSH = "push"; + public static final String PULL = "pull"; } http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties index e0c4b4b..d0c94c9 100644 --- a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties +++ b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties @@ -256,11 +256,10 @@ TwoPhase=true ###---------------------------Monitoring module Configurations---------------------------### #This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring #mechanisms and one would be able to start a monitor -primaryMonitor=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor -#We do not support a secondaray monitoring at this point or host specific monitoring -secondaryMonitor=org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor +monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor #This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org +proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876 connection.name=xsede_private http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index defbdcf..0ac4756 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -44,13 +44,12 @@ import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.DataType; import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URL; +import java.lang.String; +import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -63,14 +62,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { private Registry registry; - private boolean pushMode = true; - GSIAuthenticationInfo authenticationInfo = null; /** * Query orchestrator server to fetch the CPI version */ - @Override public String getOrchestratorCPIVersion() throws TException { return orchestrator_cpi_serviceConstants.ORCHESTRATOR_CPI_VERSION; @@ -95,34 +91,33 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { 7512, 17280000, certPath); // loading Monitor configuration - String primaryMonitor = properties.getProperty("primaryMonitor"); - String secondaryMonitor = properties.getProperty("secondaryMonitor"); - + String monitors = properties.getProperty("monitors"); + List<String> monitorList = Arrays.asList(monitors.split(",")); + List<String> list = Arrays.asList(properties.getProperty("amqp.hosts").split(",")); + String proxyPath = properties.getProperty("proxy.file.path"); + String connectionName = properties.getProperty("connection.name"); - if (primaryMonitor == null) { + if (monitors == null) { log.error("Error loading primaryMonitor and there has to be a primary monitor"); } else { - Class<? extends Monitor> aClass = Class.forName(primaryMonitor).asSubclass(Monitor.class); - Monitor monitor = aClass.newInstance(); - if (monitor instanceof PullMonitor) { - if(monitor instanceof QstatMonitor){ - monitorManager.addQstatMonitor((QstatMonitor)monitor); + for (String monitorClass : monitorList) { + Class<? extends Monitor> aClass = Class.forName(monitorClass).asSubclass(Monitor.class); + Monitor monitor = aClass.newInstance(); + if (monitor instanceof PullMonitor) { + if (monitor instanceof QstatMonitor) { + monitorManager.addQstatMonitor((QstatMonitor) monitor); + } + } else if (monitor instanceof PushMonitor) { + if (monitor instanceof AMQPMonitor) { + ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list); + monitorManager.addAMQPMonitor((AMQPMonitor) monitor); + } + } else { + log.error("Wrong class is given to primary Monitor"); } - pushMode = false; - } else if (monitor instanceof PushMonitor) { - if(monitor instanceof AMQPMonitor){ - monitorManager.addAMQPMonitor((AMQPMonitor)monitor); - } - } else { - log.error("Wrong class is given to primary Monitor"); } - } - if (secondaryMonitor == null) { - log.info("No secondary Monitor has configured !!!!"); - } else { - // todo we do not support a secondary Monitor at this point - } + } monitorManager.registerListener(orchestrator); // Now Monitor Manager is properly configured, now we have to start the monitoring system. // This will initialize all the required threads and required queues @@ -152,20 +147,19 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { * * @param experimentId */ - @Override public boolean launchExperiment(String experimentId) throws TException { //TODO: Write the Orchestrator implementaion try { List<TaskDetails> tasks = orchestrator.createTasks(experimentId); MonitorID monitorID = null; - if(tasks.size() > 1){ + if (tasks.size() > 1) { log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs"); } - for(TaskDetails taskID:tasks) { + for (TaskDetails taskID : tasks) { //iterate through all the generated tasks and performs the job submisssion+monitoring Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId); - if(experiment == null){ + if (experiment == null) { log.error("Error retrieving the Experiment by the given experimentID: " + experimentId); return false; } @@ -174,27 +168,28 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID); // creating monitorID to register with monitoring queue + // this is a special case because amqp has to be in place before submitting the job + if ((hostDescription instanceof GsisshHostType) && + Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) { - if(pushMode){ - // during the pull we need the monitorID in the queue inadvance - // For this we have enough data at this point - monitorID = new MonitorID(hostDescription, null,taskID.getTaskID(),experimentId, userName); + monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), experimentId, userName); monitorManager.addAJobToMonitor(monitorID); - } - // Launching job for each task - String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID()); - log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); - // if the monitoring is pull mode then we add the monitorID for each task after submitting - // the job with the jobID, otherwise we don't need the jobID - if(!pushMode) { - monitorID = new MonitorID(hostDescription, jobID,taskID.getTaskID(),experimentId, userName, authenticationInfo); + String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID()); + log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); + } else { + // Launching job for each task + // if the monitoring is pull mode then we add the monitorID for each task after submitting + // the job with the jobID, otherwise we don't need the jobID + String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID()); + log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); + monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), experimentId, userName, authenticationInfo); monitorManager.addAJobToMonitor(monitorID); } } } catch (Exception e) { throw new TException(e); } - return false; + return true; } @Override
