Repository: airavata Updated Branches: refs/heads/master 6e49e375f -> 51f456d8c
adding more changes to messaging and changing xbaya to fit messaging Signed-off-by: Chathuri Wimalasena <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7e498ac5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7e498ac5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7e498ac5 Branch: refs/heads/master Commit: 7e498ac54b5e135311b60923fad80247f04470f9 Parents: 9aeed4d Author: Supun <[email protected]> Authored: Thu Oct 16 10:06:00 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Thu Oct 16 10:25:52 2014 -0400 ---------------------------------------------------------------------- .../airavata/messaging/core/MessageHandler.java | 2 +- .../messaging/core/MessagingConstants.java | 3 + .../airavata/messaging/core/TestClient.java | 15 +- .../messaging/core/impl/RabbitMQConsumer.java | 131 +++++++---- .../messaging/core/impl/RabbitMQProducer.java | 2 +- .../messaging/core/impl/RabbitMQPublisher.java | 8 +- .../airavata/xbaya/messaging/EventData.java | 44 ++-- .../xbaya/messaging/EventDataRepository.java | 8 +- .../airavata/xbaya/messaging/MessageClient.java | 220 ------------------- .../airavata/xbaya/messaging/Monitor.java | 127 +++++------ 10 files changed, 189 insertions(+), 371 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java index 8b897c5..f18715a 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java @@ -3,7 +3,7 @@ package org.apache.airavata.messaging.core; import java.util.Map; public interface MessageHandler { - Map<String, String> getProperties(); + Map<String, Object> getProperties(); void onMessage(MessageContext message); } http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java index fa0946a..7bfdb09 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java @@ -1,6 +1,9 @@ package org.apache.airavata.messaging.core; public abstract class MessagingConstants { + public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; + public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; + public static final String RABBIT_ROUTING_KEY = "routingKey"; public static final String RABBIT_QUEUE= "queue"; public static final String RABBIT_CONSUMER_TAG = "consumerTag"; http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java index 4de9aba..991b85b 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java @@ -29,7 +29,9 @@ import org.apache.airavata.model.messaging.event.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -37,19 +39,22 @@ public class TestClient { public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; private final static Logger logger = LoggerFactory.getLogger(TestClient.class); - private final static String experimentId = "echoExperiment_cc733586-2bf8-4ee2-8a25-6521db135e7f"; + private final static String experimentId = "echoExperiment_cc733586-2bf8-4ee2-8a25-6521db135e7f.*"; public static void main(String[] args) { try { AiravataUtils.setExecutionAsServer(); String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL); - String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); + final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName); consumer.listen(new MessageHandler() { @Override - public Map<String, String> getProperties() { - Map<String, String> props = new HashMap<String, String>(); - props.put(MessagingConstants.RABBIT_ROUTING_KEY, experimentId); + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + List<String> routingKeys = new ArrayList<String>(); + routingKeys.add(experimentId); + routingKeys.add(experimentId + ".*.*"); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); return props; } http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java index f3c1942..22d6317 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java @@ -24,6 +24,8 @@ package org.apache.airavata.messaging.core.impl; import com.rabbitmq.client.*; import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.Consumer; import org.apache.airavata.messaging.core.MessageContext; @@ -36,7 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class RabbitMQConsumer implements Consumer { @@ -48,15 +52,40 @@ public class RabbitMQConsumer implements Consumer { private Channel channel; private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>(); + public RabbitMQConsumer() throws AiravataException { + try { + url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME); + + createConnection(); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + } + public RabbitMQConsumer(String brokerUrl, String exchangeName) throws AiravataException { this.exchangeName = exchangeName; this.url = brokerUrl; + createConnection(); + } + + private void createConnection() throws AiravataException { try { - connection = createConnection(); + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setUri(url); + connection = connectionFactory.newConnection(); + connection.addShutdownListener(new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + } + }); + log.info("connected to rabbitmq: " + connection + " for " + exchangeName); + channel = connection.createChannel(); + channel.exchangeDeclare(exchangeName, "topic", false); - channel.exchangeDeclare(exchangeName, "direct", false); } catch (Exception e) { String msg = "could not open channel for exchange " + exchangeName; log.error(msg); @@ -66,24 +95,44 @@ public class RabbitMQConsumer implements Consumer { public String listen(final MessageHandler handler) throws AiravataException { try { - Map<String, String> props = handler.getProperties(); - final String routingKey = props.get(MessagingConstants.RABBIT_ROUTING_KEY); - if (routingKey == null) { + Map<String, Object> props = handler.getProperties(); + final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY); + if (routing == null) { throw new IllegalArgumentException("The routing key must be present"); } - String queueName = props.get(MessagingConstants.RABBIT_QUEUE); - String consumerTag = props.get(MessagingConstants.RABBIT_CONSUMER_TAG); + List<String> keys = new ArrayList<String>(); + if (routing instanceof List) { + for (Object o : (List)routing) { + keys.add(o.toString()); + } + } else if (routing instanceof String) { + keys.add((String) routing); + } + + String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE); + String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG); if (queueName == null) { queueName = channel.queueDeclare().getQueue(); } else { channel.queueDeclare(queueName, true, false, false, null); } + + final String id = getId(keys, queueName); + if (queueDetailsMap.containsKey(id)) { + throw new IllegalStateException("This subscriber is already defined for this Consumer, " + + "cannot define the same subscriber twice"); + } + if (consumerTag == null) { consumerTag = "default"; } - String id = routingKey + "." + queueName; - channel.queueBind(queueName, exchangeName, routingKey); + + // bind all the routing keys + for (String routingKey : keys) { + channel.queueBind(queueName, exchangeName, routingKey); + } + channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, @@ -99,38 +148,42 @@ public class RabbitMQConsumer implements Consumer { ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent); log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + experimentStatusChangeEvent.getState()); + + "' and with message type '" + message.getMessageType() + "' with status " + + experimentStatusChangeEvent.getState()); event = experimentStatusChangeEvent; } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) { WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent); log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + wfnStatusChangeEvent.getState()); + + "' and with message type '" + message.getMessageType() + "' with status " + + wfnStatusChangeEvent.getState()); event = wfnStatusChangeEvent; } else if (message.getMessageType().equals(MessageType.TASK)) { TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent); log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + taskStatusChangeEvent.getState()); + + "' and with message type '" + message.getMessageType() + "' with status " + + taskStatusChangeEvent.getState()); event = taskStatusChangeEvent; } else if (message.getMessageType().equals(MessageType.JOB)) { JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent); log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + jobStatusChangeEvent.getState()); + + "' and with message type '" + message.getMessageType() + "' with status " + + jobStatusChangeEvent.getState()); event = jobStatusChangeEvent; } MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId()); handler.onMessage(messageContext); } catch (TException e) { - String msg = "Failed to de-serialize the thrift message, exchange: " + exchangeName + " routingKey: " + routingKey; + String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; log.warn(msg, e); } } }); // save the name for deleting the queue - queueDetailsMap.put(id, new QueueDetails(queueName, routingKey)); + queueDetailsMap.put(id, new QueueDetails(queueName, keys)); return id; } catch (Exception e) { String msg = "could not open channel for exchange " + exchangeName; @@ -143,7 +196,9 @@ public class RabbitMQConsumer implements Consumer { QueueDetails details = queueDetailsMap.get(id); if (details != null) { try { - channel.queueUnbind(details.getQueueName(), exchangeName, details.getRoutingKey()); + for (String key : details.getRoutingKeys()) { + channel.queueUnbind(details.getQueueName(), exchangeName, key); + } channel.queueDelete(details.getQueueName(), true, true); } catch (IOException e) { String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName; @@ -153,42 +208,42 @@ public class RabbitMQConsumer implements Consumer { } } - private Connection createConnection() throws IOException { - try { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setUri(url); - Connection connection = connectionFactory.newConnection(); - connection.addShutdownListener(new ShutdownListener() { - public void shutdownCompleted(ShutdownSignalException cause) { - } - }); - log.info("connected to rabbitmq: " + connection + " for " + exchangeName); - return connection; - } catch (Exception e) { - log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName); - return null; - } - } - /** * Private class for holding some information about the consumers registered */ private class QueueDetails { String queueName; - String routingKey; + List<String> routingKeys; - private QueueDetails(String queueName, String routingKey) { + private QueueDetails(String queueName, List<String> routingKeys) { this.queueName = queueName; - this.routingKey = routingKey; + this.routingKeys = routingKeys; } public String getQueueName() { return queueName; } - public String getRoutingKey() { - return routingKey; + public List<String> getRoutingKeys() { + return routingKeys; + } + } + + private String getId(List<String> routingKeys, String queueName) { + String id = ""; + for (String key : routingKeys) { + id = id + "_" + key; + } + return id + "_" + queueName; + } + + public void close() { + if (connection != null) { + try { + connection.close(); + } catch (IOException ignore) { + } } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java index a741a8d..b4a6d46 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java @@ -104,7 +104,7 @@ public class RabbitMQProducer { log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName); channel.basicQos(prefetchCount); } - channel.exchangeDeclare(exchangeName, "direct", false); + channel.exchangeDeclare(exchangeName, "topic", false); } catch (Exception e) { reset(); String msg = "could not open channel for exchange " + exchangeName; http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java index edbb28d..ff14a8c 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java @@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessagingConstants; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.messaging.event.*; import org.apache.thrift.TException; @@ -33,8 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RabbitMQPublisher implements Publisher { - public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; - public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; + private static Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class); private RabbitMQProducer rabbitMQProducer; @@ -44,8 +44,8 @@ public class RabbitMQPublisher implements Publisher { String brokerUrl; String exchangeName; try { - brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL); - exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); + brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME); } catch (ApplicationSettingsException e) { String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; log.error(message, e); http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java ---------------------------------------------------------------------- diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java index 9ef3331..ab02662 100644 --- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java +++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java @@ -21,7 +21,9 @@ package org.apache.airavata.xbaya.messaging; +import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; @@ -38,7 +40,7 @@ import java.util.Date; public class EventData { - private Message messageEvent; + private MessageContext messageEvent; private Date updateDate; @@ -47,7 +49,6 @@ public class EventData { private String workflowNodeId; private String message; - private MessageLevel messageLevel; private String messageId; /** @@ -55,35 +56,31 @@ public class EventData { * * @param event */ - public EventData(Message event) throws TException { + public EventData(MessageContext event) { this.messageEvent = event; process(event); } - private void process(Message event) throws TException { + private void process(MessageContext event) { this.messageId = event.getMessageId(); - this.messageLevel = event.getMessageLevel(); - if (event.getMessageType() == MessageType.EXPERIMENT) { - ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(event.getEvent(), experimentStatusChangeEvent); + if (event.getType() == MessageType.EXPERIMENT) { + ExperimentStatusChangeEvent experimentStatusChangeEvent = (ExperimentStatusChangeEvent) event.getEvent(); this.status = experimentStatusChangeEvent.getState().toString(); this.experimentId = experimentStatusChangeEvent.getExperimentId(); this.workflowNodeId = ""; this.message = "Received experiment event , expId : " + experimentStatusChangeEvent.getExperimentId() + ", status : " + experimentStatusChangeEvent.getState().toString(); - } else if (event.getMessageType() == MessageType.WORKFLOWNODE) { - WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(event.getEvent(), wfnStatusChangeEvent); + } else if (event.getType() == MessageType.WORKFLOWNODE) { + WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = (WorkflowNodeStatusChangeEvent) event.getEvent(); WorkflowIdentifier wfIdentifier = wfnStatusChangeEvent.getWorkflowNodeIdentity(); this.status = wfnStatusChangeEvent.getState().toString(); this.experimentId = wfIdentifier.getExperimentId(); this.workflowNodeId = wfIdentifier.getWorkflowNodeId(); - this.message = "Received workflow status change event, expId : " + wfIdentifier.getExperimentId() + + this.message = "Received workflow status change event, expId : " + wfIdentifier.getExperimentId() + ", nodeId : " + wfIdentifier.getWorkflowNodeId() + " , status : " + wfnStatusChangeEvent.getState().toString(); - }else if (event.getMessageType() == MessageType.TASK) { - TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(event.getEvent(), taskStatusChangeEvent); + } else if (event.getType() == MessageType.TASK) { + TaskStatusChangeEvent taskStatusChangeEvent = (TaskStatusChangeEvent) event.getEvent(); TaskIdentifier taskIdentifier = taskStatusChangeEvent.getTaskIdentity(); this.status = taskStatusChangeEvent.getState().toString(); this.experimentId = taskIdentifier.getExperimentId(); @@ -91,11 +88,10 @@ public class EventData { this.message = "Received task event , expId : " + taskIdentifier.getExperimentId() + ",taskId : " + taskIdentifier.getTaskId() + ", wfNodeId : " + taskIdentifier.getWorkflowNodeId() + ", status : " + taskStatusChangeEvent.getState().toString(); - } else if (event.getMessageType() == MessageType.JOB) { - JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(event.getEvent(), jobStatusChangeEvent); + } else if (event.getType() == MessageType.JOB) { + JobStatusChangeEvent jobStatusChangeEvent = (JobStatusChangeEvent) event.getEvent(); JobIdentifier jobIdentifier = jobStatusChangeEvent.getJobIdentity(); - this.status = jobStatusChangeEvent.getState().toString(); + this.status = jobStatusChangeEvent.getState().toString(); this.experimentId = jobIdentifier.getExperimentId(); this.workflowNodeId = jobIdentifier.getWorkflowNodeId(); this.message = "Received task event , expId : " + jobIdentifier.getExperimentId() + " ,taskId : " + @@ -109,7 +105,7 @@ public class EventData { * * @return The event */ - public Message getEvent() { + public MessageContext getEvent() { return this.messageEvent; } @@ -119,12 +115,12 @@ public class EventData { * @return The type */ public MessageType getType() { - return this.messageEvent.getMessageType(); + return this.messageEvent.getType(); } public Date getUpdateTime() { if (updateDate == null) { - updateDate = new Date(this.messageEvent.getUpdatedTime()); + updateDate = new Date(this.messageEvent.getUpdatedTime().getTime()); } return updateDate; } @@ -141,10 +137,6 @@ public class EventData { return message; } - public MessageLevel getMessageLevel() { - return messageLevel; - } - public String getMessageId() { return messageId; } http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java ---------------------------------------------------------------------- diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java index f0cc46e..1168925 100644 --- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java +++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java @@ -101,13 +101,7 @@ public class EventDataRepository implements TableModel, BoundedRangeModel { this.tableModelChangeEvent = new ChangeEvent(this); // We only need one. this.events = new ArrayList<EventData>(); } - public void addEvent(Message message) { - try { - addEvent(new EventData(message)); - } catch (TException e) { - logger.error("Error while adding new message event", e); - } - } + /** * @param event */ http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/MessageClient.java ---------------------------------------------------------------------- diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/MessageClient.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/MessageClient.java deleted file mode 100644 index d4e975f..0000000 --- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/MessageClient.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.xbaya.messaging; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; -import com.rabbitmq.client.ShutdownListener; -import com.rabbitmq.client.ShutdownSignalException; -import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.model.messaging.event.Message; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class MessageClient { - - private static final Logger log = LoggerFactory.getLogger(MessageClient.class); - - private Monitor monitor; - - public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; - public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; - - private String brokerURL; - - private String exchangeName; - - private String subscriptionID; - - private long timeout = 20000L; - - private long interval = 1000L; - - private List<TerminateListener> terminateListeners = new ArrayList<TerminateListener>(); - - private static final Logger logger = LoggerFactory.getLogger(MessageClient.class); - private Connection connection; - private Channel channel; - private ExecutorService executorService; - - /** - * Constructs a MessageMonitore. - * - * @param monitor - */ - public MessageClient(Monitor monitor) { -// try { - this.monitor = monitor; - // We need to copy these because the configuration might change. -// this.brokerURL = ServerSettings.getSetting(RABBITMQ_BROKER_URL); - this.brokerURL = "amqp://127.0.0.1:5672"; -// this.exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); - this.exchangeName = "airavata_rabbitmq_exchange"; - executorService = Executors.newFixedThreadPool(25); - init(); -// } catch (ApplicationSettingsException e) { -// logger.error("Exception while initiating monitoring client "); -// } - } - - private void init() { - try { - connection = createConnection(); - channel = connection.createChannel(); - channel.exchangeDeclare(exchangeName, "direct", false); - } catch (IOException e) { - log.error("Error occur while Client initiating", e); - } - } - - private Connection createConnection() { - try { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setUri(brokerURL); - Connection connection = connectionFactory.newConnection(); - connection.addShutdownListener(new ShutdownListener() { - public void shutdownCompleted(ShutdownSignalException cause) { - log.info("Connection shutdown listener triggered -----------"); - } - }); - log.info("connected to rabbitmq: " + connection + " for " + exchangeName); - return connection; - } catch (Exception e) { - log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName); - return null; - } - } - - /** - * Subscribes to the notification. - * - * @throws MonitorException - */ - public synchronized void subscribe(String experimentId){ - try { - String queueName = channel.queueDeclare().getQueue(); - System.out.println("Experiment ID is : " + experimentId); - channel.queueBind(queueName, exchangeName, experimentId ); // send experiment Id as routing Key - QueueingConsumer consumer = new QueueingConsumer(channel); - channel.basicConsume(queueName, true, consumer); - executorService.execute(new Thread(new RabbitMQConsumer(consumer, experimentId))); - } catch (IOException e) { - log.error("Error while subscribe to routing key : " + experimentId, e); - } - - } - - /** - * Unsubscribes from the notification. - * - * @throws MonitorException - */ - public synchronized void unsubscribe(String experimentId) { - // This method needs to be synchronized along with subscribe() because - // unsubscribe() might be called while subscribe() is being executed. - // TODO - implement this, after experiment execution complete we need to unsubscribe it. - notifyTerminateListeners(experimentId); - } - - private void notifyTerminateListeners(String experimentId) { - for (TerminateListener terminateListener : terminateListeners) { - terminateListener.terminate(experimentId); - } - } - - private void registerTerminateListener(TerminateListener terminateListener) { - terminateListeners.add(terminateListener); - } - - public long getTimeout() { - return timeout; - } - - public void setTimeout(long timeout) { - this.timeout = timeout; - } - - public long getInterval() { - return interval; - } - - public void setInterval(long interval) { - this.interval = interval; - } - - private interface TerminateListener { - public void terminate(String experimentId); - } - private class RabbitMQConsumer implements Runnable, TerminateListener { - private final Logger logger = LoggerFactory.getLogger(MessageClient.RabbitMQConsumer.class); - private final String id; - private QueueingConsumer consumer; - private boolean isContinue = true; - RabbitMQConsumer(QueueingConsumer consumer, String experimentId) { - this.consumer = consumer; - this.id = experimentId; - registerTerminateListener(this); - } - - @Override - public void run() { - System.out.println("RabbitMQConsumer started for experiment " + consumer.getConsumerTag()); - try { - Message message; - while (isContinue) { - QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000); - if (delivery == null) { - continue; - } - byte[] body = delivery.getBody(); - message = new Message(); - ThriftUtils.createThriftFromBytes(body, message); - monitor.handleNotification(message); - } - System.out.println("Terminating consumer for experimentId : " + id); - } catch (InterruptedException e) { - logger.error("Error while consuming next delivery", e); - System.out.println("Error while consuming next delivery"); - } catch (TException e) { - logger.error("Error while creating message from thrift", e); - System.out.println("Error while creating message from thrift"); - } - } - - @Override - public void terminate(String experimentId) { - if (id.equals(experimentId)) { - System.out.println("Terminate request came for " + experimentId); - isContinue = false; - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java ---------------------------------------------------------------------- diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java index 4c6c667..b3e7ff4 100644 --- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java +++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java @@ -21,7 +21,13 @@ package org.apache.airavata.xbaya.messaging; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.Consumer; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.impl.RabbitMQConsumer; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; @@ -39,10 +45,7 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; +import java.util.*; //import org.xmlpull.infoset.XmlElement; public class Monitor extends EventProducer { @@ -53,7 +56,7 @@ public class Monitor extends EventProducer { protected Map<String, EventDataRepository> eventDataMap = new HashMap<String, EventDataRepository>(); - protected MessageClient messageClient; + protected Consumer messageClient; protected boolean printRawMessages; @@ -67,6 +70,8 @@ public class Monitor extends EventProducer { private String lastTerminatedWorkflowExecutionId=null; + private Map<String, String> expIdToSubscribers = new HashMap<String, String>(); + public Monitor() { // First one keeps all event data & it doesn't have filters this.eventDataMap.put(DEFAULT_MODEL_KEY, new EventDataRepository()); @@ -103,7 +108,13 @@ public class Monitor extends EventProducer { //Notify listeners that the monitoring is about to start getEventDataRepository().triggerListenerForPreMonitorStart(); - this.messageClient = new MessageClient(this); + try { + this.messageClient = new RabbitMQConsumer(); + } catch (AiravataException e) { + String msg = "Failed to start the consumer"; + logger.error(msg, e); + throw new MonitorException(msg, e); + } setMonitoring(true); // Enable/disable some menu items and show the monitor panel. sendSafeEvent(new Event(Event.Type.MONITOR_STARTED)); @@ -179,71 +190,35 @@ public class Monitor extends EventProducer { for (String key : keysToBeRemoved) { this.eventDataMap.remove(key); } - } - /** - * @param event - */ - protected synchronized void handleNotification(Message event) { - EventData eventData = null; - boolean unsubscribeConsumer = false; - try { - eventData = new EventData(event); - } catch (TException e) { - logger.error("Error while adding new message event", e); - System.out.println("Error while adding new message event"); - return; - } - Set<String> keys = this.eventDataMap.keySet(); - // Remove everthing leaving only the last one - if(printRawMessages) { - try { - if (event.getMessageType() == MessageType.EXPERIMENT) { - ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(event.getEvent(), experimentStatusChangeEvent); - logger.info("Received experiment event , expId : {} , status : {} ", - experimentStatusChangeEvent.getExperimentId(), experimentStatusChangeEvent.getState().toString()); - System.out.println("Received experiment event"); - - } else if (event.getMessageType() == MessageType.WORKFLOWNODE) { - WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(event.getEvent(), wfnStatusChangeEvent); - WorkflowIdentifier wfIdentifier = wfnStatusChangeEvent.getWorkflowNodeIdentity(); - logger.info("Received workflow status change event, expId : {}, nodeId : {}, status : {} ", - new String[]{wfIdentifier.getExperimentId(), wfIdentifier.getWorkflowNodeId(), - wfnStatusChangeEvent.getState().toString()}); - System.out.println("Received a workflow change event"); - }else if (event.getMessageType() == MessageType.TASK) { - TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(event.getEvent(), taskStatusChangeEvent); - TaskIdentifier taskIdentifier = taskStatusChangeEvent.getTaskIdentity(); - logger.info("Received task event , expId : {} ,taskId : {}, wfNodeId : {}, status : {} ", - new String[]{taskIdentifier.getExperimentId(), taskIdentifier.getTaskId(), - taskIdentifier.getWorkflowNodeId(), taskStatusChangeEvent.getState().toString()}); - System.out.printf("Received a task change event"); - } else if (event.getMessageType() == MessageType.JOB) { - JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(event.getEvent(), jobStatusChangeEvent); - JobIdentifier jobIdentifier = jobStatusChangeEvent.getJobIdentity(); - logger.info("Received job event , expId : {}, taskId : {}, jobId : {}, wfNodeId : {}, status : {} ", - new String[]{jobIdentifier.getExperimentId(), jobIdentifier.getTaskId(), jobIdentifier.getJobId(), - jobIdentifier.getWorkflowNodeId(), jobStatusChangeEvent.getState().toString()}); - System.out.println("Received a job change event"); - } else { - logger.info("Received UNKNOWN event"); - System.out.println("Received an UNKOWN event"); - } - } catch (TException e) { - logger.error("Error while printing thrift message "); - System.out.println("Error while printing thrift message"); - } + private class NotificationMessageHandler implements MessageHandler { + private String experimentId; + + private NotificationMessageHandler(String experimentId) { + this.experimentId = experimentId; } - for (String key : keys) { - this.eventDataMap.get(key).addEvent(eventData); + + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + List<String> routingKeys = new ArrayList<String>(); + routingKeys.add(experimentId); + routingKeys.add(experimentId + ".*.*"); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + return props; } - if (eventData.getType() == MessageType.EXPERIMENT && eventData.getStatus().equals(ExperimentState.LAUNCHED.toString())) { - unsubscribe(eventData.getExperimentId()); + + public void onMessage(MessageContext message) { + EventData eventData = null; + boolean unsubscribeConsumer = false; + eventData = new EventData(message); + Set<String> keys = eventDataMap.keySet(); + for (String key : keys) { + eventDataMap.get(key).addEvent(eventData); + } + if (eventData.getType() == MessageType.EXPERIMENT && eventData.getStatus().equals(ExperimentState.LAUNCHED.toString())) { + unsubscribe(eventData.getExperimentId()); + } } } @@ -252,7 +227,14 @@ public class Monitor extends EventProducer { * @throws MonitorException */ public void subscribe(String experimentID) throws MonitorException { - messageClient.subscribe(experimentID); + try { + String id = messageClient.listen(new NotificationMessageHandler(experimentID)); + expIdToSubscribers.put(experimentID, id); + } catch (AiravataException e) { + String msg = "Failed to listen to experiment: " + experimentID; + logger.error(msg); + throw new MonitorException(msg, e); + } } /** @@ -262,7 +244,14 @@ public class Monitor extends EventProducer { public void unsubscribe(String experimentId){ // Enable/disable some menu items. sendSafeEvent(new Event(Event.Type.MONITOR_STOPED)); - messageClient.unsubscribe(experimentId); + String id = expIdToSubscribers.remove(experimentId); + if (id != null) { + try { + messageClient.stopListen(experimentId); + } catch (AiravataException e) { + logger.warn("Failed to find the subscriber for experiment id: " + id, e); + } + } setMonitoring(false); }
