Updated Branches: refs/heads/master 8e2cec750 -> ad1df3e11
Fixed thread termination logic in load balancer common, load balancer extension api and messaging components Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ad1df3e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ad1df3e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ad1df3e1 Branch: refs/heads/master Commit: ad1df3e11631d95efc6569e3ae0bb66352aa1564 Parents: 8e2cec7 Author: Imesh Gunaratne <[email protected]> Authored: Sat Nov 16 17:53:28 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sat Nov 16 17:53:28 2013 +0530 ---------------------------------------------------------------------- .../common/topology/TopologyReceiver.java | 44 ++++++++++++-------- .../extension/api/LoadBalancerExtension.java | 32 ++++++++++---- .../api/LoadBalancerStatsNotifier.java | 10 ++++- .../broker/heartbeat/TopicHealthChecker.java | 8 +++- .../broker/subscribe/TopicSubscriber.java | 26 ++++++++---- .../topology/TopologyEventMessageDelegator.java | 12 +++++- 6 files changed, 94 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java index 88958b5..4e91f2f 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java @@ -33,6 +33,8 @@ import org.apache.stratos.messaging.util.Constants; public class TopologyReceiver implements Runnable { private static final Log log = LogFactory.getLog(TopologyReceiver.class); private TopologyEventMessageDelegator messageDelegator; + private TopicSubscriber topicSubscriber; + private boolean terminated; public TopologyReceiver() { this.messageDelegator = new TopologyEventMessageDelegator(); @@ -45,26 +47,34 @@ public class TopologyReceiver implements Runnable { @Override public void run() { try { - // Start topic subscriber thread - TopicSubscriber topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); - topicSubscriber.setMessageListener(new TopologyEventMessageReceiver()); - Thread subscriberThread = new Thread(topicSubscriber); - subscriberThread.start(); - if (log.isDebugEnabled()) { - log.debug("Topology event message receiver thread started"); - } + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); + topicSubscriber.setMessageListener(new TopologyEventMessageReceiver()); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("Topology event message receiver thread started"); + } - // Start topology message receiver thread - Thread receiverThread = new Thread(messageDelegator); - receiverThread.start(); - if (log.isDebugEnabled()) { - log.debug("Topology message processor thread started"); - } - } - catch (Exception e) { - if(log.isErrorEnabled()) { + // Start topology event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("Topology event message delegator thread started"); + } + + // Keep the thread live until terminated + while (!terminated); + } catch (Exception e) { + if (log.isErrorEnabled()) { log.error("Topology receiver failed", e); } } } + + public void terminate() { + topicSubscriber.terminate(); + messageDelegator.terminate(); + terminated = true; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java index 63df3b7..74af8f9 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java @@ -38,6 +38,9 @@ public class LoadBalancerExtension implements Runnable { private LoadBalancer loadBalancer; private LoadBalancerStatsReader statsReader; private boolean loadBalancerStarted; + private TopologyReceiver topologyReceiver; + private LoadBalancerStatsNotifier statsNotifier; + private boolean terminated; public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatsReader statsReader) { this.loadBalancer = loadBalancer; @@ -47,16 +50,22 @@ public class LoadBalancerExtension implements Runnable { @Override public void run() { try { + if(log.isInfoEnabled()) { + log.info("Load balancer extension started"); + } + // Start topology receiver thread - TopologyReceiver topologyReceiver = new TopologyReceiver(createMessageDelegator()); + topologyReceiver = new TopologyReceiver(createMessageDelegator()); Thread topologyReceiverThread = new Thread(topologyReceiver); topologyReceiverThread.start(); // Start stats notifier thread - LoadBalancerStatsNotifier statsNotifier = new LoadBalancerStatsNotifier(statsReader); + statsNotifier = new LoadBalancerStatsNotifier(statsReader); Thread statsNotifierThread = new Thread(statsNotifier); statsNotifierThread.start(); + // Keep the thread live until terminated + while (!terminated); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not start load balancer extension", e); @@ -82,11 +91,11 @@ public class LoadBalancerExtension implements Runnable { // Complete topology event is only received once // Remove event listener messageDelegator.removeCompleteTopologyEventListener(this); - } - catch (Exception e) { - if(log.isErrorEnabled()) { + } catch (Exception e) { + if (log.isErrorEnabled()) { log.error("Could not start load balancer", e); } + terminate(); } } }); @@ -130,14 +139,19 @@ public class LoadBalancerExtension implements Runnable { private void reloadConfiguration() { try { - if(loadBalancerStarted) { + if (loadBalancerStarted) { loadBalancer.reload(TopologyManager.getTopology()); } - } - catch (Exception e) { - if(log.isErrorEnabled()) { + } catch (Exception e) { + if (log.isErrorEnabled()) { log.error("Could not reload load balancer configuration", e); } } } + + public void terminate() { + topologyReceiver.terminate(); + statsNotifier.terminate(); + terminated = true; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java index 5beaec0..dc5a8a6 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java @@ -39,6 +39,7 @@ public class LoadBalancerStatsNotifier implements Runnable { private LoadBalancerStatsReader statsReader; private final LoadBalancerStatsPublisher statsPublisher; private long statsPublisherInterval = 15000; + private boolean terminated; public LoadBalancerStatsNotifier(LoadBalancerStatsReader statsReader) { this.statsReader = statsReader; @@ -52,7 +53,7 @@ public class LoadBalancerStatsNotifier implements Runnable { @Override public void run() { - while (true) { + while (!terminated) { try { try { Thread.sleep(statsPublisherInterval); @@ -74,4 +75,11 @@ public class LoadBalancerStatsNotifier implements Runnable { } } } + + /** + * Terminate load balancer statistics notifier thread. + */ + public void terminate() { + terminated = true; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java index 4d7638a..0327c20 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java @@ -34,8 +34,9 @@ public class TopicHealthChecker implements Runnable { private static final Log log = LogFactory.getLog(TopicHealthChecker.class); private String topicName; + private boolean terminated; - public TopicHealthChecker(String name) { + public TopicHealthChecker(String name) { topicName = name; } @@ -44,7 +45,7 @@ public class TopicHealthChecker implements Runnable { log.info("Topic Health Checker is running... "); TopicConnector testConnector = new TopicConnector(); - while (true) { + while (!terminated) { try { // health checker runs in every 30s Thread.sleep(30000); @@ -71,4 +72,7 @@ public class TopicHealthChecker implements Runnable { } + public void terminate() { + terminated = true; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index 577fee4..d8ec008 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -29,17 +29,20 @@ import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker; /** * Any instance who needs to subscribe to a topic, should communicate with this * object. - * + * * @author nirmal - * + * */ public class TopicSubscriber implements Runnable { private static final Log log = LogFactory.getLog(TopicSubscriber.class); - private MessageListener messageListener; + + private boolean terminated = false; + private MessageListener messageListener; private TopicSession topicSession; private String topicName; private TopicConnector connector; + private TopicHealthChecker healthChecker; private javax.jms.TopicSubscriber topicSubscriber = null; /** @@ -84,7 +87,8 @@ public class TopicSubscriber implements Runnable { @Override public void run() { - while (true) { + // Keep the thread live until terminated + while (!terminated) { try { doSubscribe(); @@ -92,11 +96,12 @@ public class TopicSubscriber implements Runnable { log.error("Error while subscribing to the topic: " + topicName, e); } finally { // start the health checker - Thread healthChecker = new Thread(new TopicHealthChecker(topicName)); - healthChecker.start(); + healthChecker = new TopicHealthChecker(topicName); + Thread healthCheckerThread = new Thread(healthChecker); + healthCheckerThread.start(); try { // waits till the thread finishes. - healthChecker.join(); + healthCheckerThread.join(); } catch (InterruptedException ignore) { } // health checker failed @@ -117,4 +122,11 @@ public class TopicSubscriber implements Runnable { } } + /** + * Terminate topic subscriber. + */ + public void terminate() { + healthChecker.terminate(); + terminated = true; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java index 285bd0a..408e27c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java @@ -42,6 +42,7 @@ public class TopologyEventMessageDelegator implements Runnable { private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class); private CompleteTopologyEventProcessor completeTopEvMsgProcessor; private MessageProcessorChain processorChain; + private boolean terminated; public TopologyEventMessageDelegator() { this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor(); @@ -68,7 +69,7 @@ public class TopologyEventMessageDelegator implements Runnable { log.info("Topology event message delegator started"); log.info("Waiting for the complete topology event message..."); } - while (true) { + while (!terminated) { try { // First take the complete topology event TextMessage message = TopologyEventQueue.getInstance().take(); @@ -86,7 +87,7 @@ public class TopologyEventMessageDelegator implements Runnable { } } - while (true) { + while (!terminated) { try { TextMessage message = TopologyEventQueue.getInstance().take(); @@ -119,4 +120,11 @@ public class TopologyEventMessageDelegator implements Runnable { } } } + + /** + * Terminate topology event message delegator thread. + */ + public void terminate() { + terminated = true; + } }
