Repository: airavata
Updated Branches:
  refs/heads/master 56574dd19 -> d4e398545


https://issues.apache.org/jira/browse/AIRAVATA-1145


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/20306cd0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/20306cd0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/20306cd0

Branch: refs/heads/master
Commit: 20306cd044679cf5d0f61e4832eae02a2ae69e83
Parents: 56574dd
Author: Saminda Wijeratne <[email protected]>
Authored: Fri Apr 18 04:14:02 2014 -0700
Committer: Saminda Wijeratne <[email protected]>
Committed: Fri Apr 18 04:14:02 2014 -0700

----------------------------------------------------------------------
 .../main/resources/airavata-server.properties   |  2 +-
 .../job/monitor/AbstractActivityListener.java   | 27 +++++++++++++
 .../monitor/AbstractActivityMonitorClient.java  | 27 -------------
 .../job/monitor/AiravataJobStatusUpdator.java   | 42 ++++++++++++++++++--
 .../airavata/job/monitor/MonitorManager.java    | 42 ++++++++++----------
 .../SingleAppIntegrationTestBase.java           |  1 +
 .../orchestrator/server/OrchestratorServer.java |  3 --
 .../apache/airavata/job/monitor/MonitorID.java  | 11 ++---
 .../job/monitor/event/MonitorPublisher.java     | 17 ++++++--
 9 files changed, 107 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 193e9c8..13f78d5 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -262,7 +262,7 @@ monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-activity.monitors=org.apache.airavata.job.monitor.AiravataJobStatusUpdator
+activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
 
job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java
new file mode 100644
index 0000000..49927e6
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+
+public interface AbstractActivityListener {
+       public void setup(Object...configurations);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
deleted file mode 100644
index 9124cc3..0000000
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.job.monitor;
-
-
-public interface AbstractActivityMonitorClient {
-       public void setup(Object...configurations);
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index 37045a8..ec03d71 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -23,9 +23,12 @@ package org.apache.airavata.job.monitor;
 import java.util.Calendar;
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.TaskStatus;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.registry.cpi.CompositeIdentifier;
 import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
@@ -34,11 +37,13 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.Subscribe;
 
-public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{
+public class AiravataJobStatusUpdator implements AbstractActivityListener{
     private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
 
     private Registry airavataRegistry;
 
+    private MonitorPublisher monitorPublisher;
+    
     private BlockingQueue<MonitorID> jobsToMonitor;
 
     public Registry getAiravataRegistry() {
@@ -105,9 +110,39 @@ public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{
                     logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
                     jobsToMonitor.remove(jobStatus.getMonitorID());
                     break;
+                       default:
+                               break;
             }
         }
     }
+    
+    @Subscribe
+    public void setupTaskStatus(JobStatus jobStatus){
+       TaskState state=TaskState.UNKNOWN;
+       switch(jobStatus.getState()){
+       case ACTIVE:
+               state=TaskState.EXECUTING; break;
+       case CANCELED:
+               state=TaskState.CANCELED; break;
+       case COMPLETE:
+               state=TaskState.COMPLETED; break;
+       case FAILED:
+               state=TaskState.FAILED; break;
+       case HELD: case SUSPENDED: case QUEUED:
+               state=TaskState.WAITING; break;
+       case SETUP:
+               state=TaskState.PRE_PROCESSING; break;
+       case SUBMITTED:
+               state=TaskState.STARTED; break;
+       case UN_SUBMITTED:
+               state=TaskState.CANCELED; break;
+               default:
+                       break;
+       }
+       logger.debug("Publishing Task Status "+state.toString());
+       monitorPublisher.publish(new TaskStatus(jobStatus.getMonitorID(),state));
+    }
+    
     public  void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
         CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
         JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
@@ -126,12 +161,13 @@ public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{
        @Override
        public void setup(Object... configurations) {
                for (Object configuration : configurations) {
-                       
                        if (configuration instanceof Registry){
                                this.airavataRegistry=(Registry)configuration;
                        } else if (configuration instanceof BlockingQueue<?>){
                                this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
-                       }
+                       } else if (configuration instanceof MonitorPublisher){
+                               this.monitorPublisher=(MonitorPublisher) configuration;
+                       } 
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 1929057..ed89230 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -55,6 +55,8 @@ remove them from the queue, this will be done by AiravataJobUpdator.
  */
 public class MonitorManager {
     private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class);
+    
+       private final static String ACTIVITY_LISTENERS = "activity.listeners";
 
     private List<PullMonitor> pullMonitors;    //todo though we have a List we only support one at a time
 
@@ -72,7 +74,7 @@ public class MonitorManager {
 
     private Monitor localJobMonitor;
     
-    private List<AbstractActivityMonitorClient> activityMonitors;
+    private List<AbstractActivityListener> activityListeners;
 
     private Registry registry;
 
@@ -97,27 +99,27 @@ public class MonitorManager {
     
     private void loadActivityMonitors(){
                try {
-                       activityMonitors=new ArrayList<AbstractActivityMonitorClient>();
-                       String activityMonitorsString = ServerSettings.getSetting("activity.monitors");
-                       if (activityMonitorsString!=null){
-                               String[] activityMonitorClasses = activityMonitorsString.split(",");
-                               for (String activityMonitorClassName : activityMonitorClasses) {
-                                       Class<?> classInstance;
+                       activityListeners=new ArrayList<AbstractActivityListener>();
+                       String activityListenersString = ServerSettings.getSetting(ACTIVITY_LISTENERS);
+                       if (activityListenersString!=null){
+                               String[] activityListenerClasses = activityListenersString.split(",");
+                               for (String activityListenerClassName : activityListenerClasses) {
                                        try {
-                                               classInstance = MonitorManager.class
-                                                       .getClassLoader().loadClass(activityMonitorClassName);
-                                               AbstractActivityMonitorClient monitor=(AbstractActivityMonitorClient)classInstance.newInstance();
-                                               monitor.setup(registry,getFinishQueue());
-                                               activityMonitors.add(monitor);
+                                               activityListenerClassName=activityListenerClassName.trim();
+                                               Class<?>  classInstance = MonitorManager.class
+                                                       .getClassLoader().loadClass(activityListenerClassName);
+                                               AbstractActivityListener monitor=(AbstractActivityListener)classInstance.newInstance();
+                                               monitor.setup(registry, getFinishQueue(), getMonitorPublisher());
+                                               activityListeners.add(monitor);
                                                registerListener(monitor);
                                        } catch (ClassNotFoundException e) {
-                                               logger.error("Error while locating activity monitor implementation \""+activityMonitorClassName+"\"!!!",e);
+                                               logger.error("Error while locating activity monitor implementation \""+activityListenerClassName+"\"!!!",e);
                                        } catch (InstantiationException e) {
-                                               logger.error("Error while initiating activity monitor instance \""+activityMonitorClassName+"\"!!!",e);
+                                               logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
                                        } catch (IllegalAccessException e) {
-                                               logger.error("Error while initiating activity monitor instance \""+activityMonitorClassName+"\"!!!",e);
+                                               logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
                                        } catch (ClassCastException e){
-                                               logger.error("Invalid activity monitor \""+activityMonitorClassName+"\"!!!",e);
+                                               logger.error("Invalid activity monitor \""+activityListenerClassName+"\"!!!",e);
                                        }
                                }
                        }
@@ -234,7 +236,6 @@ public class MonitorManager {
      */
     public void launchMonitor() throws AiravataMonitorException {
         //no push monitor is configured so we launch pull monitor
-        int index = 0;
         if(localJobMonitor != null){
             (new Thread(localJobMonitor)).start();
         }
@@ -261,18 +262,17 @@ public class MonitorManager {
      */
     public void stopMonitor() throws AiravataMonitorException {
         //no push monitor is configured so we launch pull monitor
-        int index = 0;
         if(localJobMonitor != null){
-            (new Thread(localJobMonitor)).stop();
+            (new Thread(localJobMonitor)).interrupt();
         }
 
         for (PullMonitor monitor : pullMonitors) {
-            (new Thread(monitor)).stop();
+            (new Thread(monitor)).interrupt();
         }
 
         //todo fix this
         for (PushMonitor monitor : pushMonitors) {
-            (new Thread(monitor)).stop();
+            (new Thread(monitor)).interrupt();
         }
     }
     /* getter setters for the private variables */

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java b/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java
index 11e6a77..6592bf8 100644
--- a/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java
+++ b/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java
@@ -136,6 +136,7 @@ public class SingleAppIntegrationTestBase {
                 Map<String, JobStatus> jobStatuses = null;
                 while (true) {
                     try {
+                       System.out.println("*********Experiment status*** : "+client.getExperimentStatus(expId));
                         jobStatuses = client.getJobStatuses(expId);
                         Set<String> strings = jobStatuses.keySet();
                         for (String key : strings) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
index 115ea3f..d6ff5c3 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
@@ -23,12 +23,9 @@ package org.apache.airavata.orchestrator.server;
 
 import org.apache.airavata.common.utils.IServer;
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.IServer.ServerStatus;
 import org.apache.airavata.orchestrator.cpi.OrchestratorService;
 import org.apache.airavata.orchestrator.util.Constants;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.server.TSimpleServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index f65241a..718177c 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -20,20 +20,17 @@
 */
 package org.apache.airavata.job.monitor;
 
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Map;
+
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.job.monitor.state.JobStatus;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.omg.PortableInterceptor.ACTIVE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.Map;
-import java.util.Properties;
-
 /*
 This is the object which contains the data to identify a particular
 Job to start the monitoring

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
index 0f75206..cc85e58 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
@@ -20,12 +20,15 @@
 */
 package org.apache.airavata.job.monitor.event;
 
-import com.google.common.eventbus.EventBus;
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.state.ExperimentStatus;
 import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.TaskStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.EventBus;
+
 public class MonitorPublisher {
     private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
     private EventBus eventBus;
@@ -38,10 +41,18 @@ public class MonitorPublisher {
         eventBus.register(listener);
     }
 
-    public void publish(JobStatus jobState) {
-        eventBus.post(jobState);
+    public void publish(JobStatus jobStatus) {
+        eventBus.post(jobStatus);
     }
 
+    public void publish(TaskStatus taskStatus) {
+        eventBus.post(taskStatus);
+    }
+    
+    public void publish(ExperimentStatus experimentStatus) {
+        eventBus.post(experimentStatus);
+    }
+    
     public void publish(MonitorID monitorID){
         eventBus.post(monitorID);
     }

Reply via email to