Fixed messaging publishing issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9f979b50 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9f979b50 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9f979b50 Branch: refs/heads/lahiru/AIRAVATA-2057 Commit: 9f979b50b972db48b0d80bcdbbe3dd932c9a0bc4 Parents: 3fcde52 Author: Shameera Rathnayaka <[email protected]> Authored: Mon Aug 15 15:45:12 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Mon Aug 15 15:45:12 2016 -0400 ---------------------------------------------------------------------- .../apache/airavata/gfac/server/GfacServerHandler.java | 2 +- .../apache/airavata/messaging/core/MessagingFactory.java | 3 ++- .../airavata/messaging/core/impl/ExperimentConsumer.java | 2 +- .../airavata/messaging/core/impl/ProcessConsumer.java | 10 ++++------ .../airavata/messaging/core/impl/RabbitMQPublisher.java | 3 ++- .../airavata/messaging/core/impl/RabbitMQSubscriber.java | 7 ++++++- 6 files changed, 16 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 44073dc..a7b0714 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -97,7 +97,7 @@ public class GfacServerHandler implements GfacService.Iface { private void initAMQPClient() throws AiravataException { // init process consumer List<String> routingKeys = new ArrayList<>(); - routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName()); + routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName()); processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Type.PROCESS_LAUNCH); // init status publisher statusPublisher = Factory.getStatusPublisher(); http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java index b3e6d35..802ea5a 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java @@ -123,7 +123,7 @@ public class MessagingFactory { private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException { sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName()) - .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName()) + .setQueueName("process_launch") .setAutoAck(false); return new RabbitMQSubscriber(sp); } @@ -131,6 +131,7 @@ public class MessagingFactory { private static Subscriber getExperimentSubscriber(RabbitMQProperties sp) throws AiravataException { sp.setExchangeName(ServerSettings.getRabbitmqExperimentExchangeName()) + .setQueueName("experiment_launch") .setAutoAck(false); return new RabbitMQSubscriber(sp); http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java index 6e4c46a..5010358 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java @@ -72,7 +72,7 @@ public class ExperimentConsumer extends QueueingConsumer { String gatewayId = null; ExperimentSubmitEvent experimentEvent = new ExperimentSubmitEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent); - log.debug(" Message Received with message id '" + message.getMessageId() + log.info(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType() + "' for experimentId:" + " " + experimentEvent.getExperimentId()); http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java index e95a7ca..69910bd 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java @@ -55,8 +55,7 @@ public class ProcessConsumer extends QueueingConsumer{ } - @Override - public void handleDelivery(String consumerTag, + @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { @@ -71,10 +70,9 @@ public class ProcessConsumer extends QueueingConsumer{ if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) { ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' for experimentId:" + - " " + - processSubmitEvent.getProcessId()); + log.info(" Message Received with message id '" + message.getMessageId() + + " and with message type:" + message.getMessageType() + ", for processId:" + + processSubmitEvent.getProcessId() + ", expId:" + processSubmitEvent.getExperimentId()); event = processSubmitEvent; gatewayId = processSubmitEvent.getGatewayId(); MessageContext messageContext = new MessageContext(event, message.getMessageType(), http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java index 3fdb3a1..6f1d1d8 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java @@ -71,7 +71,7 @@ public class RabbitMQPublisher implements Publisher { if (properties.getExchangeName() != null) { channel.exchangeDeclare(properties.getExchangeName(), properties.getExchangeType(), - false); + true); //durable } } catch (Exception e) { String msg = "RabbitMQ connection issue for exchange : " + properties.getExchangeName(); @@ -93,6 +93,7 @@ public class RabbitMQPublisher implements Publisher { message.setMessageType(messageContext.getType()); message.setUpdatedTime(messageContext.getUpdatedTime().getTime()); String routingKey = routingKeySupplier.apply(messageContext); + log.info("publish messageId:" + messageContext.getMessageId() + ", messageType:" + messageContext.getType() + ", to routingKey:" + routingKey); byte[] messageBody = ThriftUtils.serializeThriftObject(message); send(messageBody, routingKey); } catch (TException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java index 441281d..6b28723 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java @@ -85,7 +85,11 @@ public class RabbitMQSubscriber implements Subscriber { if (queueName == null) { queueName = channel.queueDeclare().getQueue(); } else { - channel.queueDeclare(queueName, true, false, false, null); + channel.queueDeclare(queueName, + true, // durable + false, // exclusive + false, // autoDelete + null);// arguments } final String id = getId(routingKeys, queueName); if (queueDetailMap.containsKey(id)) { @@ -94,6 +98,7 @@ public class RabbitMQSubscriber implements Subscriber { } // bind all the routing keys for (String key : routingKeys) { + log.info("Binding key:" + key + " to queue:" + queueName); channel.queueBind(queueName, properties.getExchangeName(), key); }
