Repository: airavata Updated Branches: refs/heads/master 56574dd19 -> d4e398545
https://issues.apache.org/jira/browse/AIRAVATA-1145 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/20306cd0 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/20306cd0 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/20306cd0 Branch: refs/heads/master Commit: 20306cd044679cf5d0f61e4832eae02a2ae69e83 Parents: 56574dd Author: Saminda Wijeratne <[email protected]> Authored: Fri Apr 18 04:14:02 2014 -0700 Committer: Saminda Wijeratne <[email protected]> Committed: Fri Apr 18 04:14:02 2014 -0700 ---------------------------------------------------------------------- .../main/resources/airavata-server.properties | 2 +- .../job/monitor/AbstractActivityListener.java | 27 +++++++++++++ .../monitor/AbstractActivityMonitorClient.java | 27 ------------- .../job/monitor/AiravataJobStatusUpdator.java | 42 ++++++++++++++++++-- .../airavata/job/monitor/MonitorManager.java | 42 ++++++++++---------- .../SingleAppIntegrationTestBase.java | 1 + .../orchestrator/server/OrchestratorServer.java | 3 -- .../apache/airavata/job/monitor/MonitorID.java | 11 ++--- .../job/monitor/event/MonitorPublisher.java | 17 ++++++-- 9 files changed, 107 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index 193e9c8..13f78d5 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -262,7 +262,7 @@ monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876 connection.name=xsede -activity.monitors=org.apache.airavata.job.monitor.AiravataJobStatusUpdator +activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator ###---------------------------Orchestrator module Configurations---------------------------### job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java new file mode 100644 index 0000000..49927e6 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.job.monitor; + + +public interface AbstractActivityListener { + public void setup(Object...configurations); +} http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java deleted file mode 100644 index 9124cc3..0000000 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.job.monitor; - - -public interface AbstractActivityMonitorClient { - public void setup(Object...configurations); -} http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java index 37045a8..ec03d71 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java @@ -23,9 +23,12 @@ package org.apache.airavata.job.monitor; import java.util.Calendar; import java.util.concurrent.BlockingQueue; +import org.apache.airavata.job.monitor.event.MonitorPublisher; import org.apache.airavata.job.monitor.state.JobStatus; +import org.apache.airavata.job.monitor.state.TaskStatus; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.registry.cpi.CompositeIdentifier; import org.apache.airavata.registry.cpi.DataType; import org.apache.airavata.registry.cpi.Registry; @@ -34,11 +37,13 @@ import org.slf4j.LoggerFactory; import com.google.common.eventbus.Subscribe; -public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{ +public class AiravataJobStatusUpdator implements AbstractActivityListener{ private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class); private Registry airavataRegistry; + private MonitorPublisher monitorPublisher; + private BlockingQueue<MonitorID> jobsToMonitor; public Registry getAiravataRegistry() { @@ -105,9 +110,39 @@ public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{ logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED"); jobsToMonitor.remove(jobStatus.getMonitorID()); break; + default: + break; } } } + + @Subscribe + public void setupTaskStatus(JobStatus jobStatus){ + TaskState state=TaskState.UNKNOWN; + switch(jobStatus.getState()){ + case ACTIVE: + state=TaskState.EXECUTING; break; + case CANCELED: + state=TaskState.CANCELED; break; + case COMPLETE: + state=TaskState.COMPLETED; break; + case FAILED: + state=TaskState.FAILED; break; + case HELD: case SUSPENDED: case QUEUED: + state=TaskState.WAITING; break; + case SETUP: + state=TaskState.PRE_PROCESSING; break; + case SUBMITTED: + state=TaskState.STARTED; break; + case UN_SUBMITTED: + state=TaskState.CANCELED; break; + default: + break; + } + logger.debug("Publishing Task Status "+state.toString()); + monitorPublisher.publish(new TaskStatus(jobStatus.getMonitorID(),state)); + } + public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception { CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID); JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids); @@ -126,12 +161,13 @@ public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{ @Override public void setup(Object... configurations) { for (Object configuration : configurations) { - if (configuration instanceof Registry){ this.airavataRegistry=(Registry)configuration; } else if (configuration instanceof BlockingQueue<?>){ this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration; - } + } else if (configuration instanceof MonitorPublisher){ + this.monitorPublisher=(MonitorPublisher) configuration; + } } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java index 1929057..ed89230 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java @@ -55,6 +55,8 @@ remove them from the queue, this will be done by AiravataJobUpdator. */ public class MonitorManager { private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class); + + private final static String ACTIVITY_LISTENERS = "activity.listeners"; private List<PullMonitor> pullMonitors; //todo though we have a List we only support one at a time @@ -72,7 +74,7 @@ public class MonitorManager { private Monitor localJobMonitor; - private List<AbstractActivityMonitorClient> activityMonitors; + private List<AbstractActivityListener> activityListeners; private Registry registry; @@ -97,27 +99,27 @@ public class MonitorManager { private void loadActivityMonitors(){ try { - activityMonitors=new ArrayList<AbstractActivityMonitorClient>(); - String activityMonitorsString = ServerSettings.getSetting("activity.monitors"); - if (activityMonitorsString!=null){ - String[] activityMonitorClasses = activityMonitorsString.split(","); - for (String activityMonitorClassName : activityMonitorClasses) { - Class<?> classInstance; + activityListeners=new ArrayList<AbstractActivityListener>(); + String activityListenersString = ServerSettings.getSetting(ACTIVITY_LISTENERS); + if (activityListenersString!=null){ + String[] activityListenerClasses = activityListenersString.split(","); + for (String activityListenerClassName : activityListenerClasses) { try { - classInstance = MonitorManager.class - .getClassLoader().loadClass(activityMonitorClassName); - AbstractActivityMonitorClient monitor=(AbstractActivityMonitorClient)classInstance.newInstance(); - monitor.setup(registry,getFinishQueue()); - activityMonitors.add(monitor); + activityListenerClassName=activityListenerClassName.trim(); + Class<?> classInstance = MonitorManager.class + .getClassLoader().loadClass(activityListenerClassName); + AbstractActivityListener monitor=(AbstractActivityListener)classInstance.newInstance(); + monitor.setup(registry, getFinishQueue(), getMonitorPublisher()); + activityListeners.add(monitor); registerListener(monitor); } catch (ClassNotFoundException e) { - logger.error("Error while locating activity monitor implementation \""+activityMonitorClassName+"\"!!!",e); + logger.error("Error while locating activity monitor implementation \""+activityListenerClassName+"\"!!!",e); } catch (InstantiationException e) { - logger.error("Error while initiating activity monitor instance \""+activityMonitorClassName+"\"!!!",e); + logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e); } catch (IllegalAccessException e) { - logger.error("Error while initiating activity monitor instance \""+activityMonitorClassName+"\"!!!",e); + logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e); } catch (ClassCastException e){ - logger.error("Invalid activity monitor \""+activityMonitorClassName+"\"!!!",e); + logger.error("Invalid activity monitor \""+activityListenerClassName+"\"!!!",e); } } } @@ -234,7 +236,6 @@ public class MonitorManager { */ public void launchMonitor() throws AiravataMonitorException { //no push monitor is configured so we launch pull monitor - int index = 0; if(localJobMonitor != null){ (new Thread(localJobMonitor)).start(); } @@ -261,18 +262,17 @@ public class MonitorManager { */ public void stopMonitor() throws AiravataMonitorException { //no push monitor is configured so we launch pull monitor - int index = 0; if(localJobMonitor != null){ - (new Thread(localJobMonitor)).stop(); + (new Thread(localJobMonitor)).interrupt(); } for (PullMonitor monitor : pullMonitors) { - (new Thread(monitor)).stop(); + (new Thread(monitor)).interrupt(); } //todo fix this for (PushMonitor monitor : pushMonitors) { - (new Thread(monitor)).stop(); + (new Thread(monitor)).interrupt(); } } /* getter setters for the private variables */ http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java b/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java index 11e6a77..6592bf8 100644 --- a/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java +++ b/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java @@ -136,6 +136,7 @@ public class SingleAppIntegrationTestBase { Map<String, JobStatus> jobStatuses = null; while (true) { try { + System.out.println("*********Experiment status*** : "+client.getExperimentStatus(expId)); jobStatuses = client.getJobStatuses(expId); Set<String> strings = jobStatuses.keySet(); for (String key : strings) { http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java index 115ea3f..d6ff5c3 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java @@ -23,12 +23,9 @@ package org.apache.airavata.orchestrator.server; import org.apache.airavata.common.utils.IServer; import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.IServer.ServerStatus; import org.apache.airavata.orchestrator.cpi.OrchestratorService; import org.apache.airavata.orchestrator.util.Constants; import org.apache.thrift.server.TServer; -import org.apache.thrift.server.TServerEventHandler; -import org.apache.thrift.server.TSimpleServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java index f65241a..718177c 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java @@ -20,20 +20,17 @@ */ package org.apache.airavata.job.monitor; +import java.sql.Timestamp; +import java.util.Date; +import java.util.Map; + import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; -import org.apache.airavata.job.monitor.state.JobStatus; import org.apache.airavata.model.workspace.experiment.JobState; -import org.omg.PortableInterceptor.ACTIVE; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Timestamp; -import java.util.Date; -import java.util.Map; -import java.util.Properties; - /* This is the object which contains the data to identify a particular Job to start the monitoring http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java index 0f75206..cc85e58 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java @@ -20,12 +20,15 @@ */ package org.apache.airavata.job.monitor.event; -import com.google.common.eventbus.EventBus; import org.apache.airavata.job.monitor.MonitorID; +import org.apache.airavata.job.monitor.state.ExperimentStatus; import org.apache.airavata.job.monitor.state.JobStatus; +import org.apache.airavata.job.monitor.state.TaskStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.eventbus.EventBus; + public class MonitorPublisher { private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class); private EventBus eventBus; @@ -38,10 +41,18 @@ public class MonitorPublisher { eventBus.register(listener); } - public void publish(JobStatus jobState) { - eventBus.post(jobState); + public void publish(JobStatus jobStatus) { + eventBus.post(jobStatus); } + public void publish(TaskStatus taskStatus) { + eventBus.post(taskStatus); + } + + public void publish(ExperimentStatus experimentStatus) { + eventBus.post(experimentStatus); + } + public void publish(MonitorID monitorID){ eventBus.post(monitorID); }
