Refactor: renamed the RabbitMQListner class to RabbitMQListener and fixed the rename issue with the RabbitMQStatusConsumer class
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/97ff3b7d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/97ff3b7d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/97ff3b7d Branch: refs/heads/master Commit: 97ff3b7d12ce1e6ac42e8a3d22c2bfc1de9b30a1 Parents: 249b440 Author: shamrath <[email protected]> Authored: Tue Feb 24 11:20:07 2015 -0500 Committer: shamrath <[email protected]> Committed: Tue Feb 24 11:20:07 2015 -0500 ---------------------------------------------------------------------- modules/messaging/client/pom.xml | 2 +- .../messaging/client/RabbitMQListener.java | 215 +++++++++++++++++++ .../messaging/client/RabbitMQListner.java | 215 ------------------- 3 files changed, 216 insertions(+), 216 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/97ff3b7d/modules/messaging/client/pom.xml ---------------------------------------------------------------------- diff --git a/modules/messaging/client/pom.xml b/modules/messaging/client/pom.xml index 762bd10..5db9d4d 100644 --- a/modules/messaging/client/pom.xml +++ b/modules/messaging/client/pom.xml @@ -80,7 +80,7 @@ <configuration> <archive> <manifest> - <mainClass>org.apache.airavata.messaging.client.RabbitMQListner</mainClass> + <mainClass>org.apache.airavata.messaging.client.RabbitMQListener</mainClass> </manifest> </archive> <descriptorRefs> http://git-wip-us.apache.org/repos/asf/airavata/blob/97ff3b7d/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java ---------------------------------------------------------------------- diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java new file mode 100644 index 0000000..c1bdc3d --- 
/dev/null +++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.client; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.model.messaging.event.*; +import org.apache.commons.cli.*; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class RabbitMQListener { + public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; + public static final String RABBITMQ_EXCHANGE_NAME = 
"rabbitmq.exchange.name"; + private final static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class); + private static String gatewayId = "*"; + private static boolean gatewayLevelMessages = false; + private static boolean experimentLevelMessages = false; + private static boolean jobLevelMessages = false; + private static String experimentId = "*"; + private static String jobId = "*"; + private static boolean allMessages = false; + + public static void main(String[] args) { + parseArguments(args); + try { + AiravataUtils.setExecutionAsServer(); + String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL); + final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); + RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName); + consumer.listen(new MessageHandler() { + @Override + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + List<String> routingKeys = new ArrayList<String>(); + if (allMessages){ + routingKeys.add("*"); + routingKeys.add("*.*"); + routingKeys.add("*.*.*"); + routingKeys.add("*.*.*.*"); + routingKeys.add("*.*.*.*.*"); + }else { + if (gatewayLevelMessages){ + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + ".*"); + routingKeys.add(gatewayId + ".*.*"); + routingKeys.add(gatewayId + ".*.*.*"); + routingKeys.add(gatewayId + ".*.*.*.*"); + }else if (experimentLevelMessages){ + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + experimentId); + routingKeys.add(gatewayId + "." + experimentId+ ".*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*.*"); + }else if (jobLevelMessages){ + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + experimentId); + routingKeys.add(gatewayId + "." + experimentId+ ".*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*." 
+ jobId); + } + } + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + return props; + } + + @Override + public void onMessage(MessageContext message) { + if (message.getType().equals(MessageType.EXPERIMENT)){ + try { + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + + " for Gateway " + event.getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + }else if (message.getType().equals(MessageType.WORKFLOWNODE)){ + try { + WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + + " for Gateway " + event.getWorkflowNodeIdentity().getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + }else if (message.getType().equals(MessageType.TASK)){ + try { + TaskStatusChangeEvent event = new TaskStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + + " for Gateway " + event.getTaskIdentity().getGatewayId()); + } catch (TException e) { + 
logger.error(e.getMessage(), e); + } + }else if (message.getType().equals(MessageType.JOB)){ + try { + JobStatusChangeEvent event = new JobStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + + " for Gateway " + event.getJobIdentity().getGatewayId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + } + } + }); + } catch (ApplicationSettingsException e) { + logger.error("Error reading airavata server properties", e); + }catch (Exception e) { + logger.error(e.getMessage(), e); + } + + } + + public static void parseArguments(String[] args) { + try{ + Options options = new Options(); + + options.addOption("gId", true , "Gateway ID"); + options.addOption("eId", true, "Experiment ID"); + options.addOption("jId", true, "Job ID"); + options.addOption("a", false, "All Notifications"); + + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse( options, args); + if (cmd.getOptions() == null || cmd.getOptions().length == 0){ + logger.info("You have not specified any options. We assume you need to listen to all the messages..."); + allMessages = true; + gatewayId = "*"; + } + if (cmd.hasOption("a")){ + logger.info("Listening to all the messages..."); + allMessages = true; + gatewayId = "*"; + }else { + gatewayId = cmd.getOptionValue("gId"); + if (gatewayId == null){ + gatewayId = "*"; + logger.info("You have not specified a gateway id. We assume you need to listen to all the messages..."); + } else { + gatewayLevelMessages = true; + } + experimentId = cmd.getOptionValue("eId"); + if (experimentId == null && !gatewayId.equals("*")){ + experimentId = "*"; + logger.info("You have not specified a experiment id. 
We assume you need to listen to all the messages for the gateway with id " + gatewayId); + } else if (experimentId == null && gatewayId.equals("*")) { + experimentId = "*"; + logger.info("You have not specified a experiment id and a gateway id. We assume you need to listen to all the messages..."); + }else { + experimentLevelMessages = true; + } + jobId = cmd.getOptionValue("jId"); + if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")){ + jobId = "*"; + logger.info("You have not specified a job id. We assume you need to listen to all the messages for the gateway with id " + gatewayId + + " with experiment id : " + experimentId ); + } else if (jobId == null && gatewayId.equals("*") && experimentId.equals("*")) { + jobId = "*"; + logger.info("You have not specified a job Id or experiment Id or a gateway Id. We assume you need to listen to all the messages..."); + }else { + jobLevelMessages = true; + } + } + } catch (ParseException e) { + logger.error("Error while reading command line parameters" , e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/97ff3b7d/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java ---------------------------------------------------------------------- diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java deleted file mode 100644 index 215d531..0000000 --- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.messaging.client; - -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessageHandler; -import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.impl.RabbitMQConsumer; -import org.apache.airavata.model.messaging.event.*; -import org.apache.commons.cli.*; -import org.apache.thrift.TBase; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -public class RabbitMQListner { - public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; - public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; - private final static Logger logger = LoggerFactory.getLogger(RabbitMQListner.class); - private static String gatewayId = "*"; - private static boolean gatewayLevelMessages = false; - private static boolean experimentLevelMessages = false; - private static boolean jobLevelMessages = false; - private static String experimentId = "*"; - private 
static String jobId = "*"; - private static boolean allMessages = false; - - public static void main(String[] args) { - parseArguments(args); - try { - AiravataUtils.setExecutionAsServer(); - String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL); - final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); - RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName); - consumer.listen(new MessageHandler() { - @Override - public Map<String, Object> getProperties() { - Map<String, Object> props = new HashMap<String, Object>(); - List<String> routingKeys = new ArrayList<String>(); - if (allMessages){ - routingKeys.add("*"); - routingKeys.add("*.*"); - routingKeys.add("*.*.*"); - routingKeys.add("*.*.*.*"); - routingKeys.add("*.*.*.*.*"); - }else { - if (gatewayLevelMessages){ - routingKeys.add(gatewayId); - routingKeys.add(gatewayId + ".*"); - routingKeys.add(gatewayId + ".*.*"); - routingKeys.add(gatewayId + ".*.*.*"); - routingKeys.add(gatewayId + ".*.*.*.*"); - }else if (experimentLevelMessages){ - routingKeys.add(gatewayId); - routingKeys.add(gatewayId + "." + experimentId); - routingKeys.add(gatewayId + "." + experimentId+ ".*"); - routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); - routingKeys.add(gatewayId + "." + experimentId+ ".*.*.*"); - }else if (jobLevelMessages){ - routingKeys.add(gatewayId); - routingKeys.add(gatewayId + "." + experimentId); - routingKeys.add(gatewayId + "." + experimentId+ ".*"); - routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); - routingKeys.add(gatewayId + "." + experimentId+ ".*." 
+ jobId); - } - } - props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); - return props; - } - - @Override - public void onMessage(MessageContext message) { - if (message.getType().equals(MessageType.EXPERIMENT)){ - try { - ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(); - TBase messageEvent = message.getEvent(); - byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); - ThriftUtils.createThriftFromBytes(bytes, event); - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + - " for Gateway " + event.getGatewayId()); - } catch (TException e) { - logger.error(e.getMessage(), e); - } - }else if (message.getType().equals(MessageType.WORKFLOWNODE)){ - try { - WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(); - TBase messageEvent = message.getEvent(); - byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); - ThriftUtils.createThriftFromBytes(bytes, event); - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + - " for Gateway " + event.getWorkflowNodeIdentity().getGatewayId()); - } catch (TException e) { - logger.error(e.getMessage(), e); - } - }else if (message.getType().equals(MessageType.TASK)){ - try { - TaskStatusChangeEvent event = new TaskStatusChangeEvent(); - TBase messageEvent = message.getEvent(); - byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); - ThriftUtils.createThriftFromBytes(bytes, event); - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + - " for Gateway " + event.getTaskIdentity().getGatewayId()); - } catch (TException e) { - 
logger.error(e.getMessage(), e); - } - }else if (message.getType().equals(MessageType.JOB)){ - try { - JobStatusChangeEvent event = new JobStatusChangeEvent(); - TBase messageEvent = message.getEvent(); - byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); - ThriftUtils.createThriftFromBytes(bytes, event); - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() + - " for Gateway " + event.getJobIdentity().getGatewayId()); - } catch (TException e) { - logger.error(e.getMessage(), e); - } - } - } - }); - } catch (ApplicationSettingsException e) { - logger.error("Error reading airavata server properties", e); - }catch (Exception e) { - logger.error(e.getMessage(), e); - } - - } - - public static void parseArguments(String[] args) { - try{ - Options options = new Options(); - - options.addOption("gId", true , "Gateway ID"); - options.addOption("eId", true, "Experiment ID"); - options.addOption("jId", true, "Job ID"); - options.addOption("a", false, "All Notifications"); - - CommandLineParser parser = new PosixParser(); - CommandLine cmd = parser.parse( options, args); - if (cmd.getOptions() == null || cmd.getOptions().length == 0){ - logger.info("You have not specified any options. We assume you need to listen to all the messages..."); - allMessages = true; - gatewayId = "*"; - } - if (cmd.hasOption("a")){ - logger.info("Listening to all the messages..."); - allMessages = true; - gatewayId = "*"; - }else { - gatewayId = cmd.getOptionValue("gId"); - if (gatewayId == null){ - gatewayId = "*"; - logger.info("You have not specified a gateway id. We assume you need to listen to all the messages..."); - } else { - gatewayLevelMessages = true; - } - experimentId = cmd.getOptionValue("eId"); - if (experimentId == null && !gatewayId.equals("*")){ - experimentId = "*"; - logger.info("You have not specified a experiment id. 
We assume you need to listen to all the messages for the gateway with id " + gatewayId); - } else if (experimentId == null && gatewayId.equals("*")) { - experimentId = "*"; - logger.info("You have not specified a experiment id and a gateway id. We assume you need to listen to all the messages..."); - }else { - experimentLevelMessages = true; - } - jobId = cmd.getOptionValue("jId"); - if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")){ - jobId = "*"; - logger.info("You have not specified a job id. We assume you need to listen to all the messages for the gateway with id " + gatewayId - + " with experiment id : " + experimentId ); - } else if (jobId == null && gatewayId.equals("*") && experimentId.equals("*")) { - jobId = "*"; - logger.info("You have not specified a job Id or experiment Id or a gateway Id. We assume you need to listen to all the messages..."); - }else { - jobLevelMessages = true; - } - } - } catch (ParseException e) { - logger.error("Error while reading command line parameters" , e); - } - } -}
