Repository: incubator-stratos
Updated Branches:
  refs/heads/master a04e657e1 -> bccad5be0


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
index 59e9ad1..cafdf74 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
@@ -31,10 +31,16 @@ import javax.jms.TextMessage;
  * Implements functionality for receiving text based event messages from the tenant
  * message broker topic and add them to the event queue.
  */
-public class TenantEventMessageListener implements MessageListener {
+class TenantEventMessageListener implements MessageListener {
 
     private static final Log log = LogFactory.getLog(TenantEventMessageListener.class);
 
+    private TenantEventMessageQueue messageQueue;
+
+    public TenantEventMessageListener(TenantEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
     @Override
     public void onMessage(Message message) {
         if (message instanceof TextMessage) {
@@ -44,7 +50,7 @@ public class TenantEventMessageListener implements MessageListener {
                     log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
                 }
                 // Add received message to the queue
-                TenantEventMessageQueue.getInstance().add(receivedMessage);
+                messageQueue.add(receivedMessage);
 
             } catch (JMSException e) {
                 log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
index 423f169..d6f0217 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
@@ -25,20 +25,5 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * Implements a blocking queue for managing tenant event messages.
  */
-public class TenantEventMessageQueue extends LinkedBlockingQueue<TextMessage>{
-    private static volatile TenantEventMessageQueue instance;
-
-    private TenantEventMessageQueue(){
-    }
-
-    public static TenantEventMessageQueue getInstance() {
-        if (instance == null) {
-            synchronized (TenantEventMessageQueue.class){
-                if (instance == null) {
-                    instance = new TenantEventMessageQueue();
-                }
-            }
-        }
-        return instance;
-    }
+class TenantEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
new file mode 100644
index 0000000..059f95f
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * Receives tenant events from the tenant topic of the message broker.
+ *
+ * <p>{@link #run()} starts a {@link TopicSubscriber} thread configured with
+ * a {@code TenantEventMessageListener}, and a
+ * {@code TenantEventMessageDelegator} thread that is handed the same
+ * private message queue as the listener. The thread executing
+ * {@link #run()} then stays alive until {@link #terminate()} is called,
+ * typically from another thread.
+ */
+public class TenantEventReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(TenantEventReceiver.class);
+
+    private final TenantEventMessageDelegator messageDelegator;
+    private final TenantEventMessageListener messageListener;
+
+    // Created in run() and read in terminate(), which may be called from
+    // another thread, hence volatile.
+    private volatile TopicSubscriber topicSubscriber;
+
+    // Set by terminate() and polled by run(); volatile so that the polling
+    // thread is guaranteed to see the update.
+    private volatile boolean terminated;
+
+    /**
+     * Creates a receiver whose message listener and message delegator share
+     * one private message queue.
+     */
+    public TenantEventReceiver() {
+        TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
+        this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
+        this.messageListener = new TenantEventMessageListener(messageQueue);
+    }
+
+    /**
+     * Registers an event listener with the message delegator.
+     *
+     * @param eventListener the listener to register
+     */
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    /**
+     * Starts the topic subscriber and message delegator threads and then
+     * blocks until {@link #terminate()} is called. Exceptions are logged,
+     * not propagated.
+     */
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            TopicSubscriber subscriber = new TopicSubscriber(Constants.TENANT_TOPIC);
+            subscriber.setMessageListener(messageListener);
+            topicSubscriber = subscriber;
+            Thread subscriberThread = new Thread(subscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Tenant event message receiver thread started");
+            }
+
+            // Start tenant event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Tenant event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignore) {
+                    // Interruption is ignored; only terminate() ends the loop
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Tenant receiver failed", e);
+            }
+        }
+    }
+
+    /**
+     * Asks the topic subscriber and the message delegator to terminate and
+     * lets {@link #run()} return. The subscriber is skipped if
+     * {@link #run()} has not created it yet.
+     */
+    public void terminate() {
+        TopicSubscriber subscriber = topicSubscriber;
+        if (subscriber != null) {
+            subscriber.terminate();
+        }
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantReceiver.java
deleted file mode 100644
index a6f2141..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantReceiver.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.receiver.tenant;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
-
-/**
- * A thread for receiving tenant information from message broker and
- * build tenant information in tenant manager.
- */
-public class TenantReceiver implements Runnable {
-    private static final Log log = LogFactory.getLog(TenantReceiver.class);
-    private TenantEventMessageDelegator messageDelegator;
-    private TopicSubscriber topicSubscriber;
-    private boolean terminated;
-
-    public TenantReceiver() {
-        this.messageDelegator = new TenantEventMessageDelegator();
-    }
-
-    public TenantReceiver(TenantEventMessageDelegator messageDelegator) {
-        this.messageDelegator = messageDelegator;
-    }
-
-    @Override
-    public void run() {
-        try {
-            // Start topic subscriber thread
-            topicSubscriber = new TopicSubscriber(Constants.TENANT_TOPIC);
-            topicSubscriber.setMessageListener(new TenantEventMessageListener());
-            Thread subscriberThread = new Thread(topicSubscriber);
-            subscriberThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Tenant event message receiver thread started");
-            }
-
-            // Start tenant event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Tenant event message delegator thread started");
-            }
-
-            // Keep the thread live until terminated
-            while (!terminated) {
-               try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Tenant receiver failed", e);
-            }
-        }
-    }
-
-    public void terminate() {
-        topicSubscriber.terminate();
-        messageDelegator.terminate();
-        terminated = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index a06bbe3..9cc8f78 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -18,41 +18,34 @@
  */
 package org.apache.stratos.messaging.message.receiver.topology;
 
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.TextMessage;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
-import org.apache.stratos.messaging.message.processor.topology.*;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
 import org.apache.stratos.messaging.util.Constants;
 
+import javax.jms.TextMessage;
+
 
 /**
  * Implements logic for processing topology event messages based on a given
  * topology process chain.
  */
-public class TopologyEventMessageDelegator implements Runnable {
+class TopologyEventMessageDelegator implements Runnable {
 
     private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
     private MessageProcessorChain processorChain;
-    private LinkedBlockingQueue<TextMessage> messageQueue;
+    private TopologyEventMessageQueue messageQueue;
     private boolean terminated;
 
-    public TopologyEventMessageDelegator() {
+    public TopologyEventMessageDelegator(TopologyEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
         this.processorChain = new TopologyMessageProcessorChain();
-        this.messageQueue = TopologyEventMessageQueue.getInstance();
     }
 
-    public TopologyEventMessageDelegator(MessageProcessorChain processorChain) {
-        this.processorChain = processorChain;
-        this.messageQueue = TopologyEventMessageQueue.getInstance();
-    }
-    
-    public TopologyEventMessageDelegator(MessageProcessorChain processorChain, LinkedBlockingQueue<TextMessage> queue) {
-        this.processorChain = processorChain;
-        this.messageQueue = queue;
+    public void addEventListener(EventListener eventListener) {
+        processorChain.addEventListener(eventListener);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
index 03afe13..799b1b1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
@@ -18,22 +18,27 @@
  */
 package org.apache.stratos.messaging.message.receiver.topology;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * Implements functionality for receiving text based event messages from the topology
  * message broker topic and add them to the event queue.
  */
-public class TopologyEventMessageListener implements MessageListener {
-
+class TopologyEventMessageListener implements MessageListener {
     private static final Log log = LogFactory.getLog(TopologyEventMessageListener.class);
 
+    private TopologyEventMessageQueue messageQueue;
+
+    public TopologyEventMessageListener(TopologyEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
     @Override
     public void onMessage(Message message) {
         if (message instanceof TextMessage) {
@@ -43,7 +48,7 @@ public class TopologyEventMessageListener implements MessageListener {
                     log.debug(String.format("Topology message received: %s", ((TextMessage) message).getText()));
                 }
                 // Add received message to the queue
-                TopologyEventMessageQueue.getInstance().add(receivedMessage);
+                messageQueue.add(receivedMessage);
 
             } catch (JMSException e) {
                 log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
index db33289..8ebbc98 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
@@ -19,27 +19,11 @@
 
 package org.apache.stratos.messaging.message.receiver.topology;
 
-import java.util.concurrent.LinkedBlockingQueue;
-
 import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Implements a blocking queue for managing topology event messages.
  */
-public class TopologyEventMessageQueue extends LinkedBlockingQueue<TextMessage>{
-    private static volatile TopologyEventMessageQueue instance;
-
-    private TopologyEventMessageQueue(){
-    }
-
-    public static TopologyEventMessageQueue getInstance() {
-        if (instance == null) {
-            synchronized (TopologyEventMessageQueue.class){
-                if (instance == null) {
-                    instance = new TopologyEventMessageQueue();
-                }
-            }
-        }
-        return instance;
-    }
+class TopologyEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
new file mode 100644
index 0000000..b271beb
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * Receives topology events from the topology topic of the message broker.
+ *
+ * <p>{@link #run()} starts a {@link TopicSubscriber} thread configured with
+ * a {@code TopologyEventMessageListener}, and a
+ * {@code TopologyEventMessageDelegator} thread that is handed the same
+ * private message queue as the listener. The thread executing
+ * {@link #run()} then stays alive until {@link #terminate()} is called,
+ * typically from another thread.
+ */
+public class TopologyEventReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(TopologyEventReceiver.class);
+
+    private final TopologyEventMessageDelegator messageDelegator;
+    private final TopologyEventMessageListener messageListener;
+
+    // Created in run() and read in terminate(), which may be called from
+    // another thread, hence volatile.
+    private volatile TopicSubscriber topicSubscriber;
+
+    // Set by terminate() and polled by run(); volatile so that the polling
+    // thread is guaranteed to see the update.
+    private volatile boolean terminated;
+
+    /**
+     * Creates a receiver whose message listener and message delegator share
+     * one private message queue.
+     */
+    public TopologyEventReceiver() {
+        TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
+        this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
+        this.messageListener = new TopologyEventMessageListener(messageQueue);
+    }
+
+    /**
+     * Registers an event listener with the message delegator.
+     *
+     * @param eventListener the listener to register
+     */
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    /**
+     * Starts the topic subscriber and message delegator threads and then
+     * blocks until {@link #terminate()} is called. Exceptions are logged,
+     * not propagated.
+     */
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            TopicSubscriber subscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+            subscriber.setMessageListener(messageListener);
+            topicSubscriber = subscriber;
+            Thread subscriberThread = new Thread(subscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Topology event message receiver thread started");
+            }
+
+            // Start topology event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Topology event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignore) {
+                    // Interruption is ignored; only terminate() ends the loop
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Topology receiver failed", e);
+            }
+        }
+    }
+
+    /**
+     * Asks the topic subscriber and the message delegator to terminate and
+     * lets {@link #run()} return. The subscriber is skipped if
+     * {@link #run()} has not created it yet.
+     */
+    public void terminate() {
+        TopicSubscriber subscriber = topicSubscriber;
+        if (subscriber != null) {
+            subscriber.terminate();
+        }
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java
deleted file mode 100644
index ab02956..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.receiver.topology;
-
-import javax.jms.MessageListener;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
-
-/**
- * A thread for receiving topology information from message broker and
- * build topology in topology manager.
- */
-public class TopologyReceiver implements Runnable {
-    private static final Log log = LogFactory.getLog(TopologyReceiver.class);
-    private TopologyEventMessageDelegator messageDelegator;
-    private MessageListener messageListener;
-    private TopicSubscriber topicSubscriber;
-    private boolean terminated;
-
-    public TopologyReceiver() {
-        this.messageDelegator = new TopologyEventMessageDelegator();
-        this.messageListener = new TopologyEventMessageListener();
-    }
-
-    public TopologyReceiver(TopologyEventMessageDelegator messageDelegator) {
-        this.messageDelegator = messageDelegator;
-        this.messageListener = new TopologyEventMessageListener();
-    }
-    
-    public TopologyReceiver(MessageListener listener) {
-        this.messageDelegator = new TopologyEventMessageDelegator();
-        this.messageListener = listener;
-    }
-    
-    public TopologyReceiver(TopologyEventMessageDelegator messageDelegator, MessageListener listener) {
-        this.messageDelegator = messageDelegator;
-        this.messageListener = listener;
-    }
-    
-
-    @Override
-    public void run() {
-        try {
-            // Start topic subscriber thread
-            topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
-            topicSubscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(topicSubscriber);
-            subscriberThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Topology event message receiver thread started");
-            }
-
-            // Start topology event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Topology event message delegator thread started");
-            }
-
-            // Keep the thread live until terminated
-            while (!terminated) {
-               try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Topology receiver failed", e);
-            }
-        }
-    }
-
-    public void terminate() {
-        topicSubscriber.terminate();
-        messageDelegator.terminate();
-        terminated = true;
-    }
-}

Reply via email to