publish messages to rabbitmq
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e257af72 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e257af72 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e257af72 Branch: refs/heads/master Commit: e257af7219415ba31b840b5b9c80620463b77c0c Parents: 8a8a02b Author: Chathuri Wimalasena <[email protected]> Authored: Tue Sep 16 14:21:41 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Tue Sep 16 14:21:41 2014 -0400 ---------------------------------------------------------------------- .../server/listener/AiravataExperimentStatusUpdator.java | 9 +++++++-- .../org/apache/airavata/common/utils/ServerSettings.java | 5 +++++ modules/distribution/server/pom.xml | 5 +++++ .../server/src/main/assembly/bin-assembly.xml | 1 + modules/gfac/gfac-core/pom.xml | 5 +++++ .../apache/airavata/gfac/core/cpi/BetterGfacImpl.java | 6 +++++- .../gfac/core/monitor/AiravataJobStatusUpdator.java | 7 ++++++- .../gfac/core/monitor/AiravataTaskStatusUpdator.java | 11 +++++++---- .../core/monitor/AiravataWorkflowNodeStatusUpdator.java | 11 ++++++++--- 9 files changed, 49 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java index c78390a..aff0e07 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java @@ -25,6 +25,7 @@ import java.util.Calendar; import org.apache.airavata.api.server.util.DataModelUtils; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; import org.apache.airavata.model.util.ExecutionType; @@ -42,6 +43,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener private Registry airavataRegistry; private MonitorPublisher monitorPublisher; + private Publisher publisher; public Registry getAiravataRegistry() { return airavataRegistry; @@ -85,6 +87,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state); logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString()); monitorPublisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId())); + publisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId())); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } @@ -111,7 +114,9 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener this.airavataRegistry=(Registry)configuration; } else if (configuration instanceof MonitorPublisher){ this.monitorPublisher=(MonitorPublisher) configuration; - } - } + } else if (configuration instanceof Publisher){ + this.publisher=(Publisher) configuration; + } + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 93f3bc3..6594ecc 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -66,6 +66,7 @@ public class ServerSettings extends ApplicationSettings{ private static final String MY_PROXY_USER = "myproxy.user"; private static final String MY_PROXY_PASSWORD = "myproxy.password"; private static final String MY_PROXY_LIFETIME = "myproxy.life"; + private static final String ACTIVITY_PUBLISHER = "activity.publisher"; private static final String ACTIVITY_LISTENERS = "activity.listeners"; private static boolean stopAllThreads = false; @@ -219,6 +220,10 @@ public class ServerSettings extends ApplicationSettings{ public static String[] getActivityListeners() throws ApplicationSettingsException { return getSetting(ACTIVITY_LISTENERS).split(","); } + + public static String getActivityPublisher() throws ApplicationSettingsException{ + return getSetting(ACTIVITY_PUBLISHER); + } public static boolean isEmbeddedZK() { return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true")); } http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/distribution/server/pom.xml ---------------------------------------------------------------------- diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml index 8138baa..4c1570f 100644 --- a/modules/distribution/server/pom.xml +++ b/modules/distribution/server/pom.xml @@ -254,6 +254,11 @@ </dependency> <dependency> <groupId>org.apache.airavata</groupId> + <artifactId>airavata-messaging-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> <artifactId>app-catalog-data</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/distribution/server/src/main/assembly/bin-assembly.xml ---------------------------------------------------------------------- diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml index e84d7b9..90fd9a3 100644 --- a/modules/distribution/server/src/main/assembly/bin-assembly.xml +++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml @@ -220,6 +220,7 @@ <include>org.apache.airavata:airavata-message-monitor:jar</include> <include>org.apache.airavata:airavata-workflow-model-core:jar</include> <include>org.apache.airavata:airavata-messenger-commons:jar</include> + <include>org.apache.airavata:airavata-messaging-core:jar</include> <include>org.apache.airavata:airavata-messenger-client:jar</include> <include>org.apache.airavata:airavata-workflow-tracking:jar</include> <include>org.apache.airavata:airavata-workflow-engine:jar</include> http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml index f0a73f6..bb7836e 100644 --- a/modules/gfac/gfac-core/pom.xml +++ b/modules/gfac/gfac-core/pom.xml @@ -77,6 +77,11 @@ <artifactId>airavata-credential-store</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-messaging-core</artifactId> + <version>${project.version}</version> + </dependency> <!-- Test --> http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index e843e4d..75f5694 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -50,6 +50,7 @@ import org.apache.airavata.gfac.core.provider.GFacRecoverableProvider; import org.apache.airavata.gfac.core.states.GfacExperimentState; import org.apache.airavata.gfac.core.states.GfacPluginState; import org.apache.airavata.gfac.core.utils.GFacUtils; +import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; @@ -126,11 +127,14 @@ public class BetterGfacImpl implements GFac { public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) { try { String[] listenerClassList = ServerSettings.getActivityListeners(); + String activityPublisher = ServerSettings.getActivityPublisher(); + Class<? extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class); + Publisher rabbitMQPublisher = aPublisher.newInstance(); for (String listenerClass : listenerClassList) { Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); AbstractActivityListener abstractActivityListener = aClass.newInstance(); activityListeners.add(abstractActivityListener); - abstractActivityListener.setup(publisher, registry, zk); + abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher); log.info("Registering listener: " + listenerClass); publisher.registerListener(abstractActivityListener); } http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java index 473debd..d142ceb 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java @@ -24,6 +24,7 @@ import java.util.Calendar; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; @@ -41,6 +42,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { private Registry airavataRegistry; private MonitorPublisher monitorPublisher; + private Publisher publisher; public Registry getAiravataRegistry() { @@ -65,6 +67,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { updateJobStatus(taskID, jobID, state); logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString()); monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity())); + publisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity())); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } @@ -93,7 +96,9 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { this.airavataRegistry=(Registry)configuration; } else if (configuration instanceof MonitorPublisher){ this.monitorPublisher=(MonitorPublisher) configuration; - } + } else if (configuration instanceof Publisher){ + this.publisher=(Publisher) configuration; + } } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java index e6ab5ef..f4e6241 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java @@ -24,6 +24,7 @@ import java.util.Calendar; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.messaging.event.TaskIdentity; import org.apache.airavata.model.workspace.experiment.TaskDetails; @@ -37,10 +38,9 @@ import com.google.common.eventbus.Subscribe; public class AiravataTaskStatusUpdator implements AbstractActivityListener { private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class); - private Registry airavataRegistry; - private MonitorPublisher monitorPublisher; + private Publisher publisher; public Registry getAiravataRegistry() { return airavataRegistry; @@ -93,6 +93,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { jobStatus.getJobIdentity().getWorkflowNodeId(), jobStatus.getJobIdentity().getExperimentId()); monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity)); + publisher.publish(new TaskStatusChangeEvent(state, taskIdentity)); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); @@ -119,7 +120,9 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { this.airavataRegistry=(Registry)configuration; } else if (configuration instanceof MonitorPublisher){ this.monitorPublisher=(MonitorPublisher) configuration; - } - } + } else if (configuration instanceof Publisher){ + this.publisher=(Publisher) configuration; + } + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java index e0b8f9e..fe24bd0 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java @@ -24,6 +24,7 @@ import java.util.Calendar; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.messaging.event.WorkflowIdentity; import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; @@ -41,8 +42,9 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class); private Registry airavataRegistry; - private MonitorPublisher monitorPublisher; + private Publisher publisher; + public Registry getAiravataRegistry() { return airavataRegistry; @@ -78,6 +80,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString()); WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId()); monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity)); + publisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity)); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } @@ -103,7 +106,9 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen this.airavataRegistry=(Registry)configuration; } else if (configuration instanceof MonitorPublisher){ this.monitorPublisher=(MonitorPublisher) configuration; - } - } + } else if (configuration instanceof Publisher){ + this.publisher=(Publisher) configuration; + } + } } }
