Repository: airavata Updated Branches: refs/heads/master 9ae7c6ec3 -> ce4b32490
publushing created state and launched state Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ce4b3249 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ce4b3249 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ce4b3249 Branch: refs/heads/master Commit: ce4b324907381fb1d79733c5605b30c5f3311993 Parents: 9ae7c6e Author: Chathuri Wimalasena <[email protected]> Authored: Thu Oct 23 11:36:57 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Thu Oct 23 11:36:57 2014 -0400 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 42 +++++++++++++++++--- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 8 ++-- .../messaging/core/PublisherFactory.java | 2 +- .../airavata/messaging/core/TestClient.java | 22 ++++++++-- 4 files changed, 61 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/ce4b3249/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index 9a496a0..9c0810d 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -28,10 +28,15 @@ import org.apache.aiaravata.application.catalog.data.util.AppCatalogThriftConver import org.apache.airavata.api.Airavata; import org.apache.airavata.api.airavataAPIConstants; import org.apache.airavata.api.server.util.DataModelUtils; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.PublisherFactory; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationModule; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; @@ -41,6 +46,8 @@ import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; import org.apache.airavata.model.error.*; +import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; +import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.util.ExecutionType; import org.apache.airavata.model.workspace.Project; import org.apache.airavata.model.workspace.experiment.*; @@ -64,13 +71,18 @@ public class AiravataServerHandler implements Airavata.Iface { private static final AiravataLogger logger = AiravataLoggerFactory.getLogger(AiravataServerHandler.class); private Registry registry; private AppCatalog appCatalog; + private Publisher publisher; public AiravataServerHandler() { -// try { -// storeServerConfig(); -// } catch (ApplicationSettingsException e) { -// e.printStackTrace(); -// } + try { + if (ServerSettings.isRabbitMqPublishEnabled()) { + publisher = PublisherFactory.createPublisher(); + } + } catch (ApplicationSettingsException e) { + logger.error("Error occured while reading airavata-server properties..", e); + } catch (AiravataException e) { + logger.error("Error occured while reading airavata-server properties..", e); + } } // private void storeServerConfig() throws ApplicationSettingsException { @@ -670,6 +682,16 @@ public class AiravataServerHandler implements Airavata.Iface { throw exception; } String experimentId = (String)registry.add(ParentDataType.EXPERIMENT, experiment); + if (ServerSettings.isRabbitMqPublishEnabled()){ + String gatewayId = ServerSettings.getDefaultUserGateway(); + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.CREATED, + experimentId, + gatewayId); + String messageId = AiravataUtils.getId("EXPERIMENT"); + MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT,messageId,gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); + } logger.infoId(experimentId, "Created new experiment with experiment name {}", experiment.getName()); return experimentId; } catch (Exception e) { @@ -1198,6 +1220,16 @@ public class AiravataServerHandler implements Airavata.Iface { status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); experiment.setExperimentStatus(status); registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); + if (ServerSettings.isRabbitMqPublishEnabled()){ + String gatewayId = ServerSettings.getDefaultUserGateway(); + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED, + experimentId, + gatewayId); + String messageId = AiravataUtils.getId("EXPERIMENT"); + MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT,messageId,gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); + } registry.update(RegistryModelType.TASK_DETAIL, taskData, taskData.getTaskID()); //launching the experiment orchestratorClient.launchTask(taskData.getTaskID(), airavataCredStoreToken); http://git-wip-us.apache.org/repos/asf/airavata/blob/ce4b3249/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 803614b..ca7620d 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -29,6 +29,7 @@ import javax.xml.parsers.ParserConfigurationException; import javax.xml.xpath.XPathExpressionException; import org.airavata.appcatalog.cpi.AppCatalog; import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.MonitorPublisher; @@ -59,6 +60,7 @@ import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.PublisherFactory; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; @@ -157,11 +159,9 @@ public class BetterGfacImpl implements GFac,Watcher { public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) { try { String[] listenerClassList = ServerSettings.getActivityListeners(); - String activityPublisher = ServerSettings.getActivityPublisher(); Publisher rabbitMQPublisher = null; if (ServerSettings.isRabbitMqPublishEnabled()){ - Class<? extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class); - rabbitMQPublisher = aPublisher.newInstance(); + rabbitMQPublisher = PublisherFactory.createPublisher(); } for (String listenerClass : listenerClassList) { Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); @@ -179,6 +179,8 @@ public class BetterGfacImpl implements GFac,Watcher { log.error("Error loading the listener classes configured in airavata-server.properties", e); } catch (ApplicationSettingsException e) { log.error("Error loading the listener classes configured in airavata-server.properties", e); + } catch (AiravataException e) { + log.error("Error loading the listener classes configured in airavata-server.properties", e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/ce4b3249/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java index 116e9b4..2080cc6 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; public class PublisherFactory { private static Logger log = LoggerFactory.getLogger(PublisherFactory.class); - public Publisher createPublisher() throws AiravataException { + public static Publisher createPublisher() throws AiravataException { String activityPublisher = ServerSettings.getActivityPublisher(); if (activityPublisher == null) { http://git-wip-us.apache.org/repos/asf/airavata/blob/ce4b3249/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java index 678f1b7..0f31f49 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java @@ -24,8 +24,13 @@ package org.apache.airavata.messaging.core; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.impl.RabbitMQConsumer; +import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.Message; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +44,7 @@ public class TestClient { public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; private final static Logger logger = LoggerFactory.getLogger(TestClient.class); - private final static String experimentId = "echoExperiment_febc8b78-a66a-4c05-9b1f-1a6ebb0089d8"; + private final static String experimentId = "*"; public static void main(String[] args) { try { @@ -60,9 +65,18 @@ public class TestClient { @Override public void onMessage(MessageContext message) { - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType()); - System.out.println("message received: " + message); + if (message.getType().equals(MessageType.EXPERIMENT)){ + try { + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString()); + } catch (TException e) { + e.printStackTrace(); + } + } } }); } catch (ApplicationSettingsException e) {
