Repository: airavata Updated Branches: refs/heads/master 2e8dbe16f -> 58872cec2
fixing monitoring issue - AIRAVATA-1023 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/58872cec Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/58872cec Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/58872cec Branch: refs/heads/master Commit: 58872cec2dcc121995d5db0541251a179d5b019e Parents: 2e8dbe1 Author: lahiru <[email protected]> Authored: Tue Mar 11 11:09:52 2014 -0400 Committer: lahiru <[email protected]> Committed: Tue Mar 11 11:09:52 2014 -0400 ---------------------------------------------------------------------- .../airavata/job/monitor/core/MessageParser.java | 3 ++- .../job/monitor/impl/pull/qstat/QstatMonitor.java | 3 ++- .../job/monitor/impl/push/amqp/AMQPMonitor.java | 17 ++++++++++++----- .../job/monitor/impl/push/amqp/BasicConsumer.java | 7 ++++++- .../monitor/impl/push/amqp/JSONMessageParser.java | 13 ++++++++++++- .../airavata/common/utils/ServerSettings.java | 9 +++++++++ .../main/resources/conf/airavata-server.properties | 3 ++- .../orchestrator/server/OrchestratorServer.java | 4 ++-- .../server/OrchestratorServerHandler.java | 8 ++++++++ .../org/apache/airavata/server/ServerMain.java | 1 + 10 files changed, 56 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java index cd827ca..c70e372 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java @@ -21,6 +21,7 @@ package org.apache.airavata.job.monitor.core; import org.apache.airavata.job.monitor.MonitorID; +import org.apache.airavata.job.monitor.exception.AiravataMonitorException; import org.apache.airavata.job.monitor.state.JobStatus; /** @@ -40,5 +41,5 @@ public interface MessageParser { * @param monitorID monitorID object * @return */ - JobStatus parseMessage(String message,MonitorID monitorID); + JobStatus parseMessage(String message,MonitorID monitorID)throws AiravataMonitorException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java index 1978ad8..5168da0 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java @@ -21,6 +21,7 @@ package org.apache.airavata.job.monitor.impl.pull.qstat; import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.PullMonitor; @@ -69,7 +70,7 @@ public class QstatMonitor extends PullMonitor { monitoring */ this.startPulling = true; - while (this.startPulling) { + while (this.startPulling || !ServerSettings.isStopAllThreads()) { try { startPulling(); // After finishing one iteration of the full queue this thread sleeps 1 second http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java index 0929acb..06d21a1 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java @@ -23,6 +23,7 @@ package org.apache.airavata.job.monitor.impl.push.amqp; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.PushMonitor; import org.apache.airavata.job.monitor.event.MonitorPublisher; @@ -34,10 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.BlockingQueue; /** @@ -125,7 +123,7 @@ public class AMQPMonitor extends PushMonitor { public void run() { // before going to the while true mode we start unregister thread startRegister = true; // this will be unset by someone else - while (startRegister) { + while (startRegister || !ServerSettings.isStopAllThreads()) { try { MonitorID take = runningQueue.take(); this.registerListener(take); @@ -137,6 +135,15 @@ public class AMQPMonitor extends PushMonitor { e.printStackTrace(); } } + Set<String> strings = availableChannels.keySet(); + for(String key:strings) { + Channel channel = availableChannels.get(key); + try { + channel.close(); + } catch (IOException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java index f6704ca..ad25b95 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java @@ -27,6 +27,7 @@ import com.rabbitmq.client.ShutdownSignalException; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.MessageParser; import org.apache.airavata.job.monitor.event.MonitorPublisher; +import org.apache.airavata.job.monitor.exception.AiravataMonitorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,11 @@ public class BasicConsumer implements Consumer { // Here we parse the message and get the job status and push it // to the Event bus, this will be picked by // AiravataJobStatusUpdator and store in to registry - publisher.publish(parser.parseMessage(message,monitorID)); + try { + publisher.publish(parser.parseMessage(message,monitorID)); + } catch (AiravataMonitorException e) { + e.printStackTrace(); + } } public void handleRecoverOk(java.lang.String consumerTag) { http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java index d281c0f..f91176b 100644 --- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java @@ -22,16 +22,27 @@ package org.apache.airavata.job.monitor.impl.push.amqp; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.MessageParser; +import org.apache.airavata.job.monitor.exception.AiravataMonitorException; import org.apache.airavata.job.monitor.state.JobStatus; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + public class JSONMessageParser implements MessageParser { private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class); - public JobStatus parseMessage(String message, MonitorID monitorID) { + public JobStatus parseMessage(String message, MonitorID monitorID)throws AiravataMonitorException{ /*todo write a json message parser here*/ logger.info("Mesage parse invoked"); + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.readTree(message); + } catch (IOException e) { + throw new AiravataMonitorException(e); + } return new JobStatus(); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 7766e8e..8685e35 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -56,6 +56,7 @@ public class ServerSettings extends ApplicationSettings{ private static final String REGISTRY_DB_DRIVER ="registry.jdbc.driver"; private static final String ENABLE_HTTPS = "enable.https"; private static final String HOST_SCHEDULER = "host.scheduler"; + private static boolean stopAllThreads = false; public static String getSystemUser() throws ApplicationSettingsException{ return getSetting(SYSTEM_USER); @@ -171,4 +172,12 @@ public class ServerSettings extends ApplicationSettings{ public static String getHostScheduler() throws ApplicationSettingsException { return getSetting(HOST_SCHEDULER); } + + public static boolean isStopAllThreads() { + return stopAllThreads; + } + + public static void setStopAllThreads(boolean stopAllThreads) { + ServerSettings.stopAllThreads = stopAllThreads; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties index d0c94c9..74220c2 100644 --- a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties +++ b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties @@ -256,7 +256,8 @@ TwoPhase=true ###---------------------------Monitoring module Configurations---------------------------### #This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring #mechanisms and one would be able to start a monitor -monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor +monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor +#,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor #This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876 http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java index 5df526b..1f9a15d 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java @@ -27,6 +27,7 @@ import org.apache.airavata.common.utils.IServer.ServerStatus; import org.apache.airavata.orchestrator.cpi.OrchestratorService; import org.apache.airavata.orchestrator.util.Constants; import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TSimpleServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; @@ -44,7 +45,6 @@ public class OrchestratorServer implements IServer{ private TSimpleServer server; - public static final String TESTARGUMENTTOHANDLER = "testing"; public OrchestratorServer() { setStatus(ServerStatus.STOPPED); } @@ -103,7 +103,7 @@ public class OrchestratorServer implements IServer{ @Override public void stop() throws Exception { - if (server!=null && server.isServing()){ + if (server!=null && server.isServing()){ setStatus(ServerStatus.STOPING); server.stop(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 0ac5ed7..9dc561b 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -191,6 +191,14 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { return true; } + public MonitorManager getMonitorManager() { + return monitorManager; + } + + public void setMonitorManager(MonitorManager monitorManager) { + this.monitorManager = monitorManager; + } + @Override public boolean terminateExperiment(String experimentId) throws TException { return false; http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java index 41d2f16..3a25884 100644 --- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java +++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java @@ -110,6 +110,7 @@ public class ServerMain { } } if (hasStopRequested()){ + ServerSettings.setStopAllThreads(true); stopAllServers(); System.exit(0); }
