http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index c0aee87..8e8d974 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -1,104 +1,105 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 package org.apache.stratos.messaging.broker.subscribe;
 
-import javax.jms.*;
+import javax.jms.JMSException;
+import javax.jms.TopicSession;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.connect.TopicConnector;
+import org.apache.stratos.messaging.broker.connect.MQTTConnector;
 import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import 
org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain;
 import org.apache.stratos.messaging.util.Util;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
 
 /**
  * Any instance who needs to subscribe to a topic, should communicate with this
  * object.
- *
+ * 
  * @author nirmal
- *
+ * 
  */
 public class TopicSubscriber implements Runnable {
 
        private static final Log log = LogFactory.getLog(TopicSubscriber.class);
 
-    private boolean terminated = false;
-    private MessageListener messageListener;
+       private boolean terminated = false;
+       private MqttCallback messageListener;
        private TopicSession topicSession;
-       private String topicName;
-       private TopicConnector connector;
-    private TopicHealthChecker healthChecker;
-       private javax.jms.TopicSubscriber topicSubscriber = null;
-    private boolean subscribed;
+       private final String topicName;
 
-    /**
+       private TopicHealthChecker healthChecker;
+       private final javax.jms.TopicSubscriber topicSubscriber = null;
+       private boolean subscribed;
+       private final MessageProcessorChain processorChain;
+
+       /**
         * @param aTopicName
         *            topic name of this subscriber instance.
         */
        public TopicSubscriber(String aTopicName) {
                topicName = aTopicName;
-               connector = new TopicConnector();
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Topic subscriber connector created: 
[topic] %s", topicName));
-        }
+
+               if (log.isDebugEnabled()) {
+                       log.debug(String.format("Topic subscriber connector 
created: [topic] %s", topicName));
+               }
+               this.processorChain = new 
InstanceNotifierMessageProcessorChain();
        }
 
        private void doSubscribe() throws Exception, JMSException {
-               // Initialize a topic connection
-               connector.init(topicName);
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Topic subscriber connector initialized: 
[topic] %s", topicName));
-        }
-        // Create new session
-        topicSession = createSession(connector);
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Topic subscriber session created: [topic] 
%s", topicName));
-        }
-        // Create a new subscriber
-        createSubscriber(topicSession);
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Topic subscriber created: [topic] %s", 
topicName));
-        }
-        subscribed = true;
-       }
 
-    private void createSubscriber(TopicSession topicSession) throws 
JMSException {
-        Topic topic = connector.getTopic();
-        if (topic == null) {
-            // if topic doesn't exist, create it.
-            topic = topicSession.createTopic(topicName);
-        }
-        topicSubscriber = topicSession.createSubscriber(topic);
-        topicSubscriber.setMessageListener(messageListener);
-    }
-
-    private TopicSession createSession(TopicConnector topicConnector) throws 
Exception {
-        // Create a new session
-        return topicConnector.newSession();
-    }
+               MqttClient mqttClient = 
MQTTConnector.getMQTTSubClient(Util.getRandomString(5));
+               try {
+
+                       mqttClient.connect();
+                       if (log.isDebugEnabled()) {
+                               log.debug("Subscribing to topic '" + topicName 
+ "' from " +
+                                         mqttClient.getServerURI());
+                       }
+                       // Subscribing to specific topic
+
+                       mqttClient.subscribe(topicName);
+
+                       // Register the callback, then keep this thread alive; the loop below never exits
+                       mqttClient.setCallback(messageListener);
+                       while (true) {
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException e) {
+                               }
+                       }
+
+               } finally {
+                       mqttClient.disconnect();
+               }
+       }
 
        /**
         * @param messageListener
         *            this MessageListener will get triggered each time this
         *            subscription receives a message.
         */
-       public void setMessageListener(MessageListener messageListener) {
+       public void setMessageListener(MqttCallback messageListener) {
 
                this.messageListener = messageListener;
        }
@@ -111,70 +112,67 @@ public class TopicSubscriber implements Runnable {
        @Override
        public void run() {
 
-        // Keep the thread live until terminated
+               // Keep the thread live until terminated
                while (!terminated) {
                        try {
                                doSubscribe();
                        } catch (Exception e) {
-                subscribed = false;
+                               subscribed = false;
                                log.error("Error while subscribing to the 
topic: " + topicName, e);
                        } finally {
-                if(subscribed) {
-                    // start the health checker if subscribed
-                    healthChecker = new TopicHealthChecker(topicName);
-                    Thread healthCheckerThread = new Thread(healthChecker);
-                    healthCheckerThread.start();
-                    try {
-                        // waits till the thread finishes.
-                        healthCheckerThread.join();
-                    } catch (InterruptedException ignore) {
-                    }
-                }
-                else {
-                                   // subscription failed
-                    if(log.isInfoEnabled()) {
-                        log.info("Will try to subscribe again in 
"+Util.getFailoverPingInterval()/1000+" sec");
-                    }
-                    try {
-                        Thread.sleep(Util.getFailoverPingInterval());
-                    } catch (InterruptedException ignore) {
-                    }
-                }
-                // closes all sessions/connections
-                try {
-                    if (topicSubscriber != null) {
-                        topicSubscriber.close();
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Topic subscriber closed: 
[topic] %s", topicName));
-                        }
-                    }
-                    if (topicSession != null) {
-                        topicSession.close();
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Topic subscriber session 
closed: [topic] %s", topicName));
-                        }
-                    }
-                    if (connector != null) {
-                        connector.close();
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Topic subscriber 
connector closed: [topic] %s", topicName));
-                        }
-                    }
-                } catch (JMSException ignore) {
-                }
+                               if (subscribed) {
+                                       // start the health checker if 
subscribed
+                                       healthChecker = new 
TopicHealthChecker(topicName);
+                                       Thread healthCheckerThread = new 
Thread(healthChecker);
+                                       healthCheckerThread.start();
+                                       try {
+                                               // waits till the thread 
finishes.
+                                               healthCheckerThread.join();
+                                       } catch (InterruptedException ignore) {
+                                       }
+                               } else {
+                                       // subscription failed
+                                       if (log.isInfoEnabled()) {
+                                               log.info("Will try to subscribe 
again in " +
+                                                        
Util.getFailoverPingInterval() / 1000 + " sec");
+                                       }
+                                       try {
+                                               
Thread.sleep(Util.getFailoverPingInterval());
+                                       } catch (InterruptedException ignore) {
+                                       }
+                               }
+                               // closes all sessions/connections
+                               try {
+                                       if (topicSubscriber != null) {
+                                               topicSubscriber.close();
+                                               if (log.isDebugEnabled()) {
+                                                       
log.debug(String.format("Topic subscriber closed: [topic] %s",
+                                                                               
topicName));
+                                               }
+                                       }
+                                       if (topicSession != null) {
+                                               topicSession.close();
+                                               if (log.isDebugEnabled()) {
+                                                       
log.debug(String.format("Topic subscriber session closed: [topic] %s",
+                                                                               
topicName));
+                                               }
+                                       }
+
+                               } catch (JMSException ignore) {
+                               }
                        }
                }
        }
 
-    /**
-     * Terminate topic subscriber.
-     */
-    public void terminate() {
-        healthChecker.terminate();
-        terminated = true;
-    }
-
-    public boolean isSubscribed() {
-        return subscribed;
-    }
+       /**
+        * Terminate topic subscriber.
+        */
+       public void terminate() {
+               healthChecker.terminate();
+               terminated = true;
+       }
+
+       public boolean isSubscribed() {
+               return subscribed;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
index 5e818bc..bfaf622 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
@@ -1,60 +1,88 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 package org.apache.stratos.messaging.message.receiver.health.stat;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
 /**
- * Implements functionality for receiving text based event messages from the 
health stat
+ * Implements functionality for receiving text based event messages from the
+ * health stat
  * message broker topic and add them to the event queue.
  */
-public class HealthStatEventMessageListener implements MessageListener {
-
-    private static final Log log = 
LogFactory.getLog(HealthStatEventMessageListener.class);
-
-    private HealthStatEventMessageQueue messageQueue;
-
-    public HealthStatEventMessageListener(HealthStatEventMessageQueue 
messageQueue) {
-        this.messageQueue = messageQueue;
-    }
-
-    @Override
-    public void onMessage(Message message) {
-        if (message instanceof TextMessage) {
-            TextMessage receivedMessage = (TextMessage) message;
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Tenant message received: %s", 
((TextMessage) message).getText()));
-                }
-                // Add received message to the queue
-                messageQueue.add(receivedMessage);
-
-            } catch (JMSException e) {
-                log.error(e.getMessage(), e);
-            }
-        }
-    }
+public class HealthStatEventMessageListener implements MqttCallback {
+
+       private static final Log log = 
LogFactory.getLog(HealthStatEventMessageListener.class);
+
+       private final HealthStatEventMessageQueue messageQueue;
+
+       public HealthStatEventMessageListener(HealthStatEventMessageQueue 
messageQueue) {
+               this.messageQueue = messageQueue;
+       }
+
+       @Override
+       public void connectionLost(Throwable arg0) {
+               // TODO Auto-generated method stub
+
+       }
+
+       @Override
+       public void deliveryComplete(IMqttDeliveryToken arg0) {
+               // TODO Auto-generated method stub
+
+       }
+
+       @Override
+       public void messageArrived(String topicName, MqttMessage message) 
throws Exception {
+               if (message instanceof MqttMessage) {
+
+                       TextMessage receivedMessage = new ActiveMQTextMessage();
+                       if (log.isDebugEnabled()) {
+                               log.debug(String.format("Health stat event 
messege received...."));
+
+                       }
+                       receivedMessage.setText(new 
String(message.getPayload()));
+                       
receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                                                         
Util.getEventNameForTopic(topicName));
+
+                       try {
+                               if (log.isDebugEnabled()) {
+                                       log.debug(String.format("Health stat 
event message received: %s",
+                                                               ((TextMessage) 
message).getText()));
+                               }
+                               // Add received message to the queue
+                               messageQueue.add(receivedMessage);
+
+                       } catch (JMSException e) {
+                               log.error(e.getMessage(), e);
+                       }
+               }
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index 8b07180..2371c32 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -1,18 +1,18 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -29,59 +29,59 @@ import org.apache.stratos.messaging.util.Constants;
  * A thread for receiving health stat information from message broker
  */
 public class HealthStatEventReceiver implements Runnable {
-    private static final Log log = 
LogFactory.getLog(HealthStatEventReceiver.class);
+       private static final Log log = 
LogFactory.getLog(HealthStatEventReceiver.class);
 
-    private HealthStatEventMessageDelegator messageDelegator;
-    private HealthStatEventMessageListener messageListener;
-    private TopicSubscriber topicSubscriber;
-    private boolean terminated;
+       private final HealthStatEventMessageDelegator messageDelegator;
+       private final HealthStatEventMessageListener messageListener;
+       private TopicSubscriber topicSubscriber;
+       private boolean terminated;
 
-    public HealthStatEventReceiver() {
-        HealthStatEventMessageQueue messageQueue = new 
HealthStatEventMessageQueue();
-        this.messageDelegator = new 
HealthStatEventMessageDelegator(messageQueue);
-        this.messageListener = new 
HealthStatEventMessageListener(messageQueue);
-    }
+       public HealthStatEventReceiver() {
+               HealthStatEventMessageQueue messageQueue = new 
HealthStatEventMessageQueue();
+               this.messageDelegator = new 
HealthStatEventMessageDelegator(messageQueue);
+               this.messageListener = new 
HealthStatEventMessageListener(messageQueue);
+       }
 
-    public void addEventListener(EventListener eventListener) {
-        messageDelegator.addEventListener(eventListener);
-    }
+       public void addEventListener(EventListener eventListener) {
+               messageDelegator.addEventListener(eventListener);
+       }
 
-    @Override
-    public void run() {
-        try {
-            // Start topic subscriber thread
-            topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
-            topicSubscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(topicSubscriber);
-            subscriberThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Health stats event message receiver thread 
started");
-            }
+       @Override
+       public void run() {
+               try {
+                       // Start topic subscriber thread
+                       topicSubscriber = new 
TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
+                       topicSubscriber.setMessageListener(messageListener);
+                       Thread subscriberThread = new Thread(topicSubscriber);
+                       subscriberThread.start();
+                       if (log.isDebugEnabled()) {
+                               log.debug("Health stats event message receiver 
thread started");
+                       }
 
-            // Start health stat event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Health stats event message delegator thread 
started");
-            }
+                       // Start health stat event message delegator thread
+                       Thread receiverThread = new Thread(messageDelegator);
+                       receiverThread.start();
+                       if (log.isDebugEnabled()) {
+                               log.debug("Health stats event message delegator 
thread started");
+                       }
 
-            // Keep the thread live until terminated
-            while (!terminated) {
-               try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Topology receiver failed", e);
-            }
-        }
-    }
+                       // Keep the thread live until terminated
+                       while (!terminated) {
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException ignore) {
+                               }
+                       }
+               } catch (Exception e) {
+                       if (log.isErrorEnabled()) {
+                               log.error("Topology receiver failed", e);
+                       }
+               }
+       }
 
-    public void terminate() {
-        topicSubscriber.terminate();
-        messageDelegator.terminate();
-        terminated = true;
-    }
+       public void terminate() {
+               topicSubscriber.terminate();
+               messageDelegator.terminate();
+               terminated = true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
index d8cc6b5..c7d1e98 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
@@ -1,60 +1,90 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 package org.apache.stratos.messaging.message.receiver.instance.notifier;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
 /**
- * Implements functionality for receiving text based event messages from the 
instance notifier
+ * Implements functionality for receiving text based event messages from the
+ * instance notifier
  * message broker topic and add them to the event queue.
  */
-class InstanceNotifierEventMessageListener implements MessageListener {
-
-    private static final Log log = 
LogFactory.getLog(InstanceNotifierEventMessageListener.class);
-
-    private InstanceNotifierEventMessageQueue messageQueue;
-
-    public 
InstanceNotifierEventMessageListener(InstanceNotifierEventMessageQueue 
messageQueue) {
-        this.messageQueue = messageQueue;
-    }
-
-    @Override
-    public void onMessage(Message message) {
-        if (message instanceof TextMessage) {
-            TextMessage receivedMessage = (TextMessage) message;
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Instance notifier message 
received: %s", ((TextMessage) message).getText()));
-                }
-                // Add received message to the queue
-                messageQueue.add(receivedMessage);
-
-            } catch (JMSException e) {
-                log.error(e.getMessage(), e);
-            }
-        }
-    }
+class InstanceNotifierEventMessageListener implements MqttCallback {
+
+       private static final Log log = 
LogFactory.getLog(InstanceNotifierEventMessageListener.class);
+
+       private final InstanceNotifierEventMessageQueue messageQueue;
+
+       public 
InstanceNotifierEventMessageListener(InstanceNotifierEventMessageQueue 
messageQueue) {
+               this.messageQueue = messageQueue;
+       }
+
+       @Override
+       public void connectionLost(Throwable arg0) {
+               if (log.isDebugEnabled()) {
+                       log.debug("MQTT connection lost");
+               }
+
+       }
+
+       @Override
+       public void deliveryComplete(IMqttDeliveryToken arg0) {
+
+       }
+
+       @Override
+       public void messageArrived(String topicName, MqttMessage message) 
throws Exception {
+               if (message instanceof MqttMessage) {
+
+                       TextMessage receivedMessage = new ActiveMQTextMessage();
+                       if (log.isDebugEnabled()) {
+                               log.debug(String.format("instance notifier 
messege received...."));
+
+                       }
+
+                       receivedMessage.setText(new 
String(message.getPayload()));
+                       
receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                                                         
Util.getEventNameForTopic(topicName));
+
+                       try {
+                               if (log.isDebugEnabled()) {
+                                       log.debug(String.format("Instance 
notifier message received: %s",
+                                                               ((TextMessage) 
message).getText()));
+                               }
+                               // Add received message to the queue
+                               messageQueue.add(receivedMessage);
+
+                       } catch (JMSException e) {
+                               log.error(e.getMessage(), e);
+                       }
+               }
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index 57fea76..34c73a9 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -1,18 +1,18 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -29,62 +29,62 @@ import org.apache.stratos.messaging.util.Constants;
  * A thread for receiving instance notifier information from message broker.
  */
 public class InstanceNotifierEventReceiver implements Runnable {
-    private static final Log log = 
LogFactory.getLog(InstanceNotifierEventReceiver.class);
-    private InstanceNotifierEventMessageDelegator messageDelegator;
-    private InstanceNotifierEventMessageListener messageListener;
-    private TopicSubscriber topicSubscriber;
-    private boolean terminated;
+       private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class);
+       private final InstanceNotifierEventMessageDelegator messageDelegator;
+       private final InstanceNotifierEventMessageListener messageListener;
+       // Assigned inside run() and read by isSubscribed()/terminate(), which are
+       // invoked from other threads; volatile makes the assignment visible to them.
+       private volatile TopicSubscriber topicSubscriber;
+       // Set by terminate() to end the polling loop in run(); volatile so the
+       // looping thread is guaranteed to observe the change.
+       private volatile boolean terminated;
 
-    public InstanceNotifierEventReceiver() {
-        InstanceNotifierEventMessageQueue messageQueue = new 
InstanceNotifierEventMessageQueue();
-        this.messageDelegator = new 
InstanceNotifierEventMessageDelegator(messageQueue);
-        this.messageListener = new 
InstanceNotifierEventMessageListener(messageQueue);
-    }
+       /**
+        * Creates the receiver and connects its message delegator and message
+        * listener to a single, newly created instance notifier message queue.
+        */
+       public InstanceNotifierEventReceiver() {
+               InstanceNotifierEventMessageQueue sharedQueue = new InstanceNotifierEventMessageQueue();
+               this.messageDelegator = new InstanceNotifierEventMessageDelegator(sharedQueue);
+               this.messageListener = new InstanceNotifierEventMessageListener(sharedQueue);
+       }
 
-    public void addEventListener(EventListener eventListener) {
-        messageDelegator.addEventListener(eventListener);
-    }
+       /**
+        * Registers an event listener by forwarding it to the message delegator.
+        *
+        * @param eventListener the listener to register
+        */
+       public void addEventListener(EventListener eventListener) {
+               messageDelegator.addEventListener(eventListener);
+       }
 
-    @Override
-    public void run() {
-        try {
-            // Start topic subscriber thread
-            topicSubscriber = new 
TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC);
-            topicSubscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(topicSubscriber);
-            subscriberThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("InstanceNotifier event message receiver thread 
started");
-            }
+       @Override
+       public void run() {
+               try {
+                       // Start topic subscriber thread
+                       topicSubscriber = new 
TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC);
+                       topicSubscriber.setMessageListener(messageListener);
+                       Thread subscriberThread = new Thread(topicSubscriber);
+                       subscriberThread.start();
+                       if (log.isDebugEnabled()) {
+                               log.debug("InstanceNotifier event message 
receiver thread started");
+                       }
 
-            // Start instance notifier event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("InstanceNotifier event message delegator thread 
started");
-            }
+                       // Start instance notifier event message delegator 
thread
+                       Thread receiverThread = new Thread(messageDelegator);
+                       receiverThread.start();
+                       if (log.isDebugEnabled()) {
+                               log.debug("InstanceNotifier event message 
delegator thread started");
+                       }
 
-            // Keep the thread live until terminated
-            while (!terminated) {
-               try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("InstanceNotifier receiver failed", e);
-            }
-        }
-    }
+                       // Keep the thread live until terminated
+                       while (!terminated) {
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException ignore) {
+                               }
+                       }
+               } catch (Exception e) {
+                       if (log.isErrorEnabled()) {
+                               log.error("InstanceNotifier receiver failed", 
e);
+                       }
+               }
+       }
 
-    public boolean isSubscribed() {
-        return ((topicSubscriber != null) && (topicSubscriber.isSubscribed()));
-    }
+       /**
+        * Reports whether the topic subscriber exists and is currently subscribed.
+        *
+        * @return {@code false} while no subscriber has been created yet, otherwise
+        *         whatever the subscriber itself reports
+        */
+       public boolean isSubscribed() {
+               if (topicSubscriber == null) {
+                       return false;
+               }
+               return topicSubscriber.isSubscribed();
+       }
 
-    public void terminate() {
-        topicSubscriber.terminate();
-        messageDelegator.terminate();
-        terminated = true;
-    }
+       /**
+        * Stops the topic subscriber and the message delegator and flags this
+        * receiver as terminated so that the loop in {@link #run()} exits.
+        */
+       public void terminate() {
+               // The subscriber is only created inside run(); guard against
+               // terminate() being called before that has happened.
+               if (topicSubscriber != null) {
+                       topicSubscriber.terminate();
+               }
+               messageDelegator.terminate();
+               terminated = true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
index cafdf74..ad43d15 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
@@ -1,60 +1,84 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 package org.apache.stratos.messaging.message.receiver.tenant;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
 /**
- * Implements functionality for receiving text based event messages from the 
tenant
+ * Implements functionality for receiving text based event messages from the
+ * tenant
  * message broker topic and add them to the event queue.
  */
-class TenantEventMessageListener implements MessageListener {
-
-    private static final Log log = 
LogFactory.getLog(TenantEventMessageListener.class);
-
-    private TenantEventMessageQueue messageQueue;
-
-    public TenantEventMessageListener(TenantEventMessageQueue messageQueue) {
-        this.messageQueue = messageQueue;
-    }
-
-    @Override
-    public void onMessage(Message message) {
-        if (message instanceof TextMessage) {
-            TextMessage receivedMessage = (TextMessage) message;
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Tenant message received: %s", 
((TextMessage) message).getText()));
-                }
-                // Add received message to the queue
-                messageQueue.add(receivedMessage);
-
-            } catch (JMSException e) {
-                log.error(e.getMessage(), e);
-            }
-        }
-    }
+class TenantEventMessageListener implements MqttCallback {
+
+       private static final Log log = LogFactory.getLog(TenantEventMessageListener.class);
+
+       /** Queue to which every converted message is added. */
+       private final TenantEventMessageQueue messageQueue;
+
+       /**
+        * Creates a listener that feeds the given queue.
+        *
+        * @param messageQueue queue that receives the converted messages
+        */
+       public TenantEventMessageListener(TenantEventMessageQueue messageQueue) {
+               this.messageQueue = messageQueue;
+       }
+
+       /**
+        * Logs the loss of the broker connection. No recovery is attempted here.
+        *
+        * @param arg0 the reason the connection was lost
+        */
+       @Override
+       public void connectionLost(Throwable arg0) {
+               log.warn("Connection to the message broker was lost", arg0);
+       }
+
+       /**
+        * Does nothing: this class does not publish any messages, so there is no
+        * delivery to follow up on.
+        *
+        * @param arg0 delivery token of the completed message (unused)
+        */
+       @Override
+       public void deliveryComplete(IMqttDeliveryToken arg0) {
+               // Intentionally empty.
+       }
+
+       /**
+        * Wraps the MQTT payload in a JMS text message, tags it with the event
+        * class name derived from the topic and adds it to the message queue.
+        *
+        * @param topicName name of the topic the message arrived on
+        * @param message   the received MQTT message; ignored when {@code null}
+        * @throws Exception declared by {@link MqttCallback}; JMS failures are
+        *                   logged instead of being propagated
+        */
+       @Override
+       public void messageArrived(String topicName, MqttMessage message) throws Exception {
+               if (message == null) {
+                       return;
+               }
+
+               // JMS failures are handled here because, per the Paho MqttCallback
+               // contract, an exception escaping this method shuts the client down.
+               try {
+                       TextMessage receivedMessage = new ActiveMQTextMessage();
+                       // NOTE(review): the payload is decoded with the platform default
+                       // charset; confirm that this matches the publisher's encoding.
+                       receivedMessage.setText(new String(message.getPayload()));
+                       receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                                                         Util.getEventNameForTopic(topicName));
+
+                       if (log.isDebugEnabled()) {
+                               // Read the text from the converted JMS message: the incoming
+                               // MqttMessage is not a TextMessage and must not be cast to one.
+                               log.debug(String.format("Tenant message received: %s",
+                                                       receivedMessage.getText()));
+                       }
+                       // Add received message to the queue
+                       messageQueue.add(receivedMessage);
+               } catch (JMSException e) {
+                       log.error(e.getMessage(), e);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
index 799b1b1..54774ea 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java
@@ -1,58 +1,83 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.stratos.messaging.message.receiver.topology;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
 /**
- * Implements functionality for receiving text based event messages from the 
topology
+ * Implements functionality for receiving text based event messages from the
+ * topology
  * message broker topic and add them to the event queue.
  */
-class TopologyEventMessageListener implements MessageListener {
-    private static final Log log = 
LogFactory.getLog(TopologyEventMessageListener.class);
-
-    private TopologyEventMessageQueue messageQueue;
-
-    public TopologyEventMessageListener(TopologyEventMessageQueue 
messageQueue) {
-        this.messageQueue = messageQueue;
-    }
-
-    @Override
-    public void onMessage(Message message) {
-        if (message instanceof TextMessage) {
-            TextMessage receivedMessage = (TextMessage) message;
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Topology message received: %s", 
((TextMessage) message).getText()));
-                }
-                // Add received message to the queue
-                messageQueue.add(receivedMessage);
-
-            } catch (JMSException e) {
-                log.error(e.getMessage(), e);
-            }
-        }
-    }
+class TopologyEventMessageListener implements MqttCallback {
+       private static final Log log = LogFactory.getLog(TopologyEventMessageListener.class);
+
+       /** Queue to which every converted message is added. */
+       private final TopologyEventMessageQueue messageQueue;
+
+       /**
+        * Creates a listener that feeds the given queue.
+        *
+        * @param messageQueue queue that receives the converted messages
+        */
+       public TopologyEventMessageListener(TopologyEventMessageQueue messageQueue) {
+               this.messageQueue = messageQueue;
+       }
+
+       /**
+        * Logs the loss of the broker connection. No recovery is attempted here.
+        *
+        * @param arg0 the reason the connection was lost
+        */
+       @Override
+       public void connectionLost(Throwable arg0) {
+               log.warn("Connection to the message broker was lost", arg0);
+       }
+
+       /**
+        * Does nothing: this class does not publish any messages, so there is no
+        * delivery to follow up on.
+        *
+        * @param arg0 delivery token of the completed message (unused)
+        */
+       @Override
+       public void deliveryComplete(IMqttDeliveryToken arg0) {
+               // Intentionally empty.
+       }
+
+       /**
+        * Wraps the MQTT payload in a JMS text message, tags it with the event
+        * class name derived from the topic and adds it to the message queue.
+        *
+        * @param topicName name of the topic the message arrived on
+        * @param message   the received MQTT message; ignored when {@code null}
+        * @throws Exception declared by {@link MqttCallback}; JMS failures are
+        *                   logged instead of being propagated
+        */
+       @Override
+       public void messageArrived(String topicName, MqttMessage message) throws Exception {
+               if (message == null) {
+                       return;
+               }
+
+               // JMS failures are handled here because, per the Paho MqttCallback
+               // contract, an exception escaping this method shuts the client down.
+               try {
+                       TextMessage receivedMessage = new ActiveMQTextMessage();
+                       // NOTE(review): the payload is decoded with the platform default
+                       // charset; confirm that this matches the publisher's encoding.
+                       receivedMessage.setText(new String(message.getPayload()));
+                       receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                                                         Util.getEventNameForTopic(topicName));
+
+                       if (log.isDebugEnabled()) {
+                               // Read the text from the converted JMS message: the incoming
+                               // MqttMessage is not a TextMessage and must not be cast to one.
+                               log.debug(String.format("topology message received: %s",
+                                                       receivedMessage.getText()));
+                       }
+                       // Add received message to the queue
+                       messageQueue.add(receivedMessage);
+               } catch (JMSException e) {
+                       log.error(e.getMessage(), e);
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 5991e25..a615eb4 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -1,18 +1,18 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -20,59 +20,60 @@ package org.apache.stratos.messaging.util;
 
 public class Constants {
        /* Message broker topic names */
-       public static final String TOPOLOGY_TOPIC = "topology";
-       public static final String HEALTH_STAT_TOPIC = 
"summarized-health-stats";
-    public static final String INSTANCE_STATUS_TOPIC = "instance-status";
-    public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier";
-    public static final String PING_TOPIC = "ping";
-    public static final String TENANT_TOPIC = "tenant";
-    public static final String TENANT_RANGE_ALL = "*";
+       public static final String TOPOLOGY_TOPIC = "topology/#";
+       public static final String HEALTH_STAT_TOPIC = "health/#";
+       public static final String INSTANCE_STATUS_TOPIC = "instance/#";
+       public static final String INSTANCE_NOTIFIER_TOPIC = "instance/#";
+       public static final String PING_TOPIC = "ping";
+       public static final String TENANT_TOPIC = "tenant/#";
+       public static final String TENANT_RANGE_ALL = "*";
 
-    public static final String TENANT_RANGE_DELIMITER = "-";
-    public static final String EVENT_CLASS_NAME = "event-class-name";
+       public static final String TENANT_RANGE_DELIMITER = "-";
+       public static final String EVENT_CLASS_NAME = "event-class-name";
 
-    /* Topology filter constants */
-    public static final String FILTER_VALUE_ASSIGN_OPERATOR="=";
-    public static final String FILTER_KEY_VALUE_PAIR_SEPARATOR = "|";
-    public static final String FILTER_VALUE_SEPARATOR = ",";
+       /* Topology filter constants */
+       public static final String FILTER_VALUE_ASSIGN_OPERATOR = "=";
+       public static final String FILTER_KEY_VALUE_PAIR_SEPARATOR = "|";
+       public static final String FILTER_VALUE_SEPARATOR = ",";
 
-    public static final String TOPOLOGY_SERVICE_FILTER = 
"stratos.topology.service.filter";
-    public static final String TOPOLOGY_SERVICE_FILTER_SERVICE_NAME = 
"service-name";
+       public static final String TOPOLOGY_SERVICE_FILTER = 
"stratos.topology.service.filter";
+       public static final String TOPOLOGY_SERVICE_FILTER_SERVICE_NAME = 
"service-name";
 
-    public static final String TOPOLOGY_CLUSTER_FILTER = 
"stratos.topology.cluster.filter";
-    public static final String TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID = 
"cluster-id";
+       public static final String TOPOLOGY_CLUSTER_FILTER = 
"stratos.topology.cluster.filter";
+       public static final String TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID = 
"cluster-id";
 
-    public static final String TOPOLOGY_MEMBER_FILTER = 
"stratos.topology.member.filter";
-    public static final String TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID = 
"lb-cluster-id";
+       public static final String TOPOLOGY_MEMBER_FILTER = 
"stratos.topology.member.filter";
+       public static final String TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID = 
"lb-cluster-id";
 
-    public static final String REQUEST_BASE_CONTEXT = 
"org.wso2.carbon.context.RequestBaseContext";
-    
-    // to identify a lb cluster
-    public static final String IS_LOAD_BALANCER = "load.balancer";
-    public static final String LOAD_BALANCER_REF = "load.balancer.ref";
-    public static final String SERVICE_AWARE_LOAD_BALANCER = 
"service.aware.load.balancer";
-    public static final String DEFAULT_LOAD_BALANCER = "default.load.balancer";
-    public static final String NO_LOAD_BALANCER = "no.load.balancer";
-    public static final String EXISTING_LOAD_BALANCERS = 
"existing.load.balancers";
-    public static final String LOAD_BALANCED_SERVICE_TYPE = 
"load.balanced.service.type";
+       public static final String REQUEST_BASE_CONTEXT = 
"org.wso2.carbon.context.RequestBaseContext";
 
-    // volume 
-    public static final String IS_VOLUME_REQUIRED = "volume.required";
-    public static final String SHOULD_DELETE_VOLUME = 
"volume.delete.on.unsubscription";
-    public static final String VOLUME_SIZE = "volume.size.gb";
-    public static final String VOLUME_ID = "volume.id";
-    public static final String DEVICE_NAME = "volume.device.name";
-       public static final String GRACEFUL_SHUTDOWN_TIMEOUT = 
"graceful.shutdown.timeout";
+       // to identify a lb cluster
+       public static final String IS_LOAD_BALANCER = "load.balancer";
+       public static final String LOAD_BALANCER_REF = "load.balancer.ref";
+       public static final String SERVICE_AWARE_LOAD_BALANCER = 
"service.aware.load.balancer";
+       public static final String DEFAULT_LOAD_BALANCER = 
"default.load.balancer";
+       public static final String NO_LOAD_BALANCER = "no.load.balancer";
+       public static final String EXISTING_LOAD_BALANCERS = 
"existing.load.balancers";
+       public static final String LOAD_BALANCED_SERVICE_TYPE = 
"load.balanced.service.type";
 
-    public static final String IS_PRIMARY = "PRIMARY";
+       // volume
+       public static final String IS_VOLUME_REQUIRED = "volume.required";
+       public static final String SHOULD_DELETE_VOLUME = 
"volume.delete.on.unsubscription";
+       public static final String VOLUME_SIZE = "volume.size.gb";
+       public static final String VOLUME_ID = "volume.id";
+       public static final String DEVICE_NAME = "volume.device.name";
+       public static final String GRACEFUL_SHUTDOWN_TIMEOUT = 
"graceful.shutdown.timeout";
 
-    //System Properties
-    public static final String AVERAGE_PING_INTERVAL_PROPERTY = 
"stratos.messaging.averagePingInterval";
-    public static final String FAILOVER_PING_INTERVAL_PROPERTY = 
"stratos.messaging.failoverPingInterval";
+       public static final String IS_PRIMARY = "PRIMARY";
 
-    //Default values
-    public static final int DEFAULT_AVERAGE_PING_INTERVAL = 1000;
-    public static final int DEFAULT_FAILOVER_PING_INTERVAL = 30000;
+       // System Properties
+       public static final String AVERAGE_PING_INTERVAL_PROPERTY =
+                                                                   
"stratos.messaging.averagePingInterval";
+       public static final String FAILOVER_PING_INTERVAL_PROPERTY =
+                                                                    
"stratos.messaging.failoverPingInterval";
 
+       // Default values
+       public static final int DEFAULT_AVERAGE_PING_INTERVAL = 1000;
+       public static final int DEFAULT_FAILOVER_PING_INTERVAL = 30000;
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java
index f32b91c..3f2c5cf 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java
@@ -1,37 +1,38 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
  * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.stratos.messaging.util;
 
 /**
- * Had to wrap {@link Property} array using a class, since there's a bug in 
current 
+ * Had to wrap {@link Property} array using a class, since there's a bug in
+ * current
  * stub generation.
  */
 public class Properties {
 
-    private Property[] properties;
+       private Property[] properties;
+
+       public Property[] getProperties() {
+               return properties;
+       }
 
-    public Property[] getProperties() {
-        return properties;
-    }
+       public void setProperties(Property[] properties) {
+               this.properties = properties;
+       }
 
-    public void setProperties(Property[] properties) {
-        this.properties = properties;
-    }
-    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
index 95b5a07..a6db9ee 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
@@ -1,32 +1,34 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.stratos.messaging.util;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.message.JsonMessage;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.message.JsonMessage;
 
 public class Util {
        private static final Log log = LogFactory.getLog(Util.class);
@@ -44,9 +46,9 @@ public class Util {
                        log.error("Failed to load properties from file: " + 
filePath, e);
                } finally {
                        try {
-                if(is != null) {
-                                   is.close();
-                }
+                               if (is != null) {
+                                       is.close();
+                               }
                        } catch (IOException ignore) {
                        }
                }
@@ -54,96 +56,120 @@ public class Util {
                return props;
        }
 
-    /**
-     * Validate tenant range.
-     * Valid formats: Integer-Integer, Integer-*
-     * Examples: 1-100, 101-200, 201-*
-     * @param tenantRange
-     */
-    public static void validateTenantRange(String tenantRange) {
-        boolean valid = false;
-        if(tenantRange != null) {
-            if(tenantRange.equals("*")) {
-                valid = true;
-            } else {
-               String[] array = 
tenantRange.split(Constants.TENANT_RANGE_DELIMITER);
-                if(array.length == 2) {
-                    // Integer-Integer
-                    if(isNumber(array[0]) && (isNumber(array[1]))){
-                        valid = true;
-                    }
-                    // Integer-*
-                    else if(isNumber(array[0]) && "*".equals(array[1])) {
-                        valid = true;
-                    }
-                }
-            }
-
-        }
-        if(!valid)
-            throw new RuntimeException(String.format("Tenant range %s is not 
valid", tenantRange));
-    }
-
-    public static boolean isNumber(String s) {
-        try {
-            Integer.parseInt(s);
-            return true;
-        }
-        catch (NumberFormatException e) {
-            // Not a valid number
-        }
-        return false;
-    }
-    
-    /**
-     * Transform json into an object of given type.
-     * @param json
-     * @param type
-     * @return
-     */
-    public static Object jsonToObject(String json, Class type) {
-        return (new JsonMessage(json, type)).getObject();
-    }
-
-
-    // Time interval between each ping message sent to topic.
-    private static int averagePingInterval;
-
-    // Time interval between each ping message after an error had occurred.
-    private static int failoverPingInterval;
-
-    /**
-     * fetch value from system param
-     * @return
-     */
-    public static int getAveragePingInterval() {
-        if (averagePingInterval <= 0) {
-            averagePingInterval = 
Util.getNumericSystemProperty(Constants.DEFAULT_AVERAGE_PING_INTERVAL,Constants.AVERAGE_PING_INTERVAL_PROPERTY);
-        }
-        return averagePingInterval;
-    }
-
-    /**
-     * fetch value from system param
-     * @return
-     */
-    public static int getFailoverPingInterval() {
-        if (failoverPingInterval <= 0) {
-            failoverPingInterval = 
Util.getNumericSystemProperty(Constants.DEFAULT_FAILOVER_PING_INTERVAL,Constants.FAILOVER_PING_INTERVAL_PROPERTY);
-        }
-        return failoverPingInterval;
-    }
-
-    /**
-     * Method to safely access numeric system properties
-     * @param defaultValue
-     * @return
-     */
-    public static Integer getNumericSystemProperty(Integer defaultValue, 
String propertyKey) {
-        try {
-            return Integer.valueOf(System.getProperty(propertyKey));
-        } catch (NumberFormatException ex) {
-            return defaultValue;
-        }
-    }
+       /**
+        * Validate tenant range.
+        * Valid formats: Integer-Integer, Integer-*
+        * Examples: 1-100, 101-200, 201-*
+        * 
+        * @param tenantRange
+        */
+       public static void validateTenantRange(String tenantRange) {
+               boolean valid = false;
+               if (tenantRange != null) {
+                       if (tenantRange.equals("*")) {
+                               valid = true;
+                       } else {
+                               String[] array = 
tenantRange.split(Constants.TENANT_RANGE_DELIMITER);
+                               if (array.length == 2) {
+                                       // Integer-Integer
+                                       if (isNumber(array[0]) && 
(isNumber(array[1]))) {
+                                               valid = true;
+                                       }
+                                       // Integer-*
+                                       else if (isNumber(array[0]) && 
"*".equals(array[1])) {
+                                               valid = true;
+                                       }
+                               }
+                       }
+
+               }
+               if (!valid)
+                       throw new RuntimeException(String.format("Tenant range 
%s is not valid", tenantRange));
+       }
+
+       public static boolean isNumber(String s) {
+               try {
+                       Integer.parseInt(s);
+                       return true;
+               } catch (NumberFormatException e) {
+                       // Not a valid number
+               }
+               return false;
+       }
+
+       /**
+        * Transform json into an object of given type.
+        * 
+        * @param json
+        * @param type
+        * @return
+        */
+       public static Object jsonToObject(String json, Class type) {
+               return (new JsonMessage(json, type)).getObject();
+       }
+
+       // Time interval between each ping message sent to topic.
+       private static int averagePingInterval;
+
+       // Time interval between each ping message after an error had occurred.
+       private static int failoverPingInterval;
+
+       /**
+        * fetch value from system param
+        * 
+        * @return
+        */
+       public static int getAveragePingInterval() {
+               if (averagePingInterval <= 0) {
+                       averagePingInterval =
+                                             
Util.getNumericSystemProperty(Constants.DEFAULT_AVERAGE_PING_INTERVAL,
+                                                                           
Constants.AVERAGE_PING_INTERVAL_PROPERTY);
+               }
+               return averagePingInterval;
+       }
+
+       /**
+        * fetch value from system param
+        * 
+        * @return
+        */
+       public static int getFailoverPingInterval() {
+               if (failoverPingInterval <= 0) {
+                       failoverPingInterval =
+                                              
Util.getNumericSystemProperty(Constants.DEFAULT_FAILOVER_PING_INTERVAL,
+                                                                            
Constants.FAILOVER_PING_INTERVAL_PROPERTY);
+               }
+               return failoverPingInterval;
+       }
+
+       /**
+        * Method to safely access numeric system properties
+        * 
+        * @param defaultValue
+        * @return
+        */
+       public static Integer getNumericSystemProperty(Integer defaultValue, 
String propertyKey) {
+               try {
+                       return Integer.valueOf(System.getProperty(propertyKey));
+               } catch (NumberFormatException ex) {
+                       return defaultValue;
+               }
+       }
+
+       public static String getMessageTopicName(Event event) {
+               return event.getClass().getName().substring(35).replace(".", 
"/");
+       }
+
+       public static String getEventNameForTopic(String arg0) {
+               return 
"org.apache.stratos.messaging.event.".concat(arg0.replace("/", "."));
+       }
+
+       public static String getRandomString(int len) {
+               String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+               Random rnd = new Random();
+               StringBuilder sb = new StringBuilder(len);
+               for (int i = 0; i < len; i++)
+                       sb.append(AB.charAt(rnd.nextInt(AB.length())));
+               return sb.toString();
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 80174f4..0205566 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -1,23 +1,30 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.stratos.cep.extension;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.log4j.Logger;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
@@ -28,7 +35,6 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
 import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.util.Constants;
 import org.wso2.siddhi.core.config.SiddhiContext;
 import org.wso2.siddhi.core.event.StreamEvent;
 import org.wso2.siddhi.core.event.in.InEvent;
@@ -47,210 +53,224 @@ import org.wso2.siddhi.query.api.expression.constant.IntConstant;
 import org.wso2.siddhi.query.api.expression.constant.LongConstant;
 import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 @SiddhiExtension(namespace = "stratos", function = "faultHandling")
-public class FaultHandlingWindowProcessor extends WindowProcessor implements 
RunnableWindowProcessor {
+public class FaultHandlingWindowProcessor extends WindowProcessor implements
+                                                                 
RunnableWindowProcessor {
+
+       private static final String HEALTH_STAT_MEMBER_FAULT_EVENT = 
"health/stat/MemberFaultEvent";
+       private static final int TIME_OUT = 60 * 1000;
+       static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
+       private ScheduledExecutorService eventRemoverScheduler;
+       private int subjectedAttrIndex;
+       private ThreadBarrier threadBarrier;
+       private long timeToKeep;
+       private ISchedulerSiddhiQueue<StreamEvent> window;
+       private final ConcurrentHashMap<String, Long> memberTimeStampMap =
+                                                                          new 
ConcurrentHashMap<String, Long>();
+       private final ConcurrentHashMap<String, Member> memberIdMap =
+                                                                     new 
ConcurrentHashMap<String, Member>();
 
-    private static final int TIME_OUT = 60 * 1000;
-    static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
-    private ScheduledExecutorService eventRemoverScheduler;
-    private int subjectedAttrIndex;
-    private ThreadBarrier threadBarrier;
-    private long timeToKeep;
-    private ISchedulerSiddhiQueue<StreamEvent> window;
-    private ConcurrentHashMap<String, Long> memberTimeStampMap = new 
ConcurrentHashMap<String, Long>();
-    private ConcurrentHashMap<String, Member> memberIdMap = new 
ConcurrentHashMap<String, Member>();
-    EventPublisher healthStatPublisher = 
EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC);
-    Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
-    Map<String, Object> memberFaultEventMessageMap = new HashMap<String, 
Object>();
-    private TopologyEventReceiver topologyEventReceiver;
-    private String memberID;
+       EventPublisher healthStatPublisher = null;
+       Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
+       Map<String, Object> memberFaultEventMessageMap = new HashMap<String, 
Object>();
+       private TopologyEventReceiver topologyEventReceiver;
+       private String memberID;
 
-    @Override
-    protected void processEvent(InEvent event) {
-        addDataToMap(event);
-    }
+       @Override
+       protected void processEvent(InEvent event) {
+               addDataToMap(event);
+       }
 
-    @Override
-    protected void processEvent(InListEvent listEvent) {
-        System.out.println(listEvent);
-        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
-            addDataToMap((InEvent) listEvent.getEvent(i));
-        }
-    }
+       @Override
+       protected void processEvent(InListEvent listEvent) {
+               System.out.println(listEvent);
+               for (int i = 0, size = listEvent.getActiveEvents(); i < size; 
i++) {
+                       addDataToMap((InEvent) listEvent.getEvent(i));
+               }
+       }
 
-    protected void addDataToMap(InEvent event) {
-        if (memberID != null) {
-            String id = (String)event.getData()[subjectedAttrIndex];
-            memberTimeStampMap.put(id, event.getTimeStamp());
-            log.debug("Event received from [member-id] " + id);
-        }
-        else {
-            log.error("NULL member ID in the event received");
-        }
-    }
+       protected void addDataToMap(InEvent event) {
+               if (memberID != null) {
+                       String id = (String) 
event.getData()[subjectedAttrIndex];
+                       memberTimeStampMap.put(id, event.getTimeStamp());
+                       log.debug("Event received from [member-id] " + id);
+               } else {
+                       log.error("NULL member ID in the event received");
+               }
+       }
 
-    @Override
-    public Iterator<StreamEvent> iterator() {
-        return window.iterator();
-    }
+       @Override
+       public Iterator<StreamEvent> iterator() {
+               return window.iterator();
+       }
 
-    @Override
-    public Iterator<StreamEvent> iterator(String predicate) {
-        if (siddhiContext.isDistributedProcessingEnabled()) {
-            return ((SchedulerSiddhiQueueGrid<StreamEvent>) 
window).iterator(predicate);
-        } else {
-            return window.iterator();
-        }
-    }
+       @Override
+       public Iterator<StreamEvent> iterator(String predicate) {
+               if (siddhiContext.isDistributedProcessingEnabled()) {
+                       return ((SchedulerSiddhiQueueGrid<StreamEvent>) 
window).iterator(predicate);
+               } else {
+                       return window.iterator();
+               }
+       }
 
-    /*
-    *  Retrieve the current activated member list from the topology and put 
them into the
-    *  memberTimeStampMap if not already exists. This will allow the system to 
recover
-    *  from any inconsistent state caused by MB/CEP failures.
-    */
-    private void loadFromTopology(){
-        if (TopologyManager.getTopology().isInitialized()){
-            TopologyManager.acquireReadLock();
-            memberIdMap.clear();
-            long currentTimeStamp = System.currentTimeMillis();
-            Iterator<Service> servicesItr = 
TopologyManager.getTopology().getServices().iterator();
-            while(servicesItr.hasNext()){
-                Service service = servicesItr.next();
-                Iterator<Cluster> clusterItr = 
service.getClusters().iterator();
-                while(clusterItr.hasNext()){
-                    Cluster cluster = clusterItr.next();
-                    Iterator<Member> memberItr = 
cluster.getMembers().iterator();
-                    while(memberItr.hasNext()){
-                        Member member = memberItr.next();
-                        if (member.getStatus().equals(MemberStatus.Activated)){
-                            
memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
-                            memberIdMap.put(member.getMemberId(), member);
-                        }
-                    }
-                }
-            }
-            TopologyManager.releaseReadLock();
-        }
-        if (log.isDebugEnabled()){
-            log.debug("Member TimeStamp Map: " + memberTimeStampMap);
-            log.debug("Member ID Map: " + memberIdMap);
-        }
-    }
+       /*
+        * Retrieve the current activated member list from the topology and put 
them
+        * into the
+        * memberTimeStampMap if not already exists. This will allow the system 
to
+        * recover
+        * from any inconsistent state caused by MB/CEP failures.
+        */
+       private void loadFromTopology() {
+               if (TopologyManager.getTopology().isInitialized()) {
+                       TopologyManager.acquireReadLock();
+                       memberIdMap.clear();
+                       long currentTimeStamp = System.currentTimeMillis();
+                       Iterator<Service> servicesItr = 
TopologyManager.getTopology().getServices().iterator();
+                       while (servicesItr.hasNext()) {
+                               Service service = servicesItr.next();
+                               Iterator<Cluster> clusterItr = 
service.getClusters().iterator();
+                               while (clusterItr.hasNext()) {
+                                       Cluster cluster = clusterItr.next();
+                                       Iterator<Member> memberItr = 
cluster.getMembers().iterator();
+                                       while (memberItr.hasNext()) {
+                                               Member member = 
memberItr.next();
+                                               if 
(member.getStatus().equals(MemberStatus.Activated)) {
+                                                       
memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+                                                       
memberIdMap.put(member.getMemberId(), member);
+                                               }
+                                       }
+                               }
+                       }
+                       TopologyManager.releaseReadLock();
+               }
+               if (log.isDebugEnabled()) {
+                       log.debug("Member TimeStamp Map: " + 
memberTimeStampMap);
+                       log.debug("Member ID Map: " + memberIdMap);
+               }
+       }
 
-    private void publishMemberFault(String memberID){
-        Member member = memberIdMap.get(memberID);
-        if (member == null){
-            log.error("Failed to publish MemberFault event. Member having 
[member-id] " + memberID + " does not exist in topology");
-            return;
-        }
-        MemberFaultEvent memberFaultEvent = new 
MemberFaultEvent(member.getClusterId(), member.getMemberId(), 
member.getPartitionId(), 0);
-        memberFaultEventMessageMap.put("message", memberFaultEvent);
-        Properties headers = new Properties();
-        headers.put(Constants.EVENT_CLASS_NAME, 
memberFaultEvent.getClass().getName());
-        healthStatPublisher.publish(MemberFaultEventMap, headers, true);
+       private void publishMemberFault(String memberID) {
+               Member member = memberIdMap.get(memberID);
+               if (member == null) {
+                       log.error("Failed to publish MemberFault event. Member 
having [member-id] " + memberID +
+                                 " does not exist in topology");
+                       return;
+               }
+               MemberFaultEvent memberFaultEvent =
+                                                   new 
MemberFaultEvent(member.getClusterId(),
+                                                                        
member.getMemberId(),
+                                                                        
member.getPartitionId(), 0);
+               memberFaultEventMessageMap.put("message", memberFaultEvent);
 
-        if (log.isDebugEnabled()){
-            log.debug("Published MemberFault event for [member-id] " + 
memberID);
-        }
-    }
+               healthStatPublisher = 
EventPublisherPool.getPublisher(HEALTH_STAT_MEMBER_FAULT_EVENT);
+               healthStatPublisher.publish(MemberFaultEventMap, true);
 
+               if (log.isDebugEnabled()) {
+                       log.debug("Published MemberFault event for [member-id] 
" + memberID);
+               }
+       }
 
-    @Override
-    public void run() {
-        try {
-            threadBarrier.pass();
-            loadFromTopology();
-            Iterator it = memberTimeStampMap.entrySet().iterator();
+       @Override
+       public void run() {
+               try {
+                       threadBarrier.pass();
+                       loadFromTopology();
+                       Iterator it = memberTimeStampMap.entrySet().iterator();
 
-            while ( it.hasNext() ) {
-                Map.Entry pair = (Map.Entry)it.next();
-                long currentTime = System.currentTimeMillis();
-                Long eventTimeStamp = (Long) pair.getValue();
+                       while (it.hasNext()) {
+                               Map.Entry pair = (Map.Entry) it.next();
+                               long currentTime = System.currentTimeMillis();
+                               Long eventTimeStamp = (Long) pair.getValue();
 
-                if ((currentTime - eventTimeStamp) > TIME_OUT) {
-                    log.info("Faulty member detected [member-id] " + 
pair.getKey() + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + 
TIME_OUT + " milliseconds");
-                    it.remove();
-                    publishMemberFault((String) pair.getKey());
-                }
-            }
-            if (log.isDebugEnabled()){
-                log.debug("Fault handling processor iteration completed with 
[time-stamp map length] " + memberTimeStampMap.size() + " [activated 
member-count] " + memberIdMap.size());
-            }
-            eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
-        } catch (Throwable t) {
-            log.error(t.getMessage(), t);
-        }
-    }
+                               if ((currentTime - eventTimeStamp) > TIME_OUT) {
+                                       log.info("Faulty member detected 
[member-id] " + pair.getKey() +
+                                                " with [last time-stamp] " + 
eventTimeStamp + " [time-out] " +
+                                                TIME_OUT + " milliseconds");
+                                       it.remove();
+                                       publishMemberFault((String) 
pair.getKey());
+                               }
+                       }
+                       if (log.isDebugEnabled()) {
+                               log.debug("Fault handling processor iteration 
completed with [time-stamp map length] " +
+                                         memberTimeStampMap.size() +
+                                         " [activated member-count] " +
+                                         memberIdMap.size());
+                       }
+                       eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
+               } catch (Throwable t) {
+                       log.error(t.getMessage(), t);
+               }
+       }
 
-    @Override
-    protected Object[] currentState() {
-        return new Object[]{window.currentState()};
-    }
+       @Override
+       protected Object[] currentState() {
+               return new Object[] { window.currentState() };
+       }
 
-    @Override
-    protected void restoreState(Object[] data) {
-        window.restoreState(data);
-        window.restoreState((Object[]) data[0]);
-        window.reSchedule();
-    }
+       @Override
+       protected void restoreState(Object[] data) {
+               window.restoreState(data);
+               window.restoreState((Object[]) data[0]);
+               window.reSchedule();
+       }
 
-    @Override
-    protected void init(Expression[] parameters, QueryPostProcessingElement 
nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean 
async, SiddhiContext siddhiContext) {
-        if (parameters[0] instanceof IntConstant) {
-            timeToKeep = ((IntConstant) parameters[0]).getValue();
-        } else {
-            timeToKeep = ((LongConstant) parameters[0]).getValue();
-        }
+       @Override
+       protected void init(Expression[] parameters, QueryPostProcessingElement 
nextProcessor,
+                           AbstractDefinition streamDefinition, String 
elementId, boolean async,
+                           SiddhiContext siddhiContext) {
+               if (parameters[0] instanceof IntConstant) {
+                       timeToKeep = ((IntConstant) parameters[0]).getValue();
+               } else {
+                       timeToKeep = ((LongConstant) parameters[0]).getValue();
+               }
 
-        memberID = ((Variable)parameters[1]).getAttributeName();
+               memberID = ((Variable) parameters[1]).getAttributeName();
 
-        String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
-        subjectedAttrIndex = 
streamDefinition.getAttributePosition(subjectedAttr);
+               String subjectedAttr = ((Variable) 
parameters[1]).getAttributeName();
+               subjectedAttrIndex = 
streamDefinition.getAttributePosition(subjectedAttr);
 
-        if (this.siddhiContext.isDistributedProcessingEnabled()) {
-            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, 
this, this.siddhiContext, this.async);
-        } else {
-            window = new SchedulerSiddhiQueue<StreamEvent>(this);
-        }
-        
MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent",
 memberFaultEventMessageMap);
-        this.topologyEventReceiver = new TopologyEventReceiver();
-        Thread thread = new Thread(topologyEventReceiver);
-        thread.start();
-        log.info("WSO2 CEP topology receiver thread started");
+               if (this.siddhiContext.isDistributedProcessingEnabled()) {
+                       window =
+                                new 
SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext,
+                                                                          
this.async);
+               } else {
+                       window = new SchedulerSiddhiQueue<StreamEvent>(this);
+               }
+               
MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent",
+                                       memberFaultEventMessageMap);
+               this.topologyEventReceiver = new TopologyEventReceiver();
+               Thread thread = new Thread(topologyEventReceiver);
+               thread.start();
+               log.info("WSO2 CEP topology receiver thread started");
 
-        //Ordinary scheduling
-        window.schedule();
+               // Ordinary scheduling
+               window.schedule();
 
-    }
+       }
 
-    @Override
-    public void schedule() {
-        eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
-    }
+       @Override
+       public void schedule() {
+               eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
+       }
 
-    @Override
-    public void scheduleNow() {
-        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
-    }
+       @Override
+       public void scheduleNow() {
+               eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+       }
 
-    @Override
-    public void setScheduledExecutorService(ScheduledExecutorService 
scheduledExecutorService) {
-        this.eventRemoverScheduler = scheduledExecutorService;
-    }
+       @Override
+       public void setScheduledExecutorService(ScheduledExecutorService 
scheduledExecutorService) {
+               this.eventRemoverScheduler = scheduledExecutorService;
+       }
 
-    @Override
-    public void setThreadBarrier(ThreadBarrier threadBarrier) {
-        this.threadBarrier = threadBarrier;
-    }
+       @Override
+       public void setThreadBarrier(ThreadBarrier threadBarrier) {
+               this.threadBarrier = threadBarrier;
+       }
 
-    @Override
-    public void destroy(){
-        this.topologyEventReceiver.terminate();
-        window = null;
-    }
+       @Override
+       public void destroy() {
+               this.topologyEventReceiver.terminate();
+               window = null;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml
----------------------------------------------------------------------
diff --git a/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml b/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml
index 125bbd3..2b0120f 100644
--- a/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml
+++ b/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml
@@ -275,6 +275,7 @@
                                 </bundleDef>
                                 
<bundleDef>org.apache.jclouds.driver:jclouds-bouncycastle:${jclouds.version}</bundleDef>
                                 
<bundleDef>org.apache.jclouds.driver:jclouds-sshj:${jclouds.version}</bundleDef>
+                                
<!--bundleDef>org.apache.jclouds.driver:jclouds-log4j:${jclouds.version}</bundleDef-->
                                 
<bundleDef>org.apache.jclouds.driver:jclouds-enterprise:${jclouds.version}</bundleDef>
                                 
<bundleDef>org.apache.jclouds:jclouds-core:${jclouds.version}</bundleDef>
                                 
<bundleDef>org.apache.jclouds:jclouds-compute:${jclouds.version}</bundleDef>
@@ -288,12 +289,14 @@
                                 
<bundleDef>org.apache.jclouds.api:openstack-keystone:${jclouds.version}</bundleDef>
                                 
<bundleDef>org.apache.stratos:aws-ec2:1.8.0-stratos</bundleDef>
                                 
<bundleDef>com.google.guava:guava:17.0</bundleDef>
+
+
                                 <bundleDef>
                                     
org.apache.servicemix.bundles:org.apache.servicemix.bundles.jsch-agentproxy-jsch:0.0.7_1
                                 </bundleDef>
                                 
<bundleDef>com.jcraft:jsch.agentproxy.connector-factory:0.0.7</bundleDef>
                                 
<bundleDef>com.jcraft:jsch.agentproxy.sshagent:0.0.7</bundleDef>
-                                
<bundleDef>com.jcraft:jsch.agentproxy.usocket-nc:0.0.7</bundleDef>
+                                
<bundleDef>com.jcraft:jsch.agentproxy.usocket-jna:0.0.7</bundleDef>
                                 
<bundleDef>com.jcraft:jsch.agentproxy.core:0.0.7</bundleDef>
                                 
<bundleDef>net.java.dev.jna:jna:4.1.0</bundleDef>
                                 <bundleDef>
@@ -302,6 +305,7 @@
                                 <bundleDef>net.schmizz:sshj:0.9.0</bundleDef>
                                 
<bundleDef>org.apache.servicemix.bundles:org.apache.servicemix.bundles.jzlib:1.1.1_1
                                 </bundleDef>
+
                                 
<bundleDef>com.google.code.gson:gson:${gson2.version}</bundleDef>
                                 
<bundleDef>com.google.guice.wso2:guice:${google.guice.wso2.version}</bundleDef>
                                 <bundleDef>
@@ -323,6 +327,7 @@
                                 </bundleDef>
                                 
<bundleDef>jdom.wso2:jdom:1.0.0.wso2v1</bundleDef>
                                 
<bundleDef>org.json.wso2:json:1.0.0.wso2v1</bundleDef>
+                                
<!--bundleDef>org.jaggeryjs:0.9.0.ALPHA2-wso2v2</bundleDef-->
                                 
<bundleDef>org.apache.jclouds.api:sts:${jclouds.version}</bundleDef>
                                 
<bundleDef>javax.ws.rs:jsr311-api:1.1.1</bundleDef>
                                 
<bundleDef>org.apache.stratos:org.apache.stratos.messaging:${project.version}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0592097..0914560 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,10 @@
                 <checksumPolicy>ignore</checksumPolicy>
             </releases>
         </repository>
-
+               <repository>
+               <id>Eclipse Paho Repo</id>
+               
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+       </repository>
         <repository>
             <id>central</id>
             <name>Maven Repository Switchboard</name>

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/products/stratos/conf/mqtttopic.properties
----------------------------------------------------------------------
diff --git a/products/stratos/conf/mqtttopic.properties 
b/products/stratos/conf/mqtttopic.properties
new file mode 100644
index 0000000..823c1a9
--- /dev/null
+++ b/products/stratos/conf/mqtttopic.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+mqtturl=tcp://localhost:1883
+clientID=stratos
+tempfilelocation=/tmp

Reply via email to