Fixed incompatible durable bits

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3f29cfdb
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3f29cfdb
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3f29cfdb

Branch: refs/heads/lahiru/AIRAVATA-2057
Commit: 3f29cfdbd71de18777557713dce58007a3cbc2f5
Parents: cfe62c3
Author: Shameera Rathnayaka <[email protected]>
Authored: Mon Aug 15 16:18:01 2016 -0400
Committer: Shameera Rathnayaka <[email protected]>
Committed: Mon Aug 15 16:39:49 2016 -0400

----------------------------------------------------------------------
 .../org/apache/airavata/messaging/core/MessagingFactory.java   | 6 +++---
 .../apache/airavata/messaging/core/impl/RabbitMQPublisher.java | 2 +-
 .../airavata/messaging/core/impl/RabbitMQSubscriber.java       | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/3f29cfdb/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
index 802ea5a..2d5cae1 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
@@ -48,19 +48,19 @@ public class MessagingFactory {
             case EXPERIMENT_LAUNCH:
                 subscriber = getExperimentSubscriber(rProperties);
                 subscriber.listen(((connection, channel) -> new 
ExperimentConsumer(messageHandler, connection, channel)),
-                        null,
+                        rProperties.getQueueName(),
                         routingKeys);
                 break;
             case PROCESS_LAUNCH:
                 subscriber = getProcessSubscriber(rProperties);
                 subscriber.listen((connection ,channel) -> new 
ProcessConsumer(messageHandler, connection, channel),
-                        null,
+                        rProperties.getQueueName(),
                         routingKeys);
                 break;
             case STATUS:
                 subscriber = getStatusSubscriber(rProperties);
                 subscriber.listen((connection, channel) -> new 
StatusConsumer(messageHandler, connection, channel),
-                        null,
+                        rProperties.getQueueName(),
                         routingKeys);
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/airavata/blob/3f29cfdb/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
index 6f1d1d8..188ded3 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -93,7 +93,7 @@ public class RabbitMQPublisher implements Publisher {
             message.setMessageType(messageContext.getType());
             message.setUpdatedTime(messageContext.getUpdatedTime().getTime());
             String routingKey = routingKeySupplier.apply(messageContext);
-            log.info("publish messageId:" + messageContext.getMessageId() + ", 
messageType:" + messageContext.getType() + ", to routingKey:" + routingKey);
+//            log.info("publish messageId:" + messageContext.getMessageId() + 
", messageType:" + messageContext.getType() + ", to routingKey:" + routingKey);
             byte[] messageBody = ThriftUtils.serializeThriftObject(message);
             send(messageBody, routingKey);
         } catch (TException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/3f29cfdb/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
index 6b28723..2a7da56 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
@@ -64,7 +64,7 @@ public class RabbitMQSubscriber implements Subscriber {
             channel.basicQos(properties.getPrefetchCount());
             channel.exchangeDeclare(properties.getExchangeName(),
                     properties.getExchangeType(),
-                    false);
+                    true); // durable
         } catch (Exception e) {
             String msg = "could not open channel for exchange " + 
properties.getExchangeName();
             log.error(msg);
@@ -98,7 +98,7 @@ public class RabbitMQSubscriber implements Subscriber {
             }
             // bind all the routing keys
             for (String key : routingKeys) {
-                log.info("Binding key:" + key + " to queue:" + queueName);
+//                log.info("Binding key:" + key + " to queue:" + queueName);
                 channel.queueBind(queueName, properties.getExchangeName(), 
key);
             }
 

Reply via email to