Repository: airavata Updated Branches: refs/heads/master e68039748 -> 042209406
adding rabbitMQ java client - AIRAVATA-1574 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/04220940 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/04220940 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/04220940 Branch: refs/heads/master Commit: 042209406ecacb00d88c610dd8717aa12e7aa33c Parents: e680397 Author: Chathuri Wimalasena <[email protected]> Authored: Thu Feb 12 14:50:00 2015 -0500 Committer: Chathuri Wimalasena <[email protected]> Committed: Thu Feb 12 14:50:00 2015 -0500 ---------------------------------------------------------------------- modules/messaging/client/README | 15 ++ modules/messaging/client/pom.xml | 103 +++++++++ .../messaging/client/RabbitMQListner.java | 215 +++++++++++++++++++ .../messaging/core/impl/RabbitMQPublisher.java | 9 +- modules/messaging/pom.xml | 1 + 5 files changed, 339 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/client/README ---------------------------------------------------------------------- diff --git a/modules/messaging/client/README b/modules/messaging/client/README new file mode 100644 index 0000000..86a9f1a --- /dev/null +++ b/modules/messaging/client/README @@ -0,0 +1,15 @@ +How to run +======================== + +1. Copy airavata-messaging-client-0.15-SNAPSHOT-jar-with-dependencies.jar to a location +2. Create a folder called "resources" in the same location and copy airavata-server.properties with updated broker url in to the resources folder. +3. To run the jar, in the command line type : + java -Xmx1024m -jar airavata-messaging-client-0.15-SNAPSHOT-jar-with-dependencies.jar -a + + You can provide different parameters. + -a = to listen to all the messages + -gId = to listen to all the messages for given gateway Id + -eId = to listen to all the messages for given experiment Id + -jId = to listen to all the messages for given job Id + + http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/client/pom.xml ---------------------------------------------------------------------- diff --git a/modules/messaging/client/pom.xml b/modules/messaging/client/pom.xml new file mode 100644 index 0000000..762bd10 --- /dev/null +++ b/modules/messaging/client/pom.xml @@ -0,0 +1,103 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file + distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under + the Apache License, Version 2.0 (theà "License"); you may not use this file except in compliance with the License. You may + obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to + in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF + ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under + the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <parent> + <groupId>org.apache.airavata</groupId> + <artifactId>messaging</artifactId> + <version>0.15-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>airavata-messaging-client</artifactId> + <packaging>jar</packaging> + <name>Airavata Messaging Client</name> + <url>http://airavata.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-messaging-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-data-models</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-server-configuration</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>3.3.5</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.airavata.messaging.client.RabbitMQListner</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java ---------------------------------------------------------------------- diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java new file mode 100644 index 0000000..215d531 --- /dev/null +++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.client; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.impl.RabbitMQConsumer; +import org.apache.airavata.model.messaging.event.*; +import org.apache.commons.cli.*; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class RabbitMQListner { + public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; + public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; + private final static Logger logger = LoggerFactory.getLogger(RabbitMQListner.class); + private static String gatewayId = "*"; + private static boolean gatewayLevelMessages = false; + private static boolean experimentLevelMessages = false; + private static boolean jobLevelMessages = false; + private static String experimentId = "*"; + private static String jobId = "*"; + private static boolean allMessages = false; + + public static void main(String[] args) { + parseArguments(args); + try { + AiravataUtils.setExecutionAsServer(); + String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL); + final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); + RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName); + consumer.listen(new MessageHandler() { + @Override + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + List<String> routingKeys = new ArrayList<String>(); + if (allMessages){ + routingKeys.add("*"); + routingKeys.add("*.*"); + routingKeys.add("*.*.*"); + routingKeys.add("*.*.*.*"); + routingKeys.add("*.*.*.*.*"); + }else { + if (gatewayLevelMessages){ + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + ".*"); + routingKeys.add(gatewayId + ".*.*"); + routingKeys.add(gatewayId + ".*.*.*"); + routingKeys.add(gatewayId + ".*.*.*.*"); + }else if (experimentLevelMessages){ + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + experimentId); + routingKeys.add(gatewayId + "." + experimentId+ ".*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*.*"); + }else if (jobLevelMessages){ + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + experimentId); + routingKeys.add(gatewayId + "." + experimentId+ ".*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*." + jobId); + } + } + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + return props; + } + + @Override + public void onMessage(MessageContext message) { + if (message.getType().equals(MessageType.EXPERIMENT)){ + try { + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + + " for Gateway " + event.getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + }else if (message.getType().equals(MessageType.WORKFLOWNODE)){ + try { + WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + + " for Gateway " + event.getWorkflowNodeIdentity().getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + }else if (message.getType().equals(MessageType.TASK)){ + try { + TaskStatusChangeEvent event = new TaskStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + + " for Gateway " + event.getTaskIdentity().getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + }else if (message.getType().equals(MessageType.JOB)){ + try { + JobStatusChangeEvent event = new JobStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + + " for Gateway " + event.getJobIdentity().getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + } + } + }); + } catch (ApplicationSettingsException e) { + logger.error("Error reading airavata server properties", e); + }catch (Exception e) { + logger.error(e.getMessage(), e); + } + + } + + public static void parseArguments(String[] args) { + try{ + Options options = new Options(); + + options.addOption("gId", true , "Gateway ID"); + options.addOption("eId", true, "Experiment ID"); + options.addOption("jId", true, "Job ID"); + options.addOption("a", false, "All Notifications"); + + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse( options, args); + if (cmd.getOptions() == null || cmd.getOptions().length == 0){ + logger.info("You have not specified any options. We assume you need to listen to all the messages..."); + allMessages = true; + gatewayId = "*"; + } + if (cmd.hasOption("a")){ + logger.info("Listening to all the messages..."); + allMessages = true; + gatewayId = "*"; + }else { + gatewayId = cmd.getOptionValue("gId"); + if (gatewayId == null){ + gatewayId = "*"; + logger.info("You have not specified a gateway id. We assume you need to listen to all the messages..."); + } else { + gatewayLevelMessages = true; + } + experimentId = cmd.getOptionValue("eId"); + if (experimentId == null && !gatewayId.equals("*")){ + experimentId = "*"; + logger.info("You have not specified a experiment id. We assume you need to listen to all the messages for the gateway with id " + gatewayId); + } else if (experimentId == null && gatewayId.equals("*")) { + experimentId = "*"; + logger.info("You have not specified a experiment id and a gateway id. We assume you need to listen to all the messages..."); + }else { + experimentLevelMessages = true; + } + jobId = cmd.getOptionValue("jId"); + if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")){ + jobId = "*"; + logger.info("You have not specified a job id. We assume you need to listen to all the messages for the gateway with id " + gatewayId + + " with experiment id : " + experimentId ); + } else if (jobId == null && gatewayId.equals("*") && experimentId.equals("*")) { + jobId = "*"; + logger.info("You have not specified a job Id or experiment Id or a gateway Id. We assume you need to listen to all the messages..."); + }else { + jobLevelMessages = true; + } + } + } catch (ParseException e) { + logger.error("Error while reading command line parameters" , e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java index ff14a8c..97c0d98 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java @@ -64,22 +64,23 @@ public class RabbitMQPublisher implements Publisher { message.setMessageId(msgCtx.getMessageId()); message.setMessageType(msgCtx.getType()); message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); + String gatewayId = msgCtx.getGatewayId(); String routingKey = null; if (msgCtx.getType().equals(MessageType.EXPERIMENT)){ ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); - routingKey = event.getExperimentId(); + routingKey = gatewayId + "." + event.getExperimentId(); } else if (msgCtx.getType().equals(MessageType.TASK)) { TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); - routingKey = event.getTaskIdentity().getExperimentId() + "." + + routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent(); WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity(); - routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); + routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); }else if (msgCtx.getType().equals(MessageType.JOB)){ JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent(); JobIdentifier identity = event.getJobIdentity(); - routingKey = identity.getExperimentId() + "." + + routingKey = gatewayId + "." + identity.getExperimentId() + "." + identity.getWorkflowNodeId() + "." + identity.getTaskId() + "." + identity.getJobId(); http://git-wip-us.apache.org/repos/asf/airavata/blob/04220940/modules/messaging/pom.xml ---------------------------------------------------------------------- diff --git a/modules/messaging/pom.xml b/modules/messaging/pom.xml index e8237be..bfc5424 100644 --- a/modules/messaging/pom.xml +++ b/modules/messaging/pom.xml @@ -31,6 +31,7 @@ </activation> <modules> <module>core</module> + <module>client</module> </modules> </profile> </profiles>
