Repository: airavata
Updated Branches:
  refs/heads/master e68039748 -> 042209406


adding rabbitMQ java client - AIRAVATA-1574


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/04220940
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/04220940
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/04220940

Branch: refs/heads/master
Commit: 042209406ecacb00d88c610dd8717aa12e7aa33c
Parents: e680397
Author: Chathuri Wimalasena <[email protected]>
Authored: Thu Feb 12 14:50:00 2015 -0500
Committer: Chathuri Wimalasena <[email protected]>
Committed: Thu Feb 12 14:50:00 2015 -0500

----------------------------------------------------------------------
 modules/messaging/client/README                 |  15 ++
 modules/messaging/client/pom.xml                | 103 +++++++++
 .../messaging/client/RabbitMQListner.java       | 215 +++++++++++++++++++
 .../messaging/core/impl/RabbitMQPublisher.java  |   9 +-
 modules/messaging/pom.xml                       |   1 +
 5 files changed, 339 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/client/README
----------------------------------------------------------------------
diff --git a/modules/messaging/client/README b/modules/messaging/client/README
new file mode 100644
index 0000000..86a9f1a
--- /dev/null
+++ b/modules/messaging/client/README
@@ -0,0 +1,15 @@
+How to run
+========================
+
+1. Copy airavata-messaging-client-0.15-SNAPSHOT-jar-with-dependencies.jar to a 
location
+2. Create a folder called "resources" in the same location and copy 
airavata-server.properties, with the broker URL updated, into the resources folder.
+3. To run the jar, in the command line type :
+        java -Xmx1024m -jar 
airavata-messaging-client-0.15-SNAPSHOT-jar-with-dependencies.jar -a
+
+        You can provide different parameters.
+            -a = listen to all the messages
+            -gId <gateway id> = listen to all the messages for the given gateway ID
+            -eId <experiment id> = listen to all the messages for the given experiment ID
+            -jId <job id> = listen to all the messages for the given job ID
+
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/client/pom.xml
----------------------------------------------------------------------
diff --git a/modules/messaging/client/pom.xml b/modules/messaging/client/pom.xml
new file mode 100644
index 0000000..762bd10
--- /dev/null
+++ b/modules/messaging/client/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE file 
+    distributed with this work for additional information regarding copyright 
ownership. The ASF licenses this file to you under 
+    the Apache License, Version 2.0 (the "License"); you may not use this 
file except in compliance with the License. You may 
+    obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
Unless required by applicable law or agreed to 
+    in writing, software distributed under the License is distributed on an 
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 
+    ANY KIND, either express or implied. See the License for the specific 
language governing permissions and limitations under 
+    the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>messaging</artifactId>
+        <version>0.15-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>airavata-messaging-client</artifactId>
+    <packaging>jar</packaging>
+    <name>Airavata Messaging Client</name>
+    <url>http://airavata.apache.org/</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-data-models</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-server-configuration</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>3.3.5</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.3.2</version>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            
<mainClass>org.apache.airavata.messaging.client.RabbitMQListner</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for 
inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging 
phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
 
b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
new file mode 100644
index 0000000..215d531
--- /dev/null
+++ 
b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.client;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.commons.cli.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class RabbitMQListner { // command-line client: parses options, subscribes a RabbitMQConsumer and prints status-change events
+    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+    public static final String RABBITMQ_EXCHANGE_NAME = 
"rabbitmq.exchange.name";
+    private final static Logger logger = 
LoggerFactory.getLogger(RabbitMQListner.class);
+    private static String gatewayId = "*"; // the static fields below are the message filter; parseArguments fills them in
+    private static boolean gatewayLevelMessages = false;
+    private static boolean experimentLevelMessages = false;
+    private static boolean jobLevelMessages = false;
+    private static String experimentId = "*";
+    private static String jobId = "*";
+    private static boolean allMessages = false;
+
+    public static void main(String[] args) { // parses args, reads broker URL and exchange name from ServerSettings, then starts listening
+        parseArguments(args);
+        try {
+            AiravataUtils.setExecutionAsServer();
+            String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
+            final String exchangeName = 
ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
+            RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, 
exchangeName);
+            consumer.listen(new MessageHandler() {
+                @Override
+                public Map<String, Object> getProperties() { // builds the routing-key list stored under RABBIT_ROUTING_KEY
+                    Map<String, Object> props = new HashMap<String, Object>();
+                    List<String> routingKeys = new ArrayList<String>();
+                    if (allMessages){ // one wildcard pattern per key depth, from one to five segments
+                        routingKeys.add("*");
+                        routingKeys.add("*.*");
+                        routingKeys.add("*.*.*");
+                        routingKeys.add("*.*.*.*");
+                        routingKeys.add("*.*.*.*.*");
+                    }else {
+                        if (gatewayLevelMessages){ // NOTE(review): checked first, so whenever -gId is supplied the -eId and -jId values are not used in the keys; confirm intended
+                            routingKeys.add(gatewayId);
+                            routingKeys.add(gatewayId + ".*");
+                            routingKeys.add(gatewayId + ".*.*");
+                            routingKeys.add(gatewayId + ".*.*.*");
+                            routingKeys.add(gatewayId + ".*.*.*.*");
+                        }else if (experimentLevelMessages){
+                            routingKeys.add(gatewayId);
+                            routingKeys.add(gatewayId + "." + experimentId);
+                            routingKeys.add(gatewayId + "." + experimentId+ 
".*");
+                            routingKeys.add(gatewayId + "." + experimentId+ 
".*.*");
+                            routingKeys.add(gatewayId + "." + experimentId+ 
".*.*.*");
+                        }else if  (jobLevelMessages){
+                            routingKeys.add(gatewayId);
+                            routingKeys.add(gatewayId + "." + experimentId);
+                            routingKeys.add(gatewayId + "." + experimentId+ 
".*");
+                            routingKeys.add(gatewayId + "." + experimentId+ 
".*.*");
+                            routingKeys.add(gatewayId + "." + experimentId+ 
".*." + jobId);
+                        }
+                    }
+                    props.put(MessagingConstants.RABBIT_ROUTING_KEY, 
routingKeys);
+                    return props;
+                }
+
+                @Override
+                public void onMessage(MessageContext message) { // per message type: copies the event into the concrete Thrift class via serialize/deserialize, then prints id, type, state and gateway
+                    if (message.getType().equals(MessageType.EXPERIMENT)){
+                        try {
+                            ExperimentStatusChangeEvent event = new 
ExperimentStatusChangeEvent();
+                            TBase messageEvent = message.getEvent();
+                            byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
+                            ThriftUtils.createThriftFromBytes(bytes, event);
+                            System.out.println(" Message Received with message 
id '" + message.getMessageId()
+                                    + "' and with message type '" + 
message.getType() + "' and with state : '" + event.getState().toString() +
+                                       " for Gateway " + event.getGatewayId());
+                        } catch (TException e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }else if 
(message.getType().equals(MessageType.WORKFLOWNODE)){
+                        try {
+                            WorkflowNodeStatusChangeEvent event = new 
WorkflowNodeStatusChangeEvent();
+                            TBase messageEvent = message.getEvent();
+                            byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
+                            ThriftUtils.createThriftFromBytes(bytes, event);
+                            System.out.println(" Message Received with message 
id '" + message.getMessageId()
+                                    + "' and with message type '" + 
message.getType() + "' and with state : '" + event.getState().toString() +
+                                    " for Gateway " + 
event.getWorkflowNodeIdentity().getGatewayId());
+                        } catch (TException e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }else if (message.getType().equals(MessageType.TASK)){
+                        try {
+                            TaskStatusChangeEvent event = new 
TaskStatusChangeEvent();
+                            TBase messageEvent = message.getEvent();
+                            byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
+                            ThriftUtils.createThriftFromBytes(bytes, event);
+                            System.out.println(" Message Received with message 
id '" + message.getMessageId()
+                                    + "' and with message type '" + 
message.getType() + "' and with state : '" + event.getState().toString() +
+                                    " for Gateway " + 
event.getTaskIdentity().getGatewayId());
+                        } catch (TException e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }else if (message.getType().equals(MessageType.JOB)){
+                        try {
+                            JobStatusChangeEvent event = new 
JobStatusChangeEvent();
+                            TBase messageEvent = message.getEvent();
+                            byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
+                            ThriftUtils.createThriftFromBytes(bytes, event);
+                            System.out.println(" Message Received with message 
id '" + message.getMessageId()
+                                    + "' and with message type '" + 
message.getType() + "' and with state : '" + event.getState().toString() +
+                                    " for Gateway " + 
event.getJobIdentity().getGatewayId());
+                        } catch (TException e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }
+                }
+            });
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error reading airavata server properties", e);
+        }catch (Exception e) {
+           logger.error(e.getMessage(), e);
+        }
+
+    }
+
+    public static void parseArguments(String[] args) { // sets the static filter fields from the -gId, -eId, -jId and -a options
+        try{
+            Options options = new Options();
+
+            options.addOption("gId", true , "Gateway ID");
+            options.addOption("eId", true, "Experiment ID");
+            options.addOption("jId", true, "Job ID");
+            options.addOption("a", false, "All Notifications");
+
+            CommandLineParser parser = new PosixParser();
+            CommandLine cmd = parser.parse( options, args);
+            if (cmd.getOptions() == null || cmd.getOptions().length == 0){ // no options at all is treated the same as -a
+                logger.info("You have not specified any options. We assume you 
need to listen to all the messages...");
+                allMessages = true;
+                gatewayId = "*";
+            }
+            if (cmd.hasOption("a")){
+                logger.info("Listening to all the messages...");
+                allMessages = true;
+                gatewayId = "*";
+            }else {
+                gatewayId = cmd.getOptionValue("gId");
+                if (gatewayId == null){
+                    gatewayId = "*";
+                    logger.info("You have not specified a gateway id. We 
assume you need to listen to all the messages...");
+                } else {
+                    gatewayLevelMessages = true;
+                }
+                experimentId = cmd.getOptionValue("eId");
+                if (experimentId == null && !gatewayId.equals("*")){
+                    experimentId = "*";
+                    logger.info("You have not specified a experiment id. We 
assume you need to listen to all the messages for the gateway with id " + 
gatewayId);
+                } else if (experimentId == null && gatewayId.equals("*")) {
+                    experimentId = "*";
+                    logger.info("You have not specified a experiment id and a 
gateway id. We assume you need to listen to all the messages...");
+                }else {
+                    experimentLevelMessages = true;
+                }
+                jobId = cmd.getOptionValue("jId");
+                if (jobId == null && !gatewayId.equals("*") && 
!experimentId.equals("*")){
+                    jobId = "*";
+                    logger.info("You have not specified a job id. We assume 
you need to listen to all the messages for the gateway with id " + gatewayId
+                            + " with experiment id : " + experimentId );
+                } else if (jobId == null && gatewayId.equals("*") && 
experimentId.equals("*")) {
+                    jobId = "*";
+                    logger.info("You have not specified a job Id or experiment 
Id or a gateway Id. We assume you need to listen to all the messages...");
+                }else { // NOTE(review): also reached when jobId is null and exactly one of gatewayId or experimentId is the wildcard, which leaves jobId null with jobLevelMessages set; confirm intended
+                    jobLevelMessages = true;
+                }
+            }
+        } catch (ParseException e) { // only logged; parse() failed before any field was assigned, so every flag keeps its default of false and getProperties then returns an empty key list
+            logger.error("Error while reading command line parameters" , e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
index ff14a8c..97c0d98 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -64,22 +64,23 @@ public class RabbitMQPublisher implements Publisher {
             message.setMessageId(msgCtx.getMessageId());
             message.setMessageType(msgCtx.getType());
             message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+            String gatewayId = msgCtx.getGatewayId();
             String routingKey = null;
             if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
                 ExperimentStatusChangeEvent event = 
(ExperimentStatusChangeEvent) msgCtx.getEvent();
-                routingKey = event.getExperimentId();
+                routingKey = gatewayId + "." + event.getExperimentId();
             } else if (msgCtx.getType().equals(MessageType.TASK)) {
                 TaskStatusChangeEvent event = (TaskStatusChangeEvent) 
msgCtx.getEvent();
-                routingKey = event.getTaskIdentity().getExperimentId() + "." +
+                routingKey =  gatewayId + "." + 
event.getTaskIdentity().getExperimentId() + "." +
                         event.getTaskIdentity().getWorkflowNodeId() + "." + 
event.getTaskIdentity().getTaskId();
             }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){
                 WorkflowNodeStatusChangeEvent event = 
(WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
                 WorkflowIdentifier workflowNodeIdentity = 
event.getWorkflowNodeIdentity();
-                routingKey = workflowNodeIdentity.getExperimentId() + "." + 
workflowNodeIdentity.getWorkflowNodeId();
+                routingKey =  gatewayId + "." + 
workflowNodeIdentity.getExperimentId() + "." + 
workflowNodeIdentity.getWorkflowNodeId();
             }else if (msgCtx.getType().equals(MessageType.JOB)){
                 JobStatusChangeEvent event = 
(JobStatusChangeEvent)msgCtx.getEvent();
                 JobIdentifier identity = event.getJobIdentity();
-                routingKey = identity.getExperimentId() + "." +
+                routingKey =  gatewayId + "." + identity.getExperimentId() + 
"." +
                         identity.getWorkflowNodeId() + "." +
                         identity.getTaskId() + "." +
                         identity.getJobId();

http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/modules/messaging/pom.xml b/modules/messaging/pom.xml
index e8237be..bfc5424 100644
--- a/modules/messaging/pom.xml
+++ b/modules/messaging/pom.xml
@@ -31,6 +31,7 @@
             </activation>
             <modules>
                 <module>core</module>
+                <module>client</module>
             </modules>
         </profile>
     </profiles>

Reply via email to