Updated Branches:
  refs/heads/master 8e2cec750 -> ad1df3e11

Fixed thread termination logic in load balancer common, load balancer extension 
api and messaging components


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ad1df3e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ad1df3e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ad1df3e1

Branch: refs/heads/master
Commit: ad1df3e11631d95efc6569e3ae0bb66352aa1564
Parents: 8e2cec7
Author: Imesh Gunaratne <[email protected]>
Authored: Sat Nov 16 17:53:28 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Sat Nov 16 17:53:28 2013 +0530

----------------------------------------------------------------------
 .../common/topology/TopologyReceiver.java       | 44 ++++++++++++--------
 .../extension/api/LoadBalancerExtension.java    | 32 ++++++++++----
 .../api/LoadBalancerStatsNotifier.java          | 10 ++++-
 .../broker/heartbeat/TopicHealthChecker.java    |  8 +++-
 .../broker/subscribe/TopicSubscriber.java       | 26 ++++++++----
 .../topology/TopologyEventMessageDelegator.java | 12 +++++-
 6 files changed, 94 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
index 88958b5..4e91f2f 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
@@ -33,6 +33,8 @@ import org.apache.stratos.messaging.util.Constants;
 public class TopologyReceiver implements Runnable {
     private static final Log log = LogFactory.getLog(TopologyReceiver.class);
     private TopologyEventMessageDelegator messageDelegator;
+    private TopicSubscriber topicSubscriber;
+    private boolean terminated;
 
     public TopologyReceiver() {
         this.messageDelegator = new TopologyEventMessageDelegator();
@@ -45,26 +47,34 @@ public class TopologyReceiver implements Runnable {
     @Override
     public void run() {
         try {
-                // Start topic subscriber thread
-                TopicSubscriber topicSubscriber = new 
TopicSubscriber(Constants.TOPOLOGY_TOPIC);
-                topicSubscriber.setMessageListener(new 
TopologyEventMessageReceiver());
-                Thread subscriberThread = new Thread(topicSubscriber);
-                subscriberThread.start();
-                if (log.isDebugEnabled()) {
-                    log.debug("Topology event message receiver thread 
started");
-                }
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+            topicSubscriber.setMessageListener(new 
TopologyEventMessageReceiver());
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Topology event message receiver thread started");
+            }
 
-                // Start topology message receiver thread
-                Thread receiverThread = new Thread(messageDelegator);
-                receiverThread.start();
-                if (log.isDebugEnabled()) {
-                    log.debug("Topology message processor thread started");
-                }
-        }
-        catch (Exception e) {
-            if(log.isErrorEnabled()) {
+            // Start topology event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Topology event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated);
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
                 log.error("Topology receiver failed", e);
             }
         }
     }
+
+    public void terminate() {
+        topicSubscriber.terminate();
+        messageDelegator.terminate();
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 63df3b7..74af8f9 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -38,6 +38,9 @@ public class LoadBalancerExtension implements Runnable {
     private LoadBalancer loadBalancer;
     private LoadBalancerStatsReader statsReader;
     private boolean loadBalancerStarted;
+    private TopologyReceiver topologyReceiver;
+    private LoadBalancerStatsNotifier statsNotifier;
+    private boolean terminated;
 
     public LoadBalancerExtension(LoadBalancer loadBalancer, 
LoadBalancerStatsReader statsReader) {
         this.loadBalancer = loadBalancer;
@@ -47,16 +50,22 @@ public class LoadBalancerExtension implements Runnable {
     @Override
     public void run() {
         try {
+            if(log.isInfoEnabled()) {
+                log.info("Load balancer extension started");
+            }
+
             // Start topology receiver thread
-            TopologyReceiver topologyReceiver = new 
TopologyReceiver(createMessageDelegator());
+            topologyReceiver = new TopologyReceiver(createMessageDelegator());
             Thread topologyReceiverThread = new Thread(topologyReceiver);
             topologyReceiverThread.start();
 
             // Start stats notifier thread
-            LoadBalancerStatsNotifier statsNotifier = new 
LoadBalancerStatsNotifier(statsReader);
+            statsNotifier = new LoadBalancerStatsNotifier(statsReader);
             Thread statsNotifierThread = new Thread(statsNotifier);
             statsNotifierThread.start();
 
+            // Keep the thread live until terminated
+            while (!terminated);
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Could not start load balancer extension", e);
@@ -82,11 +91,11 @@ public class LoadBalancerExtension implements Runnable {
                     // Complete topology event is only received once
                     // Remove event listener
                     messageDelegator.removeCompleteTopologyEventListener(this);
-                }
-                catch (Exception e) {
-                    if(log.isErrorEnabled()) {
+                } catch (Exception e) {
+                    if (log.isErrorEnabled()) {
                         log.error("Could not start load balancer", e);
                     }
+                    terminate();
                 }
             }
         });
@@ -130,14 +139,19 @@ public class LoadBalancerExtension implements Runnable {
 
     private void reloadConfiguration() {
         try {
-            if(loadBalancerStarted) {
+            if (loadBalancerStarted) {
                 loadBalancer.reload(TopologyManager.getTopology());
             }
-        }
-        catch (Exception e) {
-            if(log.isErrorEnabled()) {
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
                 log.error("Could not reload load balancer configuration", e);
             }
         }
     }
+
+    public void terminate() {
+        topologyReceiver.terminate();
+        statsNotifier.terminate();
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
index 5beaec0..dc5a8a6 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
@@ -39,6 +39,7 @@ public class LoadBalancerStatsNotifier implements Runnable {
     private LoadBalancerStatsReader statsReader;
     private final LoadBalancerStatsPublisher statsPublisher;
     private long statsPublisherInterval = 15000;
+    private boolean terminated;
 
     public LoadBalancerStatsNotifier(LoadBalancerStatsReader statsReader) {
         this.statsReader = statsReader;
@@ -52,7 +53,7 @@ public class LoadBalancerStatsNotifier implements Runnable {
 
     @Override
     public void run() {
-        while (true) {
+        while (!terminated) {
             try {
                 try {
                     Thread.sleep(statsPublisherInterval);
@@ -74,4 +75,11 @@ public class LoadBalancerStatsNotifier implements Runnable {
             }
         }
     }
+
+    /**
+     * Terminate load balancer statistics notifier thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index 4d7638a..0327c20 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -34,8 +34,9 @@ public class TopicHealthChecker implements Runnable {
 
        private static final Log log = 
LogFactory.getLog(TopicHealthChecker.class);
        private String topicName;
+    private boolean terminated;
 
-       public TopicHealthChecker(String name) {
+    public TopicHealthChecker(String name) {
                topicName = name;
        }
 
@@ -44,7 +45,7 @@ public class TopicHealthChecker implements Runnable {
                log.info("Topic Health Checker is running... ");
 
                TopicConnector testConnector = new TopicConnector();
-               while (true) {
+               while (!terminated) {
                        try {
                                // health checker runs in every 30s
                                Thread.sleep(30000);
@@ -71,4 +72,7 @@ public class TopicHealthChecker implements Runnable {
 
        }
 
+    public void terminate() {
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index 577fee4..d8ec008 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -29,17 +29,20 @@ import 
org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker;
 /**
  * Any instance who needs to subscribe to a topic, should communicate with this
  * object.
- * 
+ *
  * @author nirmal
- * 
+ *
  */
 public class TopicSubscriber implements Runnable {
 
        private static final Log log = LogFactory.getLog(TopicSubscriber.class);
-       private MessageListener messageListener;
+
+    private boolean terminated = false;
+    private MessageListener messageListener;
        private TopicSession topicSession;
        private String topicName;
        private TopicConnector connector;
+    private TopicHealthChecker healthChecker;
        private javax.jms.TopicSubscriber topicSubscriber = null;
 
        /**
@@ -84,7 +87,8 @@ public class TopicSubscriber implements Runnable {
        @Override
        public void run() {
 
-               while (true) {
+        // Keep the thread live until terminated
+               while (!terminated) {
                        try {
                                doSubscribe();
 
@@ -92,11 +96,12 @@ public class TopicSubscriber implements Runnable {
                                log.error("Error while subscribing to the 
topic: " + topicName, e);
                        } finally {
                                // start the health checker
-                               Thread healthChecker = new Thread(new 
TopicHealthChecker(topicName));
-                               healthChecker.start();
+                healthChecker = new TopicHealthChecker(topicName);
+                           Thread healthCheckerThread = new 
Thread(healthChecker);
+                               healthCheckerThread.start();
                                try {
                                        // waits till the thread finishes.
-                                       healthChecker.join();
+                                       healthCheckerThread.join();
                                } catch (InterruptedException ignore) {
                                }
                                // health checker failed
@@ -117,4 +122,11 @@ public class TopicSubscriber implements Runnable {
                }
        }
 
+    /**
+     * Terminate topic subscriber.
+     */
+    public void terminate() {
+        healthChecker.terminate();
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 285bd0a..408e27c 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -42,6 +42,7 @@ public class TopologyEventMessageDelegator implements 
Runnable {
     private static final Log log = 
LogFactory.getLog(TopologyEventMessageDelegator.class);
     private CompleteTopologyEventProcessor completeTopEvMsgProcessor;
     private MessageProcessorChain processorChain;
+    private boolean terminated;
 
     public TopologyEventMessageDelegator() {
         this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
@@ -68,7 +69,7 @@ public class TopologyEventMessageDelegator implements 
Runnable {
                 log.info("Topology event message delegator started");
                 log.info("Waiting for the complete topology event message...");
             }
-            while (true) {
+            while (!terminated) {
                 try {
                     // First take the complete topology event
                     TextMessage message = 
TopologyEventQueue.getInstance().take();
@@ -86,7 +87,7 @@ public class TopologyEventMessageDelegator implements 
Runnable {
                 }
             }
 
-            while (true) {
+            while (!terminated) {
                 try {
                     TextMessage message = 
TopologyEventQueue.getInstance().take();
 
@@ -119,4 +120,11 @@ public class TopologyEventMessageDelegator implements 
Runnable {
             }
         }
     }
+
+    /**
+     * Terminate topology event message delegator thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
 }

Reply via email to