fixing a deadlock scenario - in the single JVM case- each message delegator 
should have its own message queue and also each message receiver should have 
its own message listener


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2ffbaa3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2ffbaa3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2ffbaa3f

Branch: refs/heads/master
Commit: 2ffbaa3fc5536d8e72ad6ef0144195552bc81060
Parents: 3ad5947
Author: Nirmal Fernando <[email protected]>
Authored: Wed Mar 26 15:12:46 2014 +0530
Committer: Nirmal Fernando <[email protected]>
Committed: Wed Mar 26 15:12:46 2014 +0530

----------------------------------------------------------------------
 .../receiver/health/stat/HealthStatReceiver.java  |  4 ++--
 .../topology/TopologyEventMessageDelegator.java   | 12 +++++++++++-
 .../receiver/topology/TopologyReceiver.java       | 18 +++++++++++++++++-
 3 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2ffbaa3f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
index 59f1290..0b33abc 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
@@ -50,14 +50,14 @@ public class HealthStatReceiver implements Runnable {
             Thread subscriberThread = new Thread(topicSubscriber);
             subscriberThread.start();
             if (log.isDebugEnabled()) {
-                log.debug("Health stst event message receiver thread started");
+                log.debug("Health stats event message receiver thread 
started");
             }
 
             // Start health stat event message delegator thread
             Thread receiverThread = new Thread(messageDelegator);
             receiverThread.start();
             if (log.isDebugEnabled()) {
-                log.debug("Health stst event message delegator thread 
started");
+                log.debug("Health stats event message delegator thread 
started");
             }
 
             // Keep the thread live until terminated

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2ffbaa3f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 64793bf..ce641bc 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.stratos.messaging.message.receiver.topology;
 
+import java.util.concurrent.LinkedBlockingQueue;
+
 import javax.jms.TextMessage;
 
 import org.apache.commons.logging.Log;
@@ -35,14 +37,22 @@ public class TopologyEventMessageDelegator implements 
Runnable {
 
     private static final Log log = 
LogFactory.getLog(TopologyEventMessageDelegator.class);
     private MessageProcessorChain processorChain;
+    private LinkedBlockingQueue<TextMessage> messageQueue;
     private boolean terminated;
 
     public TopologyEventMessageDelegator() {
         this.processorChain = new TopologyMessageProcessorChain();
+        this.messageQueue = TopologyEventMessageQueue.getInstance();
     }
 
     public TopologyEventMessageDelegator(MessageProcessorChain processorChain) 
{
         this.processorChain = processorChain;
+        this.messageQueue = TopologyEventMessageQueue.getInstance();
+    }
+    
+    public TopologyEventMessageDelegator(MessageProcessorChain processorChain, 
LinkedBlockingQueue<TextMessage> queue) {
+        this.processorChain = processorChain;
+        this.messageQueue = queue;
     }
 
     @Override
@@ -54,7 +64,7 @@ public class TopologyEventMessageDelegator implements 
Runnable {
 
             while (!terminated) {
                 try {
-                    TextMessage message = 
TopologyEventMessageQueue.getInstance().take();
+                    TextMessage message = messageQueue.take();
 
                     // Retrieve the header
                     String type = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2ffbaa3f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java
index a9bc07e..ab02956 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java
@@ -19,6 +19,8 @@
 
 package org.apache.stratos.messaging.message.receiver.topology;
 
+import javax.jms.MessageListener;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
@@ -31,23 +33,37 @@ import org.apache.stratos.messaging.util.Constants;
 public class TopologyReceiver implements Runnable {
     private static final Log log = LogFactory.getLog(TopologyReceiver.class);
     private TopologyEventMessageDelegator messageDelegator;
+    private MessageListener messageListener;
     private TopicSubscriber topicSubscriber;
     private boolean terminated;
 
     public TopologyReceiver() {
         this.messageDelegator = new TopologyEventMessageDelegator();
+        this.messageListener = new TopologyEventMessageListener();
     }
 
     public TopologyReceiver(TopologyEventMessageDelegator messageDelegator) {
         this.messageDelegator = messageDelegator;
+        this.messageListener = new TopologyEventMessageListener();
+    }
+    
+    public TopologyReceiver(MessageListener listener) {
+        this.messageDelegator = new TopologyEventMessageDelegator();
+        this.messageListener = listener;
+    }
+    
+    public TopologyReceiver(TopologyEventMessageDelegator messageDelegator, 
MessageListener listener) {
+        this.messageDelegator = messageDelegator;
+        this.messageListener = listener;
     }
+    
 
     @Override
     public void run() {
         try {
             // Start topic subscriber thread
             topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
-            topicSubscriber.setMessageListener(new 
TopologyEventMessageListener());
+            topicSubscriber.setMessageListener(messageListener);
             Thread subscriberThread = new Thread(topicSubscriber);
             subscriberThread.start();
             if (log.isDebugEnabled()) {

Reply via email to