Add synchronized blocks to InstanceNotifierEventReceiver to avoid concurrency issues
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/eb93f701 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/eb93f701 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/eb93f701 Branch: refs/heads/stratos-4.1.x Commit: eb93f701bd1f1e1cd6e9181f8dd702b391fbc500 Parents: 2b6f972 Author: Akila Perera <[email protected]> Authored: Mon Nov 30 00:06:41 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Mon Nov 30 00:33:46 2015 +0530 ---------------------------------------------------------------------- .../notifier/InstanceNotifierEventReceiver.java | 62 +++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/eb93f701/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java index 7476d18..4ad6572 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java @@ -19,7 +19,6 @@ package org.apache.stratos.messaging.message.receiver.instance.notifier; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.stratos.messaging.broker.subscribe.EventSubscriber; @@ -32,50 +31,54 @@ import org.apache.stratos.messaging.util.MessagingUtil; public class InstanceNotifierEventReceiver { private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class); private final InstanceNotifierEventMessageDelegator messageDelegator; - private final InstanceNotifierEventMessageListener messageListener; private EventSubscriber eventSubscriber; private boolean terminated; public InstanceNotifierEventReceiver() { InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue(); this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue); - this.messageListener = new InstanceNotifierEventMessageListener(messageQueue); + InstanceNotifierEventMessageListener messageListener = new InstanceNotifierEventMessageListener(messageQueue); + // Start topic subscriber thread + eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(), + messageListener); } public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } - public void execute() { - try { - // Start topic subscriber thread - eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(), messageListener); -// subscriber.setMessageListener(messageListener); - Thread subscriberThread = new Thread(eventSubscriber); - - subscriberThread.start(); - if (log.isDebugEnabled()) { - log.debug("InstanceNotifier event message receiver thread started"); - } - - // Start instance notifier event message delegator thread - Thread receiverThread = new Thread(messageDelegator); - receiverThread.start(); - if (log.isDebugEnabled()) { - log.debug("InstanceNotifier event message delegator thread started"); + synchronized (this) { + if (terminated) { + log.info("InstanceNotifierEventReceiver has been terminated. 
Event subscriber will not be created."); + return; } + try { + Thread subscriberThread = new Thread(eventSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("InstanceNotifier event message receiver thread started"); + } - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { + // Start instance notifier event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("InstanceNotifier event message delegator thread started"); + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("InstanceNotifier receiver failed", e); } } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("InstanceNotifier receiver failed", e); + } + log.info("InstanceNotifierEventReceiver started"); + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(2000); + } catch (InterruptedException ignore) { } } } @@ -84,9 +87,10 @@ public class InstanceNotifierEventReceiver { return ((eventSubscriber != null) && (eventSubscriber.isSubscribed())); } - public void terminate() { + public synchronized void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); terminated = true; + log.info("InstanceNotifierEventReceiver terminated"); } }
