Repository: airavata
Updated Branches:
  refs/heads/master 9ae7c6ec3 -> ce4b32490


publushing created state and launched state


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ce4b3249
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ce4b3249
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ce4b3249

Branch: refs/heads/master
Commit: ce4b324907381fb1d79733c5605b30c5f3311993
Parents: 9ae7c6e
Author: Chathuri Wimalasena <[email protected]>
Authored: Thu Oct 23 11:36:57 2014 -0400
Committer: Chathuri Wimalasena <[email protected]>
Committed: Thu Oct 23 11:36:57 2014 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   | 42 +++++++++++++++++---
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  8 ++--
 .../messaging/core/PublisherFactory.java        |  2 +-
 .../airavata/messaging/core/TestClient.java     | 22 ++++++++--
 4 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/ce4b3249/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 9a496a0..9c0810d 100644
--- 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -28,10 +28,15 @@ import 
org.apache.aiaravata.application.catalog.data.util.AppCatalogThriftConver
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.airavataAPIConstants;
 import org.apache.airavata.api.server.util.DataModelUtils;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.logger.AiravataLogger;
 import org.apache.airavata.common.logger.AiravataLoggerFactory;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.PublisherFactory;
 import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationModule;
 import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
@@ -41,6 +46,8 @@ import org.apache.airavata.model.appcatalog.computeresource.*;
 import 
org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import 
org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.error.*;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.util.ExecutionType;
 import org.apache.airavata.model.workspace.Project;
 import org.apache.airavata.model.workspace.experiment.*;
@@ -64,13 +71,18 @@ public class AiravataServerHandler implements 
Airavata.Iface {
     private static final AiravataLogger logger = 
AiravataLoggerFactory.getLogger(AiravataServerHandler.class);
     private Registry registry;
     private AppCatalog appCatalog;
+    private Publisher publisher;
 
     public AiravataServerHandler() {
-//        try {
-//            storeServerConfig();
-//        } catch (ApplicationSettingsException e) {
-//            e.printStackTrace();
-//        }
+        try {
+            if (ServerSettings.isRabbitMqPublishEnabled()) {
+                publisher = PublisherFactory.createPublisher();
+            }
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error occured while reading airavata-server 
properties..", e);
+        } catch (AiravataException e) {
+            logger.error("Error occured while reading airavata-server 
properties..", e);
+        }
     }
 
 //    private void storeServerConfig() throws ApplicationSettingsException {
@@ -670,6 +682,16 @@ public class AiravataServerHandler implements 
Airavata.Iface {
                 throw exception;
             }
             String experimentId = 
(String)registry.add(ParentDataType.EXPERIMENT, experiment);
+            if (ServerSettings.isRabbitMqPublishEnabled()){
+                String gatewayId = ServerSettings.getDefaultUserGateway();
+                ExperimentStatusChangeEvent event = new 
ExperimentStatusChangeEvent(ExperimentState.CREATED,
+                        experimentId,
+                        gatewayId);
+                String messageId = AiravataUtils.getId("EXPERIMENT");
+                MessageContext messageContext = new MessageContext(event, 
MessageType.EXPERIMENT,messageId,gatewayId);
+                
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                publisher.publish(messageContext);
+            }
             logger.infoId(experimentId, "Created new experiment with 
experiment name {}", experiment.getName());
             return experimentId;
         } catch (Exception e) {
@@ -1198,6 +1220,16 @@ public class AiravataServerHandler implements 
Airavata.Iface {
                     
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
                     experiment.setExperimentStatus(status);
                     registry.update(RegistryModelType.EXPERIMENT_STATUS, 
status, experimentId);
+                    if (ServerSettings.isRabbitMqPublishEnabled()){
+                        String gatewayId = 
ServerSettings.getDefaultUserGateway();
+                        ExperimentStatusChangeEvent event = new 
ExperimentStatusChangeEvent(ExperimentState.LAUNCHED,
+                                experimentId,
+                                gatewayId);
+                        String messageId = AiravataUtils.getId("EXPERIMENT");
+                        MessageContext messageContext = new 
MessageContext(event, MessageType.EXPERIMENT,messageId,gatewayId);
+                        
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                        publisher.publish(messageContext);
+                    }
                     registry.update(RegistryModelType.TASK_DETAIL, taskData, 
taskData.getTaskID());
                     //launching the experiment
                     orchestratorClient.launchTask(taskData.getTaskID(), 
airavataCredStoreToken);

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce4b3249/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 803614b..ca7620d 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -29,6 +29,7 @@ import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.xpath.XPathExpressionException;
 import org.airavata.appcatalog.cpi.AppCatalog;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
@@ -59,6 +60,7 @@ import org.apache.airavata.gfac.core.utils.GFacUtils;
 
 import org.apache.airavata.messaging.core.Publisher;
 
+import org.apache.airavata.messaging.core.PublisherFactory;
 import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
@@ -157,11 +159,9 @@ public class BetterGfacImpl implements GFac,Watcher {
     public static void startStatusUpdators(Registry registry, ZooKeeper zk, 
MonitorPublisher publisher) {
         try {
             String[] listenerClassList = ServerSettings.getActivityListeners();
-            String activityPublisher = ServerSettings.getActivityPublisher();
             Publisher rabbitMQPublisher = null;
             if (ServerSettings.isRabbitMqPublishEnabled()){
-                Class<? extends Publisher> aPublisher = 
Class.forName(activityPublisher).asSubclass(Publisher.class);
-                rabbitMQPublisher = aPublisher.newInstance();
+                rabbitMQPublisher = PublisherFactory.createPublisher();
             }
             for (String listenerClass : listenerClassList) {
                 Class<? extends AbstractActivityListener> aClass = 
Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
@@ -179,6 +179,8 @@ public class BetterGfacImpl implements GFac,Watcher {
             log.error("Error loading the listener classes configured in 
airavata-server.properties", e);
         } catch (ApplicationSettingsException e) {
             log.error("Error loading the listener classes configured in 
airavata-server.properties", e);
+        } catch (AiravataException e) {
+            log.error("Error loading the listener classes configured in 
airavata-server.properties", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce4b3249/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
index 116e9b4..2080cc6 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 public class PublisherFactory {
     private static Logger log = 
LoggerFactory.getLogger(PublisherFactory.class);
 
-    public Publisher createPublisher() throws AiravataException {
+    public static Publisher createPublisher() throws AiravataException {
         String activityPublisher = ServerSettings.getActivityPublisher();
 
         if (activityPublisher == null) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce4b3249/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index 678f1b7..0f31f49 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -24,8 +24,13 @@ package org.apache.airavata.messaging.core;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +44,7 @@ public class TestClient {
     public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
     public static final String RABBITMQ_EXCHANGE_NAME = 
"rabbitmq.exchange.name";
     private final static Logger logger = 
LoggerFactory.getLogger(TestClient.class);
-    private final static String experimentId = 
"echoExperiment_febc8b78-a66a-4c05-9b1f-1a6ebb0089d8";
+    private final static String experimentId = "*";
 
     public static void main(String[] args) {
         try {
@@ -60,9 +65,18 @@ public class TestClient {
 
                 @Override
                 public void onMessage(MessageContext message) {
-                    System.out.println(" Message Received with message id '" + 
message.getMessageId()
-                            + "' and with message type '" + message.getType());
-                    System.out.println("message received: " + message);
+                    if (message.getType().equals(MessageType.EXPERIMENT)){
+                        try {
+                            ExperimentStatusChangeEvent event = new 
ExperimentStatusChangeEvent();
+                            TBase messageEvent = message.getEvent();
+                            byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
+                            ThriftUtils.createThriftFromBytes(bytes, event);
+                            System.out.println(" Message Received with message 
id '" + message.getMessageId()
+                                    + "' and with message type '" + 
message.getType() + "' and with state : '" + event.getState().toString());
+                        } catch (TException e) {
+                            e.printStackTrace();
+                        }
+                    }
                 }
             });
         } catch (ApplicationSettingsException e) {

Reply via email to