Add synchronize blocks to InstanceNotifierEventReceiver to avoid concurrency 
issues


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/eb93f701
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/eb93f701
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/eb93f701

Branch: refs/heads/stratos-4.1.x
Commit: eb93f701bd1f1e1cd6e9181f8dd702b391fbc500
Parents: 2b6f972
Author: Akila Perera <[email protected]>
Authored: Mon Nov 30 00:06:41 2015 +0530
Committer: Akila Perera <[email protected]>
Committed: Mon Nov 30 00:33:46 2015 +0530

----------------------------------------------------------------------
 .../notifier/InstanceNotifierEventReceiver.java | 62 +++++++++++---------
 1 file changed, 33 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/eb93f701/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index 7476d18..4ad6572 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -19,7 +19,6 @@
 
 package org.apache.stratos.messaging.message.receiver.instance.notifier;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
@@ -32,50 +31,54 @@ import org.apache.stratos.messaging.util.MessagingUtil;
 public class InstanceNotifierEventReceiver {
     private static final Log log = 
LogFactory.getLog(InstanceNotifierEventReceiver.class);
     private final InstanceNotifierEventMessageDelegator messageDelegator;
-    private final InstanceNotifierEventMessageListener messageListener;
     private EventSubscriber eventSubscriber;
     private boolean terminated;
 
     public InstanceNotifierEventReceiver() {
         InstanceNotifierEventMessageQueue messageQueue = new 
InstanceNotifierEventMessageQueue();
         this.messageDelegator = new 
InstanceNotifierEventMessageDelegator(messageQueue);
-        this.messageListener = new 
InstanceNotifierEventMessageListener(messageQueue);
+        InstanceNotifierEventMessageListener messageListener = new 
InstanceNotifierEventMessageListener(messageQueue);
+        // Start topic subscriber thread
+        eventSubscriber = new 
EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(),
+                messageListener);
     }
 
     public void addEventListener(EventListener eventListener) {
         messageDelegator.addEventListener(eventListener);
     }
 
-
     public void execute() {
-        try {
-            // Start topic subscriber thread
-            eventSubscriber = new 
EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(), 
messageListener);
-//                     subscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(eventSubscriber);
-
-            subscriberThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("InstanceNotifier event message receiver thread 
started");
-            }
-
-            // Start instance notifier event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("InstanceNotifier event message delegator thread 
started");
+        synchronized (this) {
+            if (terminated) {
+                log.info("InstanceNotifierEventReceiver has been terminated. 
Event subscriber will not be created.");
+                return;
             }
+            try {
+                Thread subscriberThread = new Thread(eventSubscriber);
+                subscriberThread.start();
+                if (log.isDebugEnabled()) {
+                    log.debug("InstanceNotifier event message receiver thread 
started");
+                }
 
-            // Keep the thread live until terminated
-            while (!terminated) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
+                // Start instance notifier event message delegator thread
+                Thread receiverThread = new Thread(messageDelegator);
+                receiverThread.start();
+                if (log.isDebugEnabled()) {
+                    log.debug("InstanceNotifier event message delegator thread 
started");
+                }
+            } catch (Exception e) {
+                if (log.isErrorEnabled()) {
+                    log.error("InstanceNotifier receiver failed", e);
                 }
             }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("InstanceNotifier receiver failed", e);
+        }
+        log.info("InstanceNotifierEventReceiver started");
+
+        // Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(2000);
+            } catch (InterruptedException ignore) {
             }
         }
     }
@@ -84,9 +87,10 @@ public class InstanceNotifierEventReceiver {
         return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
     }
 
-    public void terminate() {
+    public synchronized void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
         terminated = true;
+        log.info("InstanceNotifierEventReceiver terminated");
     }
 }

Reply via email to