fixing a deadlock scenario - in the single JVM case- each message delegator should have its own message queue and also each message receiver should have its own message listener
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2ffbaa3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2ffbaa3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2ffbaa3f Branch: refs/heads/master Commit: 2ffbaa3fc5536d8e72ad6ef0144195552bc81060 Parents: 3ad5947 Author: Nirmal Fernando <[email protected]> Authored: Wed Mar 26 15:12:46 2014 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Wed Mar 26 15:12:46 2014 +0530 ---------------------------------------------------------------------- .../receiver/health/stat/HealthStatReceiver.java | 4 ++-- .../topology/TopologyEventMessageDelegator.java | 12 +++++++++++- .../receiver/topology/TopologyReceiver.java | 18 +++++++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2ffbaa3f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java index 59f1290..0b33abc 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java @@ -50,14 +50,14 @@ public class HealthStatReceiver implements Runnable { Thread subscriberThread = new Thread(topicSubscriber); subscriberThread.start(); if (log.isDebugEnabled()) { - log.debug("Health stst event message receiver thread started"); + log.debug("Health stats event message receiver thread started"); } // Start health stat event message delegator thread Thread receiverThread = new Thread(messageDelegator); receiverThread.start(); if (log.isDebugEnabled()) { - log.debug("Health stst event message delegator thread started"); + log.debug("Health stats event message delegator thread started"); } // Keep the thread live until terminated http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2ffbaa3f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java index 64793bf..ce641bc 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java @@ -18,6 +18,8 @@ */ package org.apache.stratos.messaging.message.receiver.topology; +import java.util.concurrent.LinkedBlockingQueue; + import javax.jms.TextMessage; import org.apache.commons.logging.Log; @@ -35,14 +37,22 @@ public class TopologyEventMessageDelegator implements Runnable { private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class); private MessageProcessorChain processorChain; + private LinkedBlockingQueue<TextMessage> messageQueue; private boolean terminated; public TopologyEventMessageDelegator() { this.processorChain = new TopologyMessageProcessorChain(); + this.messageQueue = TopologyEventMessageQueue.getInstance(); } public TopologyEventMessageDelegator(MessageProcessorChain processorChain) { this.processorChain = processorChain; + this.messageQueue = TopologyEventMessageQueue.getInstance(); + } + + public TopologyEventMessageDelegator(MessageProcessorChain processorChain, LinkedBlockingQueue<TextMessage> queue) { + this.processorChain = processorChain; + this.messageQueue = queue; } @Override @@ -54,7 +64,7 @@ public class TopologyEventMessageDelegator implements Runnable { while (!terminated) { try { - TextMessage message = TopologyEventMessageQueue.getInstance().take(); + TextMessage message = messageQueue.take(); // Retrieve the header String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2ffbaa3f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java index a9bc07e..ab02956 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java @@ -19,6 +19,8 @@ package org.apache.stratos.messaging.message.receiver.topology; +import javax.jms.MessageListener; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; @@ -31,23 +33,37 @@ import org.apache.stratos.messaging.util.Constants; public class TopologyReceiver implements Runnable { private static final Log log = LogFactory.getLog(TopologyReceiver.class); private TopologyEventMessageDelegator messageDelegator; + private MessageListener messageListener; private TopicSubscriber topicSubscriber; private boolean terminated; public TopologyReceiver() { this.messageDelegator = new TopologyEventMessageDelegator(); + this.messageListener = new TopologyEventMessageListener(); } public TopologyReceiver(TopologyEventMessageDelegator messageDelegator) { this.messageDelegator = messageDelegator; + this.messageListener = new TopologyEventMessageListener(); + } + + public TopologyReceiver(MessageListener listener) { + this.messageDelegator = new TopologyEventMessageDelegator(); + this.messageListener = listener; + } + + public TopologyReceiver(TopologyEventMessageDelegator messageDelegator, MessageListener listener) { + this.messageDelegator = messageDelegator; + this.messageListener = listener; } + @Override public void run() { try { // Start topic subscriber thread topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); - topicSubscriber.setMessageListener(new TopologyEventMessageListener()); + topicSubscriber.setMessageListener(messageListener); Thread subscriberThread = new Thread(topicSubscriber); subscriberThread.start(); if (log.isDebugEnabled()) {
