Fixed incompatible durable bits
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3f29cfdb Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3f29cfdb Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3f29cfdb Branch: refs/heads/lahiru/AIRAVATA-2057 Commit: 3f29cfdbd71de18777557713dce58007a3cbc2f5 Parents: cfe62c3 Author: Shameera Rathnayaka <[email protected]> Authored: Mon Aug 15 16:18:01 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Mon Aug 15 16:39:49 2016 -0400 ---------------------------------------------------------------------- .../org/apache/airavata/messaging/core/MessagingFactory.java | 6 +++--- .../apache/airavata/messaging/core/impl/RabbitMQPublisher.java | 2 +- .../airavata/messaging/core/impl/RabbitMQSubscriber.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/3f29cfdb/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java index 802ea5a..2d5cae1 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java @@ -48,19 +48,19 @@ public class MessagingFactory { case EXPERIMENT_LAUNCH: subscriber = getExperimentSubscriber(rProperties); subscriber.listen(((connection, channel) -> new ExperimentConsumer(messageHandler, connection, channel)), - null, + rProperties.getQueueName(), routingKeys); break; case PROCESS_LAUNCH: subscriber = getProcessSubscriber(rProperties); subscriber.listen((connection ,channel) -> new ProcessConsumer(messageHandler, connection, channel), - null, + rProperties.getQueueName(), routingKeys); break; case STATUS: subscriber = getStatusSubscriber(rProperties); subscriber.listen((connection, channel) -> new StatusConsumer(messageHandler, connection, channel), - null, + rProperties.getQueueName(), routingKeys); break; default: http://git-wip-us.apache.org/repos/asf/airavata/blob/3f29cfdb/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java index 6f1d1d8..188ded3 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java @@ -93,7 +93,7 @@ public class RabbitMQPublisher implements Publisher { message.setMessageType(messageContext.getType()); message.setUpdatedTime(messageContext.getUpdatedTime().getTime()); String routingKey = routingKeySupplier.apply(messageContext); - log.info("publish messageId:" + messageContext.getMessageId() + ", messageType:" + messageContext.getType() + ", to routingKey:" + routingKey); +// log.info("publish messageId:" + messageContext.getMessageId() + ", messageType:" + messageContext.getType() + ", to routingKey:" + routingKey); byte[] messageBody = ThriftUtils.serializeThriftObject(message); send(messageBody, routingKey); } catch (TException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/3f29cfdb/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java index 6b28723..2a7da56 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java @@ -64,7 +64,7 @@ public class RabbitMQSubscriber implements Subscriber { channel.basicQos(properties.getPrefetchCount()); channel.exchangeDeclare(properties.getExchangeName(), properties.getExchangeType(), - false); + true); // durable } catch (Exception e) { String msg = "could not open channel for exchange " + properties.getExchangeName(); log.error(msg); @@ -98,7 +98,7 @@ public class RabbitMQSubscriber implements Subscriber { } // bind all the routing keys for (String key : routingKeys) { - log.info("Binding key:" + key + " to queue:" + queueName); +// log.info("Binding key:" + key + " to queue:" + queueName); channel.queueBind(queueName, properties.getExchangeName(), key); }
