http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index dcb8be4..cc9f34f 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -1,18 +1,18 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
  * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -21,14 +21,11 @@ package org.apache.stratos.manager.internal;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.manager.listener.InstanceStatusListener;
-import org.apache.stratos.manager.listener.TenantUserRoleCreator;
 import org.apache.stratos.manager.publisher.TenantEventPublisher;
 import org.apache.stratos.manager.publisher.TenantSynchronizerTaskScheduler;
 import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
 import 
org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver;
 import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
-import org.apache.stratos.manager.utils.UserRoleCreator;
-import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
 import org.apache.stratos.messaging.util.Constants;
 import org.osgi.service.component.ComponentContext;
@@ -38,7 +35,9 @@ import org.wso2.carbon.user.core.service.RealmService;
 import org.wso2.carbon.utils.ConfigurationContextService;
 
 /**
- * @scr.component 
name="org.wso2.carbon.hosting.mgt.internal.ADCManagementServerComponent"
+ * @scr.component
+ *                name=
+ *                
"org.wso2.carbon.hosting.mgt.internal.ADCManagementServerComponent"
  *                immediate="true"
  * @scr.reference name="config.context.service"
  *                interface="org.wso2.carbon.utils.ConfigurationContextService"
@@ -54,138 +53,141 @@ import org.wso2.carbon.utils.ConfigurationContextService;
  *                "org.wso2.carbon.registry.core.service.RegistryService"
  *                cardinality="1..1" policy="dynamic" bind="setRegistryService"
  *                unbind="unsetRegistryService"
- * @scr.reference name="ntask.component" 
interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * @scr.reference name="ntask.component"
+ *                interface="org.wso2.carbon.ntask.core.service.TaskService"
  *                cardinality="1..1" policy="dynamic" bind="setTaskService"
  *                unbind="unsetTaskService"
  */
 
 public class ADCManagementServerComponent {
 
-    private static final Log log = 
LogFactory.getLog(ADCManagementServerComponent.class);
-    private StratosManagerTopologyEventReceiver 
stratosManagerTopologyEventReceiver;
+       private static final Log log = 
LogFactory.getLog(ADCManagementServerComponent.class);
+       private StratosManagerTopologyEventReceiver 
stratosManagerTopologyEventReceiver;
 
-    protected void activate(ComponentContext componentContext) throws 
Exception {
+       protected void activate(ComponentContext componentContext) throws 
Exception {
                try {
                        CartridgeConfigFileReader.readProperties();
 
-            // Schedule complete tenant event synchronizer
-            if(log.isDebugEnabled()) {
-                log.debug("Scheduling tenant synchronizer task...");
-            }
-            
TenantSynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
-
-            // Register tenant event publisher
-            if(log.isDebugEnabled()) {
-                log.debug("Starting tenant event publisher...");
-            }
-            TenantEventPublisher tenantEventPublisher = new 
TenantEventPublisher();
-            componentContext.getBundleContext().registerService(
-                    
org.apache.stratos.common.listeners.TenantMgtListener.class.getName(),
-                    tenantEventPublisher, null);
-
-            // Start instance status topic subscriber
-            if(log.isDebugEnabled()) {
-                log.debug("Starting instance status topic subscriber...");
-            }
-            TopicSubscriber subscriber = new 
TopicSubscriber(Constants.INSTANCE_STATUS_TOPIC);
-            subscriber.setMessageListener(new InstanceStatusListener());
-            Thread tsubscriber = new Thread(subscriber);
+                       // Schedule complete tenant event synchronizer
+                       if (log.isDebugEnabled()) {
+                               log.debug("Scheduling tenant synchronizer 
task...");
+                       }
+                       
TenantSynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance()
+                                                                               
       .getTaskService());
+
+                       // Register tenant event publisher
+                       if (log.isDebugEnabled()) {
+                               log.debug("Starting tenant event publisher...");
+                       }
+                       TenantEventPublisher tenantEventPublisher = new 
TenantEventPublisher();
+                       componentContext.getBundleContext()
+                                       
.registerService(org.apache.stratos.common.listeners.TenantMgtListener.class.getName(),
+                                                        tenantEventPublisher, 
null);
+
+                       // Start instance status topic subscriber
+                       if (log.isDebugEnabled()) {
+                               log.debug("Starting instance status topic 
subscriber...");
+                       }
+                       TopicSubscriber subscriber = new 
TopicSubscriber(Constants.INSTANCE_STATUS_TOPIC);
+                       subscriber.setMessageListener(new 
InstanceStatusListener());
+                       Thread tsubscriber = new Thread(subscriber);
                        tsubscriber.start();
 
-            //Create a Tenant-User Role at server start-up
-            UserRoleCreator.CreateTenantUserRole();
+                       // initializing the topology event subscriber
+                       /*
+                        * TopicSubscriber topologyTopicSubscriber = new
+                        * TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+                        * topologyTopicSubscriber.setMessageListener(new
+                        * TopologyEventListner());
+                        * Thread topologyTopicSubscriberThread = new
+                        * Thread(topologyTopicSubscriber);
+                        * topologyTopicSubscriberThread.start();
+                        * 
+                        * //Starting Topology Receiver
+                        * TopologyReceiver topologyReceiver = new 
TopologyReceiver();
+                        * Thread topologyReceiverThread = new 
Thread(topologyReceiver);
+                        * topologyReceiverThread.start();
+                        */
+
+                       stratosManagerTopologyEventReceiver = new 
StratosManagerTopologyEventReceiver();
+                       Thread topologyReceiverThread = new 
Thread(stratosManagerTopologyEventReceiver);
+                       topologyReceiverThread.start();
+                       log.info("Topology receiver thread started");
+
+                       // retrieve persisted CartridgeSubscriptions
+                       new 
DataInsertionAndRetrievalManager().cachePersistedSubscriptions();
+
+                       // Component activated successfully
+                       log.info("ADC management server component is 
activated");
+
+               } catch (Exception e) {
+                       if (log.isFatalEnabled()) {
+                               log.fatal("Could not activate ADC management 
server component", e);
+                       }
+               }
+       }
 
-            TenantUserRoleCreator tenantUserRoleCreator = new 
TenantUserRoleCreator();
-            componentContext.getBundleContext().registerService(
-                    
org.apache.stratos.common.listeners.TenantMgtListener.class.getName(),
-                    tenantUserRoleCreator, null);
+       protected void 
setConfigurationContextService(ConfigurationContextService contextService) {
+               
DataHolder.setClientConfigContext(contextService.getClientConfigContext());
+               
DataHolder.setServerConfigContext(contextService.getServerConfigContext());
 
-            //initializing the topology event subscriber
-            /*TopicSubscriber topologyTopicSubscriber = new 
TopicSubscriber(Constants.TOPOLOGY_TOPIC);
-            topologyTopicSubscriber.setMessageListener(new 
TopologyEventListner());
-            Thread topologyTopicSubscriberThread = new 
Thread(topologyTopicSubscriber);
-            topologyTopicSubscriberThread.start();
+       }
 
-            //Starting Topology Receiver
-            TopologyReceiver topologyReceiver = new TopologyReceiver();
-            Thread topologyReceiverThread = new Thread(topologyReceiver);
-            topologyReceiverThread.start();*/
+       protected void 
unsetConfigurationContextService(ConfigurationContextService contextService) {
+               DataHolder.setClientConfigContext(null);
+               DataHolder.setServerConfigContext(null);
+       }
 
-            stratosManagerTopologyEventReceiver = new 
StratosManagerTopologyEventReceiver();
-            Thread topologyReceiverThread = new 
Thread(stratosManagerTopologyEventReceiver);
-            topologyReceiverThread.start();
-            log.info("Topology receiver thread started");
+       protected void setRealmService(RealmService realmService) {
+               // keeping the realm service in the DataHolder class
+               DataHolder.setRealmService(realmService);
+       }
 
-            // retrieve persisted CartridgeSubscriptions
-            new 
DataInsertionAndRetrievalManager().cachePersistedSubscriptions();
+       protected void unsetRealmService(RealmService realmService) {
+       }
 
-            //Component activated successfully
-            log.info("ADC management server component is activated");
-                       
+       protected void setRegistryService(RegistryService registryService) {
+               try {
+                       DataHolder.setRegistryService(registryService);
                } catch (Exception e) {
-            if(log.isFatalEnabled()) {
-                           log.fatal("Could not activate ADC management server 
component", e);
-            }
+                       log.error("Cannot retrieve governance registry", e);
                }
        }
 
-    protected void setConfigurationContextService(ConfigurationContextService 
contextService) {
-        
DataHolder.setClientConfigContext(contextService.getClientConfigContext());
-        
DataHolder.setServerConfigContext(contextService.getServerConfigContext());
-
-    }
-
-    protected void 
unsetConfigurationContextService(ConfigurationContextService contextService) {
-        DataHolder.setClientConfigContext(null);
-        DataHolder.setServerConfigContext(null);
-    }
-
-    protected void setRealmService(RealmService realmService) {
-        // keeping the realm service in the DataHolder class
-        DataHolder.setRealmService(realmService);
-    }
-
-    protected void unsetRealmService(RealmService realmService) {
-    }
-
-    protected void setRegistryService(RegistryService registryService) {
-        try {
-            DataHolder.setRegistryService(registryService);
-        } catch (Exception e) {
-            log.error("Cannot retrieve governance registry", e);
-        }
-    }
-
-    protected void unsetRegistryService(RegistryService registryService) {
-    }
-
-    /*protected void setTopologyManagementService(TopologyManagementService 
topologyMgtService) {
-        DataHolder.setTopologyMgtService(topologyMgtService);
-    }
-
-    protected void unsetTopologyManagementService(TopologyManagementService 
topologyMgtService) {
-    }*/
-
-    protected void setTaskService(TaskService taskService) {
-        if (log.isDebugEnabled()) {
-            log.debug("Setting the task service");
-        }
-        ServiceReferenceHolder.getInstance().setTaskService(taskService);
-    }
-
-    protected void unsetTaskService(TaskService taskService) {
-        if (log.isDebugEnabled()) {
-            log.debug("Un-setting the task service");
-        }
-        ServiceReferenceHolder.getInstance().setTaskService(null);
-    }
-
-    protected void deactivate(ComponentContext context) {
-        // Close event publisher connections to message broker
-        EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC);
-        EventPublisherPool.close(Constants.TENANT_TOPIC);
-
-        //terminate Stratos Manager Topology Receiver
-        stratosManagerTopologyEventReceiver.terminate();
-    }
+       protected void unsetRegistryService(RegistryService registryService) {
+       }
+
+       /*
+        * protected void setTopologyManagementService(TopologyManagementService
+        * topologyMgtService) {
+        * DataHolder.setTopologyMgtService(topologyMgtService);
+        * }
+        * 
+        * protected void 
unsetTopologyManagementService(TopologyManagementService
+        * topologyMgtService) {
+        * }
+        */
+
+       protected void setTaskService(TaskService taskService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Setting the task service");
+               }
+               
ServiceReferenceHolder.getInstance().setTaskService(taskService);
+       }
+
+       protected void unsetTaskService(TaskService taskService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Un-setting the task service");
+               }
+               ServiceReferenceHolder.getInstance().setTaskService(null);
+       }
+
+       protected void deactivate(ComponentContext context) {
+               // Close event publisher connections to message broker
+               // EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC);
+               // EventPublisherPool.close(Constants.TENANT_TOPIC);
+
+               // terminate Stratos Manager Topology Receiver
+               stratosManagerTopologyEventReceiver.terminate();
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java
index 2b68de6..d730073 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java
@@ -1,23 +1,28 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
  * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.stratos.manager.listener;
 
+import java.util.Set;
+
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.manager.publisher.InstanceNotificationPublisher;
@@ -26,71 +31,108 @@ import 
org.apache.stratos.manager.subscription.CartridgeSubscription;
 import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
 import org.apache.stratos.messaging.util.Constants;
 import org.apache.stratos.messaging.util.Util;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
 
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-import java.util.Set;
+public class InstanceStatusListener implements MqttCallback {
+
+       private static final Log log = 
LogFactory.getLog(InstanceStatusListener.class);
+
+       @Override
+       public void connectionLost(Throwable arg0) {
+               // TODO Auto-generated method stub
+
+       }
+
+       @Override
+       public void deliveryComplete(IMqttDeliveryToken arg0) {
+               // TODO Auto-generated method stub
+
+       }
+
+       @Override
+       public void messageArrived(String arg0, MqttMessage message) throws 
Exception {
+               if (message instanceof MqttMessage) {
+
+                       TextMessage receivedMessage = new ActiveMQTextMessage();
+                       System.out.println("instance notifier messege 
received....");
+                       receivedMessage.setText(new 
String(message.getPayload()));
+                       
receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                                                         
"org.apache.stratos.messaging.event.".concat(arg0.replace("/",
+                                                                               
                                    ".")));
+
+                       if (log.isInfoEnabled()) {
+                               log.info("Instance status message received");
+                       }
+
+                       try {
+                               String type = 
receivedMessage.getStringProperty(Constants.EVENT_CLASS_NAME);
+                               if (log.isInfoEnabled()) {
+                                       log.info(String.format("Event class 
name: %s ", type));
+                               }
+                               // If member started event is received publish 
artifact update
+                               // message
+                               // To do a git clone
+                               if 
(InstanceStartedEvent.class.getName().equals(type)) {
+                                       String json = receivedMessage.getText();
+                                       InstanceStartedEvent event =
+                                                                    
(InstanceStartedEvent) Util.jsonToObject(json,
+                                                                               
                              InstanceStartedEvent.class);
+                                       String clusterId = event.getClusterId();
+                                       if (log.isInfoEnabled()) {
+                                               log.info("Cluster id: " + 
clusterId);
+                                       }
+
+                                       Set<CartridgeSubscription> 
cartridgeSubscriptions =
+                                                                               
            new 
DataInsertionAndRetrievalManager().getCartridgeSubscriptionForCluster(clusterId);
+                                       if (cartridgeSubscriptions == null || 
cartridgeSubscriptions.isEmpty()) {
+                                               // No subscriptions, return
+                                               if (log.isDebugEnabled()) {
+                                                       log.debug("No 
subscription information found for cluster id " +
+                                                                 clusterId);
+                                               }
+                                               return;
+                                       }
+
+                                       for (CartridgeSubscription 
cartridgeSubscription : cartridgeSubscriptions) {
+                                               // We need to send this event 
for all types, single
+                                               // tenant
+                                               // and multi tenant.
+                                               // In an autoscaling scenario, 
we need to send this
+                                               // event
+                                               // for all existing 
subscriptions for the newly spawned
+                                               // instance
+                                               // Also in a case of restarting 
the agent, this event
+                                               // needs
+                                               // to be sent for all 
subscriptions for the existing
+                                               // instance
+                                               if 
(cartridgeSubscription.getRepository() != null) {
+                                                       
InstanceNotificationPublisher publisher =
+                                                                               
                  new InstanceNotificationPublisher();
+                                                       
publisher.sendArtifactUpdateEvent(cartridgeSubscription.getRepository(),
+                                                                               
          clusterId,
+                                                                               
          String.valueOf(cartridgeSubscription.getSubscriber()
+                                                                               
                                              .getTenantId()));
+
+                                               } else {
+                                                       if 
(log.isDebugEnabled()) {
+                                                               log.debug("No 
repository found for subscription with alias: " +
+                                                                         
cartridgeSubscription.getAlias() + ", type: " +
+                                                                         
cartridgeSubscription.getType() +
+                                                                         ". 
Not sending the Artifact Updated event");
+                                                       }
+                                               }
+                                       }
+
+                               }
+                       } catch (Exception e) {
+                               if (log.isErrorEnabled()) {
+                                       log.error("Could not process instance 
status message", e);
+                               }
+                       }
+               }
 
-public class InstanceStatusListener implements MessageListener {
-
-    private static final Log log = LogFactory
-            .getLog(InstanceStatusListener.class);
-
-    @Override
-    public void onMessage(Message message) {
-        TextMessage receivedMessage = (TextMessage) message;
-        if(log.isInfoEnabled()) {
-            log.info("Instance status message received");
-        }
-
-        try {
-            String type = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);
-            if(log.isInfoEnabled()) {
-                log.info(String.format("Event class name: %s ", type));
-            }
-            // If member started event is received publish artifact update 
message
-            // To do a git clone
-            if (InstanceStartedEvent.class.getName().equals(type)) {
-                String json = receivedMessage.getText();
-                InstanceStartedEvent event = (InstanceStartedEvent) 
Util.jsonToObject(json, InstanceStartedEvent.class);
-                String clusterId = event.getClusterId();
-                if(log.isInfoEnabled()) {
-                    log.info("Cluster id: " + clusterId);
-                }
-
-                Set<CartridgeSubscription> cartridgeSubscriptions = new 
DataInsertionAndRetrievalManager().getCartridgeSubscriptionForCluster(clusterId);
-                if (cartridgeSubscriptions == null || 
cartridgeSubscriptions.isEmpty()) {
-                    // No subscriptions, return
-                    if (log.isDebugEnabled()) {
-                        log.debug("No subscription information found for 
cluster id " + clusterId);
-                    }
-                    return;
-                }
-
-                for (CartridgeSubscription cartridgeSubscription : 
cartridgeSubscriptions) {
-                    // We need to send this event for all types, single tenant 
and multi tenant.
-                    // In an autoscaling scenario, we need to send this event 
for all existing subscriptions for the newly spawned instance
-                    // Also in a case of restarting the agent, this event 
needs to be sent for all subscriptions for the existing instance
-                    if (cartridgeSubscription.getRepository() != null) {
-                        InstanceNotificationPublisher publisher = new 
InstanceNotificationPublisher();
-                        
publisher.sendArtifactUpdateEvent(cartridgeSubscription.getRepository(), 
clusterId,
-                                
String.valueOf(cartridgeSubscription.getSubscriber().getTenantId()));
-
-                    } else {
-                        if(log.isDebugEnabled()) {
-                            log.debug("No repository found for subscription 
with alias: " + cartridgeSubscription.getAlias() + ", type: " + 
cartridgeSubscription.getType()+
-                                    ". Not sending the Artifact Updated 
event");
-                        }
-                    }
-                }
-
-            }
-        } catch (Exception e) {
-            if(log.isErrorEnabled()) {
-                log.error("Could not process instance status message", e);
-            }
-        }
-    }
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
index 6a885e8..c932e98 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
@@ -1,18 +1,18 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
  * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -27,56 +27,59 @@ import org.apache.stratos.messaging.event.Event;
 import 
org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
 import 
org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
 import 
org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
-import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
 
 /**
- * Creating the relevant instance notification event and publish it to the 
instances.
+ * Creating the relevant instance notification event and publish it to the
+ * instances.
  */
 public class InstanceNotificationPublisher {
-    private static final Log log = 
LogFactory.getLog(InstanceNotificationPublisher.class);
+       private static final Log log = 
LogFactory.getLog(InstanceNotificationPublisher.class);
 
-    public InstanceNotificationPublisher() {
-    }
+       public InstanceNotificationPublisher() {
+       }
 
-    private void publish(Event event) {
-        EventPublisher depsyncEventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC);
-        depsyncEventPublisher.publish(event);
-    }
+       private void publish(Event event) {
+               String topic = Util.getMessageTopicName(event);
+               EventPublisher depsyncEventPublisher = 
EventPublisherPool.getPublisher(topic);
+               depsyncEventPublisher.publish(event);
+       }
 
-    /**
-     * Publishing the artifact update event to the instances
-     *
-     * @param repository
-     * @param clusterId
-     * @param tenantId
-     */
-    public void sendArtifactUpdateEvent(Repository repository, String 
clusterId, String tenantId) {
-        ArtifactUpdatedEvent artifactUpdateEvent = new ArtifactUpdatedEvent();
-        artifactUpdateEvent.setClusterId(clusterId);
-        artifactUpdateEvent.setRepoUserName(repository.getUserName());
-        artifactUpdateEvent.setRepoPassword(repository.getPassword());
-        artifactUpdateEvent.setRepoURL(repository.getUrl());
-        artifactUpdateEvent.setTenantId(tenantId);
-        artifactUpdateEvent.setCommitEnabled(repository.isCommitEnabled());
+       /**
+        * Publishing the artifact update event to the instances
+        * 
+        * @param repository
+        * @param clusterId
+        * @param tenantId
+        */
+       public void sendArtifactUpdateEvent(Repository repository, String 
clusterId, String tenantId) {
+               ArtifactUpdatedEvent artifactUpdateEvent = new 
ArtifactUpdatedEvent();
+               artifactUpdateEvent.setClusterId(clusterId);
+               artifactUpdateEvent.setRepoUserName(repository.getUserName());
+               artifactUpdateEvent.setRepoPassword(repository.getPassword());
+               artifactUpdateEvent.setRepoURL(repository.getUrl());
+               artifactUpdateEvent.setTenantId(tenantId);
+               
artifactUpdateEvent.setCommitEnabled(repository.isCommitEnabled());
 
-        log.info(String.format("Publishing artifact updated event: [cluster] 
%s " +
-                "[repo-URL] %s [repo-username] %s [tenant-id] %s",
-                clusterId, repository.getUrl(), repository.getUserName(), 
tenantId));
-        publish(artifactUpdateEvent);
-    }
+               log.info(String.format("Publishing artifact updated event: 
[cluster] %s "
+                                              + "[repo-URL] %s [repo-username] 
%s [tenant-id] %s",
+                                      clusterId,
+                                      repository.getUrl(), 
repository.getUserName(), tenantId));
+               publish(artifactUpdateEvent);
+       }
 
-    /**
-     * Publishing the instance termination notification to the instances
-     *
-     * @param memberId
-     */
-    public void sendInstanceCleanupEventForMember(String memberId) {
-        log.info(String.format("Publishing Instance Cleanup Event: [member] 
%s", memberId));
-        publish(new InstanceCleanupMemberEvent(memberId));
-    }
+       /**
+        * Publishing the instance termination notification to the instances
+        * 
+        * @param memberId
+        */
+       public void sendInstanceCleanupEventForMember(String memberId) {
+               log.info(String.format("Publishing Instance Cleanup Event: 
[member] %s", memberId));
+               publish(new InstanceCleanupMemberEvent(memberId));
+       }
 
-    public void sendInstanceCleanupEventForCluster(String clusterId) {
-         log.info(String.format("Publishing Instance Cleanup Event: [cluster] 
%s", clusterId));
-        publish(new InstanceCleanupClusterEvent(clusterId));
-    }
+       public void sendInstanceCleanupEventForCluster(String clusterId) {
+               log.info(String.format("Publishing Instance Cleanup Event: 
[cluster] %s", clusterId));
+               publish(new InstanceCleanupClusterEvent(clusterId));
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
index 8213ed9..0f7816e 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
@@ -1,18 +1,18 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -30,7 +30,7 @@ import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent;
 import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent;
 import org.apache.stratos.messaging.event.tenant.TenantUpdatedEvent;
-import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
 
 /**
  * Tenant event publisher to publish tenant events to the message broker by
@@ -38,78 +38,83 @@ import org.apache.stratos.messaging.util.Constants;
  */
 public class TenantEventPublisher implements TenantMgtListener {
 
-    private static final Log log = 
LogFactory.getLog(TenantEventPublisher.class);
-    private static final int EXEC_ORDER = 1;
+       private static final Log log = 
LogFactory.getLog(TenantEventPublisher.class);
+       private static final int EXEC_ORDER = 1;
 
+       @Override
+       public void onTenantCreate(TenantInfoBean tenantInfo) throws 
StratosException {
+               try {
+                       if (log.isDebugEnabled()) {
+                               log.debug(String.format("Publishing tenant 
created event: [tenant-id] %d [tenant-domain] %s",
+                                                       
tenantInfo.getTenantId(), tenantInfo.getTenantDomain()));
+                       }
+                       Tenant tenant = new Tenant(tenantInfo.getTenantId(), 
tenantInfo.getTenantDomain());
+                       TenantCreatedEvent event = new 
TenantCreatedEvent(tenant);
+                       String topic = Util.getMessageTopicName(event);
+                       EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic);
+                       eventPublisher.publish(event);
+               } catch (Exception e) {
+                       log.error("Could not publish tenant created event", e);
+               }
+       }
 
-    @Override
-        public void onTenantCreate(TenantInfoBean tenantInfo) throws 
StratosException {
-            try {
-                if(log.isDebugEnabled()) {
-                    log.debug(String.format("Publishing tenant created event: 
[tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), 
tenantInfo.getTenantDomain()));
-                }
-                Tenant tenant = new Tenant(tenantInfo.getTenantId(), 
tenantInfo.getTenantDomain());
-                TenantCreatedEvent event = new TenantCreatedEvent(tenant);
-                EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
-                eventPublisher.publish(event);
-            }
-            catch (Exception e) {
-                log.error("Could not publish tenant created event", e);
-            }
-        }
+       @Override
+       public void onTenantUpdate(TenantInfoBean tenantInfo) throws 
StratosException {
+               try {
+                       if (log.isInfoEnabled()) {
+                               log.info(String.format("Publishing tenant 
updated event: [tenant-id] %d [tenant-domain] %s",
+                                                      
tenantInfo.getTenantId(), tenantInfo.getTenantDomain()));
+                       }
+                       TenantUpdatedEvent event =
+                                                  new 
TenantUpdatedEvent(tenantInfo.getTenantId(),
+                                                                         
tenantInfo.getTenantDomain());
+                       String topic = Util.getMessageTopicName(event);
+                       EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic);
+                       eventPublisher.publish(event);
+               } catch (Exception e) {
+                       log.error("Could not publish tenant updated event");
+               }
+       }
 
-        @Override
-        public void onTenantUpdate(TenantInfoBean tenantInfo) throws 
StratosException {
-            try {
-                if(log.isInfoEnabled()) {
-                    log.info(String.format("Publishing tenant updated event: 
[tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), 
tenantInfo.getTenantDomain()));
-                }
-                TenantUpdatedEvent event = new 
TenantUpdatedEvent(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
-                EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
-                eventPublisher.publish(event);
-            }
-            catch (Exception e) {
-                log.error("Could not publish tenant updated event");
-            }
-        }
+       @Override
+       public void onTenantDelete(int tenantId) {
+               try {
+                       if (log.isInfoEnabled()) {
+                               log.info(String.format("Publishing tenant 
removed event: [tenant-id] %d", tenantId));
+                       }
+                       TenantRemovedEvent event = new 
TenantRemovedEvent(tenantId);
+                       String topic = Util.getMessageTopicName(event);
+                       EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic);
+                       eventPublisher.publish(event);
+               } catch (Exception e) {
+                       log.error("Could not publish tenant removed event");
+               }
+       }
 
-        @Override
-        public void onTenantDelete(int tenantId) {
-            try {
-                if(log.isInfoEnabled()) {
-                    log.info(String.format("Publishing tenant removed event: 
[tenant-id] %d", tenantId));
-                }
-                TenantRemovedEvent event = new TenantRemovedEvent(tenantId);
-                EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
-                eventPublisher.publish(event);
-            }
-            catch (Exception e) {
-                log.error("Could not publish tenant removed event");
-            }
-        }
+       @Override
+       public void onTenantRename(int tenantId, String oldDomainName, String 
newDomainName)
+                                                                               
            throws StratosException {
+       }
 
-        @Override
-        public void onTenantRename(int tenantId, String oldDomainName, String 
newDomainName) throws StratosException {
-        }
+       @Override
+       public void onTenantInitialActivation(int tenantId) throws 
StratosException {
+       }
 
-        @Override
-        public void onTenantInitialActivation(int tenantId) throws 
StratosException {
-        }
+       @Override
+       public void onTenantActivation(int tenantId) throws StratosException {
+       }
 
-        @Override
-        public void onTenantActivation(int tenantId) throws StratosException {
-        }
+       @Override
+       public void onTenantDeactivation(int tenantId) throws StratosException {
+       }
 
-        @Override
-        public void onTenantDeactivation(int tenantId) throws StratosException 
{
-        }
+       @Override
+       public void onSubscriptionPlanChange(int tenantId, String oldPlan, 
String newPlan)
+                                                                               
          throws StratosException {
+       }
 
-        @Override
-        public void onSubscriptionPlanChange(int tenantId, String oldPlan, 
String newPlan) throws StratosException {
-        }
-
-        @Override
-        public int getListenerOrder() {
-            return EXEC_ORDER;
-        }
+       @Override
+       public int getListenerOrder() {
+               return EXEC_ORDER;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
index 81ec432..78f587c 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
@@ -1,24 +1,30 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 package org.apache.stratos.manager.publisher;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.beans.TenantInfoBean;
@@ -31,88 +37,95 @@ import 
org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.tenant.Subscription;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
-import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
 import org.apache.stratos.tenant.mgt.util.TenantMgtUtil;
 import org.wso2.carbon.ntask.core.Task;
 import org.wso2.carbon.user.core.tenant.TenantManager;
 
-import java.util.*;
-
 /**
  * Tenant synchronizer task for publishing complete tenant event periodically
  * to message broker.
  */
 public class TenantSynzhronizerTask implements Task {
 
-    private static final Log log = 
LogFactory.getLog(TenantSynzhronizerTask.class);
+       private static final Log log = 
LogFactory.getLog(TenantSynzhronizerTask.class);
+
+       @Override
+       public void init() {
+       }
+
+       @Override
+       public void execute() {
+               try {
+                       if (log.isDebugEnabled()) {
+                               log.debug(String.format("Publishing complete 
tenant event"));
+                       }
+                       Tenant tenant;
+                       List<Tenant> tenants = new ArrayList<Tenant>();
+                       TenantManager tenantManager = 
DataHolder.getRealmService().getTenantManager();
+                       org.wso2.carbon.user.api.Tenant[] carbonTenants = 
tenantManager.getAllTenants();
+                       for (org.wso2.carbon.user.api.Tenant carbonTenant : 
carbonTenants) {
+                               // Create tenant
+                               if (log.isDebugEnabled()) {
+                                       log.debug(String.format("Tenant found: 
[tenant-id] %d [tenant-domain] %s",
+                                                               
carbonTenant.getId(), carbonTenant.getDomain()));
+                               }
+                               tenant = new Tenant(carbonTenant.getId(), 
carbonTenant.getDomain());
 
-    @Override
-    public void init() {
-    }
+                               if 
(!org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
+                                                                               
                       .tenantExists(carbonTenant.getId())) {
+                                       // if the tenant is not already there 
in TenantManager,
+                                       // trigger TenantCreatedEvent
+                                       TenantInfoBean tenantBean = new 
TenantInfoBean();
+                                       
tenantBean.setTenantId(carbonTenant.getId());
+                                       
tenantBean.setTenantDomain(carbonTenant.getDomain());
+                                       
TenantMgtUtil.triggerAddTenant(tenantBean);
+                                       // add tenant to Tenant Manager
+                                       
org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
+                                                                               
                          .addTenant(tenant);
+                               }
 
-    @Override
-    public void execute() {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing complete tenant event"));
-            }
-            Tenant tenant;
-            List<Tenant> tenants = new ArrayList<Tenant>();
-            TenantManager tenantManager = 
DataHolder.getRealmService().getTenantManager();
-            org.wso2.carbon.user.api.Tenant[] carbonTenants = 
tenantManager.getAllTenants();
-            for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) 
{
-                // Create tenant
-                if(log.isDebugEnabled()) {
-                    log.debug(String.format("Tenant found: [tenant-id] %d 
[tenant-domain] %s", carbonTenant.getId(), carbonTenant.getDomain()));
-                }
-                tenant = new Tenant(carbonTenant.getId(), 
carbonTenant.getDomain());
-               
-                if 
(!org.apache.stratos.messaging.message.receiver.tenant.TenantManager
-                        .getInstance().tenantExists(carbonTenant.getId())) {
-                    // if the tenant is not already there in TenantManager,
-                    // trigger TenantCreatedEvent
-                    TenantInfoBean tenantBean = new TenantInfoBean();
-                    tenantBean.setTenantId(carbonTenant.getId());
-                    tenantBean.setTenantDomain(carbonTenant.getDomain());
-                    TenantMgtUtil.triggerAddTenant(tenantBean);
-                    // add tenant to Tenant Manager
-                    
org.apache.stratos.messaging.message.receiver.tenant.TenantManager
-                    .getInstance().addTenant(tenant);
-                }
-               
-                // Add subscriptions
-                
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-                //List<CartridgeSubscriptionInfo> cartridgeSubscriptions = 
PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId());
-                
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-                Collection<CartridgeSubscription> cartridgeSubscriptions = new 
DataInsertionAndRetrievalManager().getCartridgeSubscriptions(tenant.getTenantId());
-                if (cartridgeSubscriptions != null && 
!cartridgeSubscriptions.isEmpty()) {
-                    for (CartridgeSubscription cartridgeSubscription : 
cartridgeSubscriptions) {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Tenant subscription 
found: [tenant-id] %d [tenant-domain] %s [service] %s",
-                                    carbonTenant.getId(), 
carbonTenant.getDomain(), cartridgeSubscription.getType()));
-                        }
-                        HashSet<String> clusterIds = new HashSet<String>();
-                        
clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain());
-                        Subscription subscription = new 
Subscription(cartridgeSubscription.getType(), clusterIds);
-                        for(SubscriptionDomain subscriptionDomain : 
cartridgeSubscription.getSubscriptionDomains()) {
-                            
subscription.addSubscriptionDomain(subscriptionDomain.getDomainName(), 
subscriptionDomain.getApplicationContext());
-                        }
-                        tenant.addSubscription(subscription);
-                    }
-                }
-                tenants.add(tenant);
-            }
-            CompleteTenantEvent event = new CompleteTenantEvent(tenants);
-            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
-            eventPublisher.publish(event);
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Could not publish complete tenant event", e);
-            }
-        }
-    }
+                               // Add subscriptions
+                               // 
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+                               // List<CartridgeSubscriptionInfo> 
cartridgeSubscriptions =
+                               // 
PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId());
+                               // 
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+                               Collection<CartridgeSubscription> 
cartridgeSubscriptions =
+                                                                               
           new 
DataInsertionAndRetrievalManager().getCartridgeSubscriptions(tenant.getTenantId());
+                               if (cartridgeSubscriptions != null && 
!cartridgeSubscriptions.isEmpty()) {
+                                       for (CartridgeSubscription 
cartridgeSubscription : cartridgeSubscriptions) {
+                                               if (log.isDebugEnabled()) {
+                                                       
log.debug(String.format("Tenant subscription found: [tenant-id] %d 
[tenant-domain] %s [service] %s",
+                                                                               
carbonTenant.getId(), carbonTenant.getDomain(),
+                                                                               
cartridgeSubscription.getType()));
+                                               }
+                                               HashSet<String> clusterIds = 
new HashSet<String>();
+                                               
clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain());
+                                               Subscription subscription =
+                                                                           new 
Subscription(
+                                                                               
             cartridgeSubscription.getType(),
+                                                                               
             clusterIds);
+                                               for (SubscriptionDomain 
subscriptionDomain : cartridgeSubscription.getSubscriptionDomains()) {
+                                                       
subscription.addSubscriptionDomain(subscriptionDomain.getDomainName(),
+                                                                               
           subscriptionDomain.getApplicationContext());
+                                               }
+                                               
tenant.addSubscription(subscription);
+                                       }
+                               }
+                               tenants.add(tenant);
+                       }
+                       CompleteTenantEvent event = new 
CompleteTenantEvent(tenants);
+                       String topic = Util.getMessageTopicName(event);
+                       EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic);
+                       eventPublisher.publish(event);
+               } catch (Exception e) {
+                       if (log.isErrorEnabled()) {
+                               log.error("Could not publish complete tenant 
event", e);
+                       }
+               }
+       }
 
-    @Override
-    public void setProperties(Map<String, String> stringStringMap) {
-    }
+       @Override
+       public void setProperties(Map<String, String> stringStringMap) {
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/pom.xml 
b/components/org.apache.stratos.messaging/pom.xml
index b2d851b..e7c79d9 100644
--- a/components/org.apache.stratos.messaging/pom.xml
+++ b/components/org.apache.stratos.messaging/pom.xml
@@ -73,7 +73,21 @@
                <artifactId>java-xmlbuilder</artifactId>
                <version>0.6</version>
                </dependency>
-        
+         <dependency>
+                       <groupId>org.eclipse.paho</groupId>
+                       <artifactId>mqtt-client</artifactId>
+                       <version>0.4.0</version>
+                </dependency>
+                    <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-core</artifactId>
+            <version>5.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <version>5.9.1</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
new file mode 100644
index 0000000..e4a74c8
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.broker.connect;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.util.Util;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
+
+import java.io.File;
+import java.util.Properties;
+
+/**
+ * This class is responsible for loading the mqtt config file from the
+ * classpath
+ * Initialize the topic connection.
+ * and initialize the topic connection. Later if some other object needs a 
topic
+ * session, this object is capable of providing one.
+ */
+public class MQTTConnector {
+
+    public static final String MQTTURL = "defaultValue";
+    public static final String CLIENT_ID = "Startos";
+    public static final String TMPFILELOCATION = "/tmp";
+    private static MqttClient topicClient;
+
+    private static MqttClient topicClientSub;
+    private static final Log log = LogFactory.getLog(MQTTConnector.class);
+    private static String configFileLocation = 
System.getProperty("jndi.properties.dir");
+    private static Properties mqttProp =
+            Util.getProperties(configFileLocation + File.separator +
+                    "mqtttopic.properties");
+
+    public static synchronized MqttClient getMQTTConClient() {
+
+        if (topicClient == null) {
+
+
+            String broker = mqttProp.getProperty("mqtturl", MQTTURL);
+
+            String clientId = mqttProp.getProperty("clientID", CLIENT_ID);
+            MemoryPersistence persistence = new MemoryPersistence();
+
+            try {
+                topicClient = new MqttClient(broker, clientId, persistence);
+                MqttConnectOptions connOpts = new MqttConnectOptions();
+                connOpts.setCleanSession(true);
+                if (log.isDebugEnabled()) {
+                    log.debug("MQTT client connected");
+                }
+
+            } catch (MqttException me) {
+
+                log.error("Failed to initiate autoscaler service client. ", 
me);
+            }
+
+        }
+        return topicClient;
+
+    }
+
+    public static synchronized MqttClient getMQTTSubClient(String identifier) {
+        // if (topicClientSub == null) {
+
+        String broker = mqttProp.getProperty("mqtturl", MQTTURL);
+
+        String tempFile = mqttProp.getProperty("tempfilelocation", 
TMPFILELOCATION);
+        // Creating new default persistence for mqtt client
+        MqttDefaultFilePersistence persistence = new 
MqttDefaultFilePersistence(tempFile);
+
+        try {
+            MqttConnectOptions connOpts = new MqttConnectOptions();
+            connOpts.setCleanSession(true);
+            // mqtt client with specific url and a random client id
+            topicClientSub = new MqttClient(broker, identifier, persistence);
+
+            if (log.isDebugEnabled()) {
+                log.debug("MQTT client connected");
+            }
+
+        } catch (MqttException me) {
+
+            log.error("Failed to initiate autoscaler service client. ", me);
+        }
+
+        // }
+        return topicClientSub;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
deleted file mode 100644
index ebb339d..0000000
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.broker.connect;
-
-import java.util.Properties;
-import java.io.File;
-
-import javax.jms.JMSException;
-import javax.jms.QueueSession;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicSession;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.stratos.messaging.util.Util;
-import org.apache.stratos.messaging.util.Constants;
-
-/**
- * This class is responsible for loading the jndi.properties file from the
- * classpath
- * and initialize the topic connection. Later if some other object needs a 
topic
- * session, this object is capable of providing one.
- *
- * @author nirmal
- */
-public class TopicConnector {
-
-    private TopicConnection topicConnection;
-    private String jndiPropFileDir;
-    private Topic topic;
-
-    public TopicConnector() {
-        jndiPropFileDir = System.getProperty("jndi.properties.dir");
-    }
-
-    public void init(String topicName) throws Exception {
-        InitialContext ctx;
-        Properties environment = Util.getProperties(jndiPropFileDir + 
File.separator + "jndi.properties");
-        environment.put(Constants.REQUEST_BASE_CONTEXT, "true"); // always 
returns the base context.
-        ctx = new InitialContext(environment);
-        // Lookup connection factory
-        String connectionFactoryName = 
environment.get("connectionfactoryName").toString();
-        TopicConnectionFactory connFactory = (TopicConnectionFactory) 
ctx.lookup(connectionFactoryName);
-        // Lookup the topic
-        try {
-            setTopic((Topic) ctx.lookup(topicName));
-        } catch (NamingException e) {
-        }
-        topicConnection = connFactory.createTopicConnection();
-        topicConnection.start();
-    }
-
-    /**
-     * Provides a new topic session.
-     *
-     * @return topic session instance
-     * @throws JMSException if unable to create a topic session
-     */
-    public TopicSession newSession() throws Exception {
-        return topicConnection.createTopicSession(false, 
QueueSession.AUTO_ACKNOWLEDGE);
-    }
-
-    public void close() throws JMSException {
-        if (topicConnection == null) {
-            return;
-        }
-        topicConnection.close();
-    }
-
-    public Topic getTopic() {
-        return topic;
-    }
-
-    public void setTopic(Topic topic) {
-        this.topic = topic;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index 7ea98d4..e6bce76 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -1,18 +1,18 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -20,14 +20,11 @@ package org.apache.stratos.messaging.broker.heartbeat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.connect.TopicConnector;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.ping.PingEvent;
 import org.apache.stratos.messaging.util.Constants;
 import org.apache.stratos.messaging.util.Util;
 
-import javax.jms.JMSException;
-
 /**
  * This health checker runs forever, and is responsible for checking the
  * connection
@@ -37,50 +34,52 @@ import javax.jms.JMSException;
 public class TopicHealthChecker implements Runnable {
 
        private static final Log log = 
LogFactory.getLog(TopicHealthChecker.class);
-       private String topicName;
-    private boolean terminated;
+       private final String topicName;
+       private boolean terminated;
 
-    public TopicHealthChecker(String name) {
+       public TopicHealthChecker(String name) {
                topicName = name;
        }
 
        @Override
        public void run() {
-        if(log.isDebugEnabled()){
-                   log.debug(topicName + " topic health checker is running... 
" );
-        }
-               TopicConnector testConnector = new TopicConnector();
+               if (log.isDebugEnabled()) {
+                       log.debug(topicName + " topic health checker is 
running... ");
+               }
+
                while (!terminated) {
                        try {
 
-                // Health checker needs to run with the smallest possible time 
interval (configurable)
-                // to detect a connection drop. Otherwise the subscriber will 
not
-                // get reconnected after a connection drop.
+                               // Health checker needs to run with the 
smallest possible time
+                               // interval (configurable)
+                               // to detect a connection drop. Otherwise the 
subscriber will
+                               // not
+                               // get reconnected after a connection drop.
                                Thread.sleep(Util.getAveragePingInterval());
-                               testConnector.init(topicName);
-                // A ping event is published to detect a session timeout
-                
EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), 
false);
+
+                               // A ping event is published to detect a 
session timeout
+                               
EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(),
+                                                                               
              false);
                        } catch (Exception e) {
                                // Implies connection is not established
                                // sleep for configured failover ping interval 
and retry
                                try {
-                    log.error(topicName + " topic health checker is failed and 
will try to subscribe again in "+Util.getFailoverPingInterval()/1000+" 
seconds.");
-                    Thread.sleep(Util.getFailoverPingInterval());
+                                       log.error(topicName +
+                                                 " topic health checker is 
failed and will try to subscribe again in " +
+                                                 
Util.getFailoverPingInterval() / 1000 + " seconds.");
+                                       
Thread.sleep(Util.getFailoverPingInterval());
                                        break;
                                } catch (InterruptedException ignore) {
                                }
                        } finally {
-                               try {
-                                       testConnector.close();
-                               } catch (JMSException ignore) {
-                               }
+
                        }
 
                }
 
        }
 
-    public void terminate() {
-        terminated = true;
-    }
+       public void terminate() {
+               terminated = true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 2150494..2ba4df4 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -1,18 +1,18 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -20,37 +20,35 @@
 package org.apache.stratos.messaging.broker.publish;
 
 import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.util.Constants;
-
-import java.util.Properties;
 
 /**
- *  Defines logic for publishing events to a given topic in the message broker.
- *  A message header will be used to send the event class name to be used by 
the
- *  subscriber to identify the event.
+ * Defines logic for publishing events to a given topic in the message broker.
+ * A message header will be used to send the event class name to be used by the
+ * subscriber to identify the event.
  */
 public class EventPublisher extends TopicPublisher {
 
-    /**
-     * @param topicName topic name of this publisher instance.
-     */
-    EventPublisher(String topicName) {
-        super(topicName);
-    }
+       /**
+        * @param topicName
+        *            topic name of this publisher instance.
+        */
+       EventPublisher(String topicName) {
+               super(topicName);
+       }
+
+       /**
+        * 
+        * @param event
+        *            event to be published
+        */
+       public void publish(Event event) {
+               publish(event, true);
+       }
 
-    /**
-     *
-     * @param event event to be published
-     */
-    public void publish(Event event) {
-        publish(event, true);
-    }
+       public void publish(Event event, boolean retry) {
+               synchronized (EventPublisher.class) {
 
-    public void publish(Event event, boolean retry) {
-        synchronized (EventPublisher.class) {
-            Properties headers = new Properties();
-            headers.put(Constants.EVENT_CLASS_NAME, 
event.getClass().getName());
-            super.publish(event, headers, retry);
-        }
-    }
+                       super.publish(event, retry);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index c0074f5..ff6ba8d 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -1,36 +1,31 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 package org.apache.stratos.messaging.broker.publish;
 
-import com.google.gson.Gson;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.connect.TopicConnector;
-import org.apache.stratos.messaging.publish.MessagePublisher;
+import org.apache.stratos.messaging.broker.connect.MQTTConnector;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
 
-import javax.jms.JMSException;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-import java.util.Enumeration;
-import java.util.Properties;
+import com.google.gson.Gson;
 
 /**
  * Any instance who needs to publish data to a topic, should communicate with
@@ -39,27 +34,28 @@ import java.util.Properties;
  * published
  * to JSON format, before publishing.
  * 
- * @author nirmal
+ * 
  * 
  */
-public class TopicPublisher extends MessagePublisher {
+public class TopicPublisher {
 
        private static final Log log = LogFactory.getLog(TopicPublisher.class);
-       private TopicSession topicSession;
-       private TopicConnector connector;
-       private javax.jms.TopicPublisher topicPublisher = null;
-    private boolean initialized;
+
+       private static final int QOS = 2;
+
+       public static TopicPublisher topicPub;
+       private boolean initialized;
+       private final String topic;
 
        /**
         * @param aTopicName
         *            topic name of this publisher instance.
         */
        TopicPublisher(String aTopicName) {
-               super(aTopicName);
-               connector = new TopicConnector();
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Topic publisher connector created: 
[topic] %s", getName()));
-        }
+               this.topic = aTopicName;
+               if (log.isDebugEnabled()) {
+                       log.debug(String.format("Topic publisher connector 
created: [topic] %s", topic));
+               }
        }
 
        /**
@@ -67,139 +63,58 @@ public class TopicPublisher extends MessagePublisher {
         * lost, this will perform re-subscription periodically, until a 
connection
         * obtained.
         */
+
        public void publish(Object messageObj, boolean retry) {
-               publish(messageObj, null, retry);
-       }
-       
-       public void publish(Object messageObj, Properties headers, boolean 
retry) {
-        synchronized (TopicPublisher.class) {
-            Gson gson = new Gson();
-            String message = gson.toJson(messageObj);
-            boolean published = false;
-            while(!published) {
-
-                try {
-                    doPublish(message, headers);
-                    published = true;
-                } catch (Exception e) {
-                    initialized = false;
-                    if(log.isErrorEnabled()) {
-                        log.error("Error while publishing to the topic: " + 
getName(), e);
-                    }
-                    if(!retry) {
-                        if(log.isDebugEnabled()) {
-                            log.debug("Retry disabled for topic " + getName());
-                        }
-                        throw new RuntimeException(e);
-                    }
-
-                    if(log.isInfoEnabled()) {
-                        log.info("Will try to re-publish in 60 sec");
-                    }
-                    try {
-                        Thread.sleep(60000);
-                    } catch (InterruptedException ignore) {
-                    }
-                }
-            }
-        }
+
+               synchronized (TopicPublisher.class) {
+                       Gson gson = new Gson();
+                       String message = gson.toJson(messageObj);
+                       boolean published = false;
+                       while (!published) {
+
+                               try {
+                                       MqttClient mqttClient = 
MQTTConnector.getMQTTConClient();
+
+                                       MqttMessage mqttMSG = new 
MqttMessage(message.getBytes());
+
+                                       mqttMSG.setQos(QOS);
+
+                                       mqttClient.connect();
+                                       mqttClient.publish(topic, mqttMSG);
+                                       mqttClient.disconnect();
+                                       published = true;
+                               } catch (Exception e) {
+                                       initialized = false;
+                                       if (log.isErrorEnabled()) {
+                                               log.error("Error while 
publishing to the topic: " + topic, e);
+                                       }
+                                       if (!retry) {
+                                               if (log.isDebugEnabled()) {
+                                                       log.debug("Retry 
disabled for topic " + topic);
+                                               }
+                                               throw new RuntimeException(e);
+                                       }
+
+                                       if (log.isInfoEnabled()) {
+                                               log.info("Will try to 
re-publish in 60 sec");
+                                       }
+                                       try {
+                                               Thread.sleep(60000);
+                                       } catch (InterruptedException ignore) {
+                                       }
+                               }
+                       }
+               }
        }
 
        public void close() {
-        synchronized (TopicPublisher.class) {
-            // closes all sessions/connections
-            try {
-                if(topicPublisher != null) {
-                    topicPublisher.close();
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Topic publisher closed: 
[topic] %s", getName()));
-                    }
-                }
-                if(topicSession != null) {
-                    topicSession.close();
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Topic publisher session 
closed: [topic] %s", getName()));
-                    }
-                }
-                if(connector != null) {
-                    connector.close();
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Topic publisher connector 
closed: [topic] %s", getName()));
-                    }
-                }
-            } catch (JMSException ignore) {
-            }
-        }
-       }
+               synchronized (TopicPublisher.class) {
+                       // closes all sessions/connections
+                       try {
 
-       private void doPublish(String message, Properties headers) throws 
Exception, JMSException {
-        if(!initialized) {
-            // Initialize a topic connection to the message broker
-            connector.init(getName());
-            initialized = true;
-            if(log.isDebugEnabled()) {
-                log.debug(String.format("Topic publisher connector 
initialized: [topic] %s", getName()));
-            }
-        }
-
-        try {
-        // Create a new session
-        topicSession = createSession(connector);
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Topic publisher session created: [topic] 
%s", getName()));
-        }
-        // Create a publisher from session
-        topicPublisher = createPublisher(topicSession);
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Topic publisher created: [topic] %s", 
getName()));
-        }
-
-        // Create text message
-        TextMessage textMessage = topicSession.createTextMessage(message);
-               
-               if (headers != null) {
-            // Add header properties
-                       @SuppressWarnings("rawtypes")
-                       Enumeration e = headers.propertyNames();
-
-                       while (e.hasMoreElements()) {
-                               String key = (String) e.nextElement();
-                               textMessage.setStringProperty(key, 
headers.getProperty(key));
+                       } catch (Exception ignore) {
                        }
                }
-
-               topicPublisher.publish(textMessage);
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Message published: [topic] %s [header] %s 
[body] %s", getName(), (headers != null) ? headers.toString() : "null", 
message));
-        }
-        }
-        finally {
-            if(topicPublisher != null) {
-                topicPublisher.close();
-                if(log.isDebugEnabled()) {
-                    log.debug(String.format("Topic publisher closed: [topic] 
%s", getName()));
-                }
-            }
-            if(topicSession != null) {
-                topicSession.close();
-                if(log.isDebugEnabled()) {
-                    log.debug(String.format("Topic publisher session closed: 
[topic] %s", getName()));
-                }
-            }
-        }
-    }
-
-    private TopicSession createSession(TopicConnector topicConnector) throws 
Exception {
-        // Create a new session
-        return topicConnector.newSession();
-    }
-
-       private javax.jms.TopicPublisher createPublisher(TopicSession 
topicSession) throws Exception, JMSException {
-        Topic topic = connector.getTopic();
-               if (topic == null) {
-                       // if the topic doesn't exist, create it.
-                       topic = topicSession.createTopic(getName());
-               }
-               return topicSession.createPublisher(topic);
        }
+
 }

Reply via email to