messaging model changes and MQTT updates

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bc78b9dc
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bc78b9dc
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bc78b9dc

Branch: refs/heads/docker-integration
Commit: bc78b9dc897927e08a9732fba297df5b9695aed4
Parents: 76e425d
Author: gayan <[email protected]>
Authored: Mon Sep 22 14:28:21 2014 +0530
Committer: Nirmal Fernando <[email protected]>
Committed: Tue Sep 23 18:20:48 2014 +0530

----------------------------------------------------------------------
 .../publisher/CartridgeAgentEventPublisher.java | 348 +++++++-------
 .../internal/CloudControllerDSComponent.java    | 173 +++----
 .../InstanceStatusEventMessageListener.java     |  82 ++--
 .../topology/TopologyEventPublisher.java        | 384 +++++++++------
 .../util/CloudControllerConstants.java          | 476 ++++++++++---------
 components/org.apache.stratos.manager/pom.xml   |   6 +
 .../internal/ADCManagementServerComponent.java  | 250 +++++-----
 .../listener/InstanceStatusListener.java        | 186 +++++---
 .../InstanceNotificationPublisher.java          | 103 ++--
 .../manager/publisher/TenantEventPublisher.java | 151 +++---
 .../publisher/TenantSynzhronizerTask.java       | 169 ++++---
 components/org.apache.stratos.messaging/pom.xml |  16 +-
 .../messaging/broker/connect/MQTTConnector.java | 111 +++++
 .../broker/connect/TopicConnector.java          |  96 ----
 .../broker/heartbeat/TopicHealthChecker.java    |  63 ++-
 .../broker/publish/EventPublisher.java          |  64 ++-
 .../broker/publish/TopicPublisher.java          | 223 +++------
 .../broker/subscribe/TopicSubscriber.java       | 220 +++++----
 .../stat/HealthStatEventMessageListener.java    | 106 +++--
 .../health/stat/HealthStatEventReceiver.java    | 110 ++---
 .../InstanceNotifierEventMessageListener.java   | 108 +++--
 .../notifier/InstanceNotifierEventReceiver.java | 116 ++---
 .../tenant/TenantEventMessageListener.java      | 102 ++--
 .../topology/TopologyEventMessageListener.java  | 101 ++--
 .../stratos/messaging/util/Constants.java       | 101 ++--
 .../stratos/messaging/util/Properties.java      |  35 +-
 .../org/apache/stratos/messaging/util/Util.java | 238 +++++-----
 .../extension/FaultHandlingWindowProcessor.java | 394 +++++++--------
 .../pom.xml                                     |   7 +-
 pom.xml                                         |   5 +-
 products/stratos/conf/mqtttopic.properties      |  21 +
 products/stratos/modules/distribution/pom.xml   |   5 +
 .../modules/distribution/src/assembly/bin.xml   |   3 +-
 tools/.gitmodules                               |   3 +
 tools/cartridge_agent                           |   1 +
 .../cartridge-agent/agent.properties            |  47 ++
 .../cartridge-agent/agent.py                    | 272 +++++++++++
 .../cartridge-agent/agent.sh                    |   8 +
 .../cartridge-agent/agent1.py                   |  15 +
 .../cartridge-agent/extensionhandler.py         |  44 ++
 .../cartridge-agent/extensionhandler.pyc        | Bin 0 -> 1987 bytes
 .../cartridge-agent/readme.txt                  |   2 +
 .../cartridge-agent/script.sh                   |   0
 .../cartridge-agent/util.py                     |  22 +
 .../cartridge-agent/util.pyc                    | Bin 0 -> 762 bytes
 45 files changed, 2871 insertions(+), 2116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
index 1046482..e5e6a4e 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
@@ -1,25 +1,24 @@
 package org.apache.stratos.cartridge.agent.event.publisher;
+
 /*
- *
+ * 
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
-*/
-
+ */
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,159 +31,186 @@ import 
org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceMod
 import 
org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
 import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
 import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
 
 /**
  * Cartridge agent event publisher.
  */
 public class CartridgeAgentEventPublisher {
-    private static final Log log = 
LogFactory.getLog(CartridgeAgentEventPublisher.class);
-    private static boolean started;
-    private static boolean activated;
-    private static boolean readyToShutdown;
-    private static boolean maintenance;
-
-    public static void publishInstanceStartedEvent() {
-        if (!isStarted()) {
-            if (log.isInfoEnabled()) {
-                log.info("Publishing instance started event");
-            }
-            InstanceStartedEvent event = new InstanceStartedEvent(
-                    CartridgeAgentConfiguration.getInstance().getServiceName(),
-                    CartridgeAgentConfiguration.getInstance().getClusterId(),
-                    
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
-                    CartridgeAgentConfiguration.getInstance().getPartitionId(),
-                    CartridgeAgentConfiguration.getInstance().getMemberId());
-
-            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
-            eventPublisher.publish(event);
-            setStarted(true);
-            if (log.isInfoEnabled()) {
-                log.info("Instance started event published");
-            }
-
-        } else {
-            if (log.isWarnEnabled()) {
-                log.warn("Instance already started");
-            }
-        }
-    }
-
-    public static void publishInstanceActivatedEvent() {
-        if (!isActivated()) {
-            if (log.isInfoEnabled()) {
-                log.info("Publishing instance activated event");
-            }
-            InstanceActivatedEvent event = new InstanceActivatedEvent(
-                    CartridgeAgentConfiguration.getInstance().getServiceName(),
-                    CartridgeAgentConfiguration.getInstance().getClusterId(),
-                    
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
-                    CartridgeAgentConfiguration.getInstance().getPartitionId(),
-                    CartridgeAgentConfiguration.getInstance().getMemberId()
-            );
-
-            // Event publisher connection will
-            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
-            eventPublisher.publish(event);
-            if (log.isInfoEnabled()) {
-                log.info("Instance activated event published");
-            }
-
-            if (log.isInfoEnabled()) {
-                log.info("Starting health statistics notifier");
-            }
-            Thread thread = new Thread(new HealthStatisticsNotifier());
-            thread.start();
-            setActivated(true);
-            if (log.isInfoEnabled()) {
-                log.info("Health statistics notifier started");
-            }
-        } else {
-            if (log.isWarnEnabled()) {
-                log.warn("Instance already activated");
-            }
-        }
-    }
-
-    public static void publishInstanceReadyToShutdownEvent() {
-        if (!isReadyToShutdown()) {
-            if (log.isInfoEnabled()) {
-                log.info("Publishing instance activated event");
-            }
-            InstanceReadyToShutdownEvent event = new 
InstanceReadyToShutdownEvent(
-                    CartridgeAgentConfiguration.getInstance().getServiceName(),
-                    CartridgeAgentConfiguration.getInstance().getClusterId(),
-                    
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
-                    CartridgeAgentConfiguration.getInstance().getPartitionId(),
-                    CartridgeAgentConfiguration.getInstance().getMemberId());
-
-            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
-            eventPublisher.publish(event);
-            setReadyToShutdown(true);
-            if (log.isInfoEnabled()) {
-                log.info("Instance ReadyToShutDown event published");
-            }
-        } else {
-            if (log.isWarnEnabled()) {
-                log.warn("Instance already sent ReadyToShutDown event....");
-            }
-        }
-    }
-
-    public static void publishMaintenanceModeEvent() {
-        if (!isMaintenance()) {
-            if (log.isInfoEnabled()) {
-                log.info("Publishing instance maintenance mode event");
-            }
-            InstanceMaintenanceModeEvent event = new 
InstanceMaintenanceModeEvent(
-                    CartridgeAgentConfiguration.getInstance().getServiceName(),
-                    CartridgeAgentConfiguration.getInstance().getClusterId(),
-                    
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
-                    CartridgeAgentConfiguration.getInstance().getPartitionId(),
-                    CartridgeAgentConfiguration.getInstance().getMemberId());
-
-            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
-            eventPublisher.publish(event);
-            setMaintenance(true);
-            if (log.isInfoEnabled()) {
-                log.info("Instance Maintenance mode event published");
-            }
-        } else {
-            if (log.isWarnEnabled()) {
-                log.warn("Instance already in a Maintenance mode....");
-            }
-        }
-    }
-
-    public static boolean isStarted() {
-        return started;
-    }
-
-    public static void setStarted(boolean started) {
-        CartridgeAgentEventPublisher.started = started;
-    }
-
-    public static boolean isActivated() {
-        return activated;
-    }
-
-    public static void setActivated(boolean activated) {
-        CartridgeAgentEventPublisher.activated = activated;
-    }
-
-    public static boolean isReadyToShutdown() {
-        return readyToShutdown;
-    }
-
-    public static void setReadyToShutdown(boolean readyToShutdown) {
-        CartridgeAgentEventPublisher.readyToShutdown = readyToShutdown;
-    }
-
-    public static boolean isMaintenance() {
-        return maintenance;
-    }
-
-    public static void setMaintenance(boolean maintenance) {
-        CartridgeAgentEventPublisher.maintenance = maintenance;
-    }
+       private static final Log log = 
LogFactory.getLog(CartridgeAgentEventPublisher.class);
+       private static boolean started;
+       private static boolean activated;
+       private static boolean readyToShutdown;
+       private static boolean maintenance;
+
+       public static void publishInstanceStartedEvent() {
+               if (!isStarted()) {
+                       if (log.isInfoEnabled()) {
+                               log.info("Publishing instance started event");
+                       }
+                       InstanceStartedEvent event =
+                                                    new InstanceStartedEvent(
+                                                                             
CartridgeAgentConfiguration.getInstance()
+                                                                               
                         .getServiceName(),
+                                                                             
CartridgeAgentConfiguration.getInstance()
+                                                                               
                         .getClusterId(),
+                                                                             
CartridgeAgentConfiguration.getInstance()
+                                                                               
                         .getNetworkPartitionId(),
+                                                                             
CartridgeAgentConfiguration.getInstance()
+                                                                               
                         .getPartitionId(),
+                                                                             
CartridgeAgentConfiguration.getInstance()
+                                                                               
                         .getMemberId());
+
+                       EventPublisher eventPublisher =
+                                                       
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
+                       eventPublisher.publish(event);
+                       setStarted(true);
+                       if (log.isInfoEnabled()) {
+                               log.info("Instance started event published");
+                       }
+
+               } else {
+                       if (log.isWarnEnabled()) {
+                               log.warn("Instance already started");
+                       }
+               }
+       }
+
+       public static void publishInstanceActivatedEvent() {
+               if (!isActivated()) {
+                       if (log.isInfoEnabled()) {
+                               log.info("Publishing instance activated event");
+                       }
+                       InstanceActivatedEvent event =
+                                                      new 
InstanceActivatedEvent(
+                                                                               
  CartridgeAgentConfiguration.getInstance()
+                                                                               
                             .getServiceName(),
+                                                                               
  CartridgeAgentConfiguration.getInstance()
+                                                                               
                             .getClusterId(),
+                                                                               
  CartridgeAgentConfiguration.getInstance()
+                                                                               
                             .getNetworkPartitionId(),
+                                                                               
  CartridgeAgentConfiguration.getInstance()
+                                                                               
                             .getPartitionId(),
+                                                                               
  CartridgeAgentConfiguration.getInstance()
+                                                                               
                             .getMemberId());
+
+                       // Event publisher connection will
+                       EventPublisher eventPublisher =
+                                                       
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
+                       eventPublisher.publish(event);
+                       if (log.isInfoEnabled()) {
+                               log.info("Instance activated event published");
+                       }
+
+                       if (log.isInfoEnabled()) {
+                               log.info("Starting health statistics notifier");
+                       }
+                       Thread thread = new Thread(new 
HealthStatisticsNotifier());
+                       thread.start();
+                       setActivated(true);
+                       if (log.isInfoEnabled()) {
+                               log.info("Health statistics notifier started");
+                       }
+               } else {
+                       if (log.isWarnEnabled()) {
+                               log.warn("Instance already activated");
+                       }
+               }
+       }
+
+       public static void publishInstanceReadyToShutdownEvent() {
+               if (!isReadyToShutdown()) {
+                       if (log.isInfoEnabled()) {
+                               log.info("Publishing instance activated event");
+                       }
+                       InstanceReadyToShutdownEvent event =
+                                                            new 
InstanceReadyToShutdownEvent(
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getServiceName(),
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getClusterId(),
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getNetworkPartitionId(),
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getPartitionId(),
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getMemberId());
+
+                       EventPublisher eventPublisher =
+                                                       
EventPublisherPool.getPublisher(Util.getMessageTopicName(event));
+                       eventPublisher.publish(event);
+                       setReadyToShutdown(true);
+                       if (log.isInfoEnabled()) {
+                               log.info("Instance ReadyToShutDown event 
published");
+                       }
+               } else {
+                       if (log.isWarnEnabled()) {
+                               log.warn("Instance already sent ReadyToShutDown 
event....");
+                       }
+               }
+       }
+
+       public static void publishMaintenanceModeEvent() {
+               if (!isMaintenance()) {
+                       if (log.isInfoEnabled()) {
+                               log.info("Publishing instance maintenance mode 
event");
+                       }
+                       InstanceMaintenanceModeEvent event =
+                                                            new 
InstanceMaintenanceModeEvent(
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getServiceName(),
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getClusterId(),
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getNetworkPartitionId(),
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getPartitionId(),
+                                                                               
              CartridgeAgentConfiguration.getInstance()
+                                                                               
                                         .getMemberId());
+
+                       EventPublisher eventPublisher =
+                                                       
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
+                       eventPublisher.publish(event);
+                       setMaintenance(true);
+                       if (log.isInfoEnabled()) {
+                               log.info("Instance Maintenance mode event 
published");
+                       }
+               } else {
+                       if (log.isWarnEnabled()) {
+                               log.warn("Instance already in a Maintenance 
mode....");
+                       }
+               }
+       }
+
+       public static boolean isStarted() {
+               return started;
+       }
+
+       public static void setStarted(boolean started) {
+               CartridgeAgentEventPublisher.started = started;
+       }
+
+       public static boolean isActivated() {
+               return activated;
+       }
+
+       public static void setActivated(boolean activated) {
+               CartridgeAgentEventPublisher.activated = activated;
+       }
+
+       public static boolean isReadyToShutdown() {
+               return readyToShutdown;
+       }
+
+       public static void setReadyToShutdown(boolean readyToShutdown) {
+               CartridgeAgentEventPublisher.readyToShutdown = readyToShutdown;
+       }
+
+       public static boolean isMaintenance() {
+               return maintenance;
+       }
+
+       public static void setMaintenance(boolean maintenance) {
+               CartridgeAgentEventPublisher.maintenance = maintenance;
+       }
 }
-

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index b1d6c6d..8ade514 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -1,25 +1,24 @@
 package org.apache.stratos.cloud.controller.internal;
+
 /*
- *
+ * 
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
-*/
-
+ */
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,9 +30,7 @@ import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusE
 import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
-import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.ComponentContext;
 import org.wso2.carbon.ntask.core.service.TaskService;
@@ -46,7 +43,8 @@ import org.wso2.carbon.utils.ConfigurationContextService;
  * Registering Cloud Controller Service.
  * 
  * @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
- * @scr.reference name="ntask.component" 
interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * @scr.reference name="ntask.component"
+ *                interface="org.wso2.carbon.ntask.core.service.TaskService"
  *                cardinality="1..1" policy="dynamic" bind="setTaskService"
  *                unbind="unsetTaskService"
  * @scr.reference name="registry.service"
@@ -58,92 +56,95 @@ import org.wso2.carbon.utils.ConfigurationContextService;
  *                interface="org.wso2.carbon.utils.ConfigurationContextService"
  *                cardinality="1..1" policy="dynamic"
  *                bind="setConfigurationContextService"
- *                unbind="unsetConfigurationContextService"           
+ *                unbind="unsetConfigurationContextService"
  */
 public class CloudControllerDSComponent {
 
-    private static final Log log = 
LogFactory.getLog(CloudControllerDSComponent.class);
-
-    protected void activate(ComponentContext context) {
-        try {
-                       
-            // Start instance status event message listener
-            TopicSubscriber subscriber = new 
TopicSubscriber(CloudControllerConstants.INSTANCE_TOPIC);
-            subscriber.setMessageListener(new 
InstanceStatusEventMessageListener());
-            Thread tsubscriber = new Thread(subscriber);
-            tsubscriber.start();
-
-            // Start instance status message delegator
-            InstanceStatusEventMessageDelegator delegator = new 
InstanceStatusEventMessageDelegator();
-            Thread tdelegator = new Thread(delegator);
-            tdelegator.start();
-               
-               // Register cloud controller service
-            BundleContext bundleContext = context.getBundleContext();
-            
bundleContext.registerService(CloudControllerService.class.getName(), new 
CloudControllerServiceImpl(), null);
-
-            if(log.isInfoEnabled()) {
-                log.info("Scheduling tasks");
-            }
-            
-                       TopologySynchronizerTaskScheduler
-                                               
.schedule(ServiceReferenceHolder.getInstance()
-                                                               
.getTaskService());
-                       
-        } catch (Throwable e) {
-            log.error("******* Cloud Controller Service bundle is failed to 
activate ****", e);
-        }
-    }
-    
-    protected void setTaskService(TaskService taskService) {
-        if (log.isDebugEnabled()) {
-            log.debug("Setting the Task Service");
-        }
-        ServiceReferenceHolder.getInstance().setTaskService(taskService);
-    }
-
-    protected void unsetTaskService(TaskService taskService) {
-        if (log.isDebugEnabled()) {
-            log.debug("Unsetting the Task Service");
-        }
-        ServiceReferenceHolder.getInstance().setTaskService(null);
-    }
-    
+       private static final Log log = 
LogFactory.getLog(CloudControllerDSComponent.class);
+
+       protected void activate(ComponentContext context) {
+               try {
+
+                       // Start instance status event message listener
+                       TopicSubscriber subscriber =
+                                                    new TopicSubscriber(
+                                                                        
CloudControllerConstants.INSTANCE_TOPIC);
+                       subscriber.setMessageListener(new 
InstanceStatusEventMessageListener());
+                       Thread tsubscriber = new Thread(subscriber);
+                       tsubscriber.start();
+
+                       // Start instance status message delegator
+                       InstanceStatusEventMessageDelegator delegator =
+                                                                       new 
InstanceStatusEventMessageDelegator();
+                       Thread tdelegator = new Thread(delegator);
+                       tdelegator.start();
+
+                       // Register cloud controller service
+                       BundleContext bundleContext = 
context.getBundleContext();
+                       
bundleContext.registerService(CloudControllerService.class.getName(),
+                                                     new 
CloudControllerServiceImpl(), null);
+
+                       if (log.isInfoEnabled()) {
+                               log.info("Scheduling tasks");
+                       }
+
+                       
TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance()
+                                                                               
         .getTaskService());
+
+               } catch (Throwable e) {
+                       log.error("******* Cloud Controller Service bundle is 
failed to activate ****", e);
+               }
+       }
+
+       /**
+        * Records the supplied task service in the {@link ServiceReferenceHolder}.
+        *
+        * @param taskService task service instance to store
+        */
+       protected void setTaskService(TaskService taskService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Setting the Task Service");
+               }
+               ServiceReferenceHolder holder = ServiceReferenceHolder.getInstance();
+               holder.setTaskService(taskService);
+       }
+
+       /**
+        * Clears the task service stored in the {@link ServiceReferenceHolder}.
+        *
+        * @param taskService the service being withdrawn; not used, the stored reference is simply set to null
+        */
+       protected void unsetTaskService(TaskService taskService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Unsetting the Task Service");
+               }
+               ServiceReferenceHolder holder = ServiceReferenceHolder.getInstance();
+               holder.setTaskService(null);
+       }
+
        protected void setRegistryService(RegistryService registryService) {
                if (log.isDebugEnabled()) {
                        log.debug("Setting the Registry Service");
                }
-               
-               try {                   
+
+               try {
                        UserRegistry registry = 
registryService.getGovernanceSystemRegistry();
-               ServiceReferenceHolder.getInstance()
-                                                    .setRegistry(registry);
-        } catch (RegistryException e) {
-               String msg = "Failed when retrieving Governance System 
Registry.";
-               log.error(msg, e);
-               throw new CloudControllerException(msg, e);
-        } 
+                       
ServiceReferenceHolder.getInstance().setRegistry(registry);
+               } catch (RegistryException e) {
+                       String msg = "Failed when retrieving Governance System 
Registry.";
+                       log.error(msg, e);
+                       throw new CloudControllerException(msg, e);
+               }
        }
 
        protected void unsetRegistryService(RegistryService registryService) {
                if (log.isDebugEnabled()) {
-            log.debug("Unsetting the Registry Service");
-        }
-        ServiceReferenceHolder.getInstance().setRegistry(null);
+                       log.debug("Unsetting the Registry Service");
+               }
+               ServiceReferenceHolder.getInstance().setRegistry(null);
        }
-       
+
        protected void 
setConfigurationContextService(ConfigurationContextService cfgCtxService) {
-        ServiceReferenceHolder.getInstance().setAxisConfiguration(
-                cfgCtxService.getServerConfigContext().getAxisConfiguration());
-    }
-
-    protected void 
unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
-        ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
-    }
-       
+               ServiceReferenceHolder.getInstance()
+                                     
.setAxisConfiguration(cfgCtxService.getServerConfigContext()
+                                                                        
.getAxisConfiguration());
+       }
+
+       protected void 
unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
+               ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
+       }
+
        protected void deactivate(ComponentContext ctx) {
-        // Close event publisher connections to message broker
-        EventPublisherPool.close(Constants.TOPOLOGY_TOPIC);
+               // TODO: event publisher connections to the message broker are not closed while the call below is disabled
+               // EventPublisherPool.close(Constants.TOPOLOGY_TOPIC);
        }
-       
+
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
index 8e5ef9a..9d2833e 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
@@ -1,47 +1,75 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.stratos.cloud.controller.topic.instance.status;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.util.Constants;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
 /**
  * this is to handle the topology subscription
  */
-public class InstanceStatusEventMessageListener implements MessageListener{
-    private static final Log log = 
LogFactory.getLog(InstanceStatusEventMessageListener.class);
-
-    @Override
-    public void onMessage(Message message) {
-        TextMessage receivedMessage = (TextMessage) message;
-        InstanceStatusEventMessageQueue.getInstance().add(receivedMessage);
-        if(log.isDebugEnabled()) {
-            try {
-                log.debug(String.format("Instance status message added to 
queue: %s", receivedMessage.getText()));
-            } catch (JMSException e) {
-                log.error(e);
-            }
-        }
-    }
+public class InstanceStatusEventMessageListener implements MqttCallback {
+       private static final Log log = LogFactory.getLog(InstanceStatusEventMessageListener.class);
+
+       /**
+        * Logs the loss of the broker connection so the failure is visible.
+        * No reconnection is attempted from this callback.
+        *
+        * @param cause the reason reported by the MQTT client
+        */
+       @Override
+       public void connectionLost(Throwable cause) {
+               log.warn("Connection to the message broker was lost for the instance status listener", cause);
+       }
+
+       /**
+        * Intentionally empty: this class never publishes, so there is no
+        * delivery to react to.
+        *
+        * @param token delivery token supplied by the MQTT client; ignored
+        */
+       @Override
+       public void deliveryComplete(IMqttDeliveryToken token) {
+               // nothing to do
+       }
+
+       /**
+        * Wraps an incoming MQTT payload in a JMS {@link TextMessage} and adds it
+        * to the {@link InstanceStatusEventMessageQueue}.
+        * <p>
+        * The event class name property is derived from the topic by replacing
+        * every "/" with "." and prefixing the messaging event package.
+        * A {@link JMSException} raised while building the message is logged and
+        * the message is dropped instead of being propagated to the MQTT client.
+        *
+        * @param topic   topic the message arrived on
+        * @param message the MQTT message; a null message is ignored
+        * @throws Exception declared by {@link MqttCallback}
+        */
+       @Override
+       public void messageArrived(String topic, MqttMessage message) throws Exception {
+               if (message == null) {
+                       return;
+               }
+
+               try {
+                       // NOTE(review): decoded with the platform default charset, as before - confirm the publisher's encoding
+                       String payload = new String(message.getPayload());
+
+                       TextMessage receivedMessage = new ActiveMQTextMessage();
+                       receivedMessage.setText(payload);
+                       receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                                                         "org.apache.stratos.messaging.event.".concat(topic.replace("/", ".")));
+
+                       if (log.isDebugEnabled()) {
+                               // Log the decoded payload; the MQTT message itself is not a JMS TextMessage
+                               log.debug(String.format("Instance status message received: %s", payload));
+                       }
+                       // Add received message to the queue
+                       InstanceStatusEventMessageQueue.getInstance().add(receivedMessage);
+               } catch (JMSException e) {
+                       log.error(e.getMessage(), e);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index fe86198..74ced5c 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -1,22 +1,26 @@
 package org.apache.stratos.cloud.controller.topology;
- /*
+
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.pojo.Cartridge;
@@ -26,159 +30,229 @@ import 
org.apache.stratos.cloud.controller.pojo.PortMapping;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Port;
+import org.apache.stratos.messaging.domain.topology.ServiceType;
+import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
-import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.util.Constants;
-
-import java.util.List;
-import java.util.Properties;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
+import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
+import org.apache.stratos.messaging.util.Util;
 
 /**
  * this is to send the relevant events from cloud controller to topology topic
  */
 public class TopologyEventPublisher {
-    private static final Log log = 
LogFactory.getLog(TopologyEventPublisher.class);
-
-
-    public static void sendServiceCreateEvent(List<Cartridge> cartridgeList) {
-        ServiceCreatedEvent serviceCreatedEvent;
-        for(Cartridge cartridge : cartridgeList) {
-            serviceCreatedEvent = new ServiceCreatedEvent(cartridge.getType(), 
(cartridge.isMultiTenant() ? ServiceType.MultiTenant : 
ServiceType.SingleTenant));
-
-            // Add ports to the event
-            Port port;
-            List<PortMapping> portMappings = cartridge.getPortMappings();
-            for(PortMapping portMapping : portMappings) {
-                port = new Port(portMapping.getProtocol(),
-                                Integer.parseInt(portMapping.getPort()),
-                                Integer.parseInt(portMapping.getProxyPort()));
-                serviceCreatedEvent.addPort(port);
-            }
-
-            if(log.isInfoEnabled()) {
-                log.info(String.format("Publishing service created event: 
[service] %s", cartridge.getType()));
-            }
-            publishEvent(serviceCreatedEvent);
-        }
-    }
-
-    public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) {
-        ServiceRemovedEvent serviceRemovedEvent;
-        for(Cartridge cartridge : cartridgeList) {
-            serviceRemovedEvent = new ServiceRemovedEvent(cartridge.getType());
-            if(log.isInfoEnabled()) {
-                log.info(String.format("Publishing service removed event: 
[service] %s", serviceRemovedEvent.getServiceName()));
-            }
-            publishEvent(serviceRemovedEvent);
-        }
-    }
-
-    public static void sendClusterCreatedEvent(String serviceName, String 
clusterId, Cluster cluster) {
-        ClusterCreatedEvent clusterCreatedEvent = new 
ClusterCreatedEvent(serviceName, clusterId, cluster);
-
-        if(log.isInfoEnabled()) {
-            log.info("Publishing cluster created event: " +cluster.toString());
-        }
-        publishEvent(clusterCreatedEvent);
-
-    }
-
-    public static void sendClusterRemovedEvent(ClusterContext ctxt, String 
deploymentPolicy) {
-
-        ClusterRemovedEvent clusterRemovedEvent = new 
ClusterRemovedEvent(ctxt.getCartridgeType(), ctxt.getClusterId(), 
deploymentPolicy, ctxt.isLbCluster());
-
-        if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing cluster removed event: 
[service] %s [cluster] %s", ctxt.getCartridgeType(), ctxt.getClusterId()));
-        }
-        publishEvent(clusterRemovedEvent);
-
-    }
-
-    public static void sendClusterMaintenanceModeEvent(ClusterContext ctxt) {
-
-        ClusterMaintenanceModeEvent clusterMaintenanceModeEvent = new 
ClusterMaintenanceModeEvent(ctxt.getCartridgeType(), ctxt.getClusterId());
-        clusterMaintenanceModeEvent.setStatus(ClusterStatus.In_Maintenance);
-        if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing cluster maintenance mode event: 
[service] %s [cluster] %s",
-                    clusterMaintenanceModeEvent.getServiceName(), 
clusterMaintenanceModeEvent.getClusterId()));
-        }
-        publishEvent(clusterMaintenanceModeEvent);
-
-    }
-
-    public static void sendInstanceSpawnedEvent(String serviceName, String 
clusterId, String networkPartitionId, String partitionId, String memberId,
-               String lbClusterId, String publicIp, String privateIp, 
MemberContext context) {
-        InstanceSpawnedEvent instanceSpawnedEvent = new 
InstanceSpawnedEvent(serviceName, clusterId, networkPartitionId, partitionId, 
memberId);
-        instanceSpawnedEvent.setLbClusterId(lbClusterId);
-        instanceSpawnedEvent.setMemberIp(privateIp);
-        instanceSpawnedEvent.setMemberPublicIp(publicIp);
-        
instanceSpawnedEvent.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties()));
-        if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing instance spawned event: 
[service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s 
[lb-cluster-id] %s",
-                    serviceName, clusterId, networkPartitionId, partitionId, 
memberId, lbClusterId));
-        }
-        publishEvent(instanceSpawnedEvent);
-    }
-
-    public static void sendMemberStartedEvent(InstanceStartedEvent 
instanceStartedEvent) {
-        MemberStartedEvent memberStartedEventTopology = new 
MemberStartedEvent(instanceStartedEvent.getServiceName(),
-                           instanceStartedEvent.getClusterId(), 
instanceStartedEvent.getNetworkPartitionId(), 
instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId());
-
-        if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing member started event: [service] 
%s [cluster] %s [network-partition] %s [partition] %s [member] %s",
-                    instanceStartedEvent.getServiceName(), 
instanceStartedEvent.getClusterId(), 
instanceStartedEvent.getNetworkPartitionId(), 
instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId()));
-        }
-        publishEvent(memberStartedEventTopology);
-    }
-
-     public static void sendMemberActivatedEvent(MemberActivatedEvent 
memberActivatedEvent) {
-         if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing member activated event: 
[service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s",
-                    memberActivatedEvent.getServiceName(), 
memberActivatedEvent.getClusterId(), 
memberActivatedEvent.getNetworkPartitionId(), 
memberActivatedEvent.getPartitionId(), memberActivatedEvent.getMemberId()));
-         }
-         publishEvent(memberActivatedEvent);
-    }
-
-    public static void 
sendMemberReadyToShutdownEvent(MemberReadyToShutdownEvent 
memberReadyToShutdownEvent) {
-         if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing member Ready to shut down 
event: [service] %s [cluster] %s [network-partition] %s [partition] %s [member] 
%s",
-                    memberReadyToShutdownEvent.getServiceName(), 
memberReadyToShutdownEvent.getClusterId(), 
memberReadyToShutdownEvent.getNetworkPartitionId(), 
memberReadyToShutdownEvent.getPartitionId(), 
memberReadyToShutdownEvent.getMemberId()));
-         }
-         publishEvent(memberReadyToShutdownEvent);
-    }
-
-    public static void 
sendMemberMaintenanceModeEvent(MemberMaintenanceModeEvent 
memberMaintenanceModeEvent) {
-            if(log.isInfoEnabled()) {
-               log.info(String.format("Publishing Maintenance mode event: 
[service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s",
-                       memberMaintenanceModeEvent.getServiceName(), 
memberMaintenanceModeEvent.getClusterId(), 
memberMaintenanceModeEvent.getNetworkPartitionId(), 
memberMaintenanceModeEvent.getPartitionId(), 
memberMaintenanceModeEvent.getMemberId()));
-            }
-            publishEvent(memberMaintenanceModeEvent);
-       }
-
-
-    public static void sendMemberTerminatedEvent(String serviceName, String 
clusterId, String networkPartitionId,
-                                                 String partitionId, String 
memberId, Properties properties) {
-        MemberTerminatedEvent memberTerminatedEvent = new 
MemberTerminatedEvent(serviceName, clusterId, networkPartitionId, partitionId, 
memberId);
-        memberTerminatedEvent.setProperties(properties);
-        if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing member terminated event: 
[service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s", 
serviceName, clusterId, networkPartitionId, partitionId, memberId));
-        }
-        publishEvent(memberTerminatedEvent);
-    }
-
-    public static void sendCompleteTopologyEvent(Topology topology) {
-        CompleteTopologyEvent completeTopologyEvent = new 
CompleteTopologyEvent(topology);
-
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Publishing complete topology event"));
-        }
-        publishEvent(completeTopologyEvent);
-    }
-
-    public static void publishEvent(Event event) {
-        EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TOPOLOGY_TOPIC);
-        eventPublisher.publish(event);
-    }
-}
+       private static final Log log = LogFactory
+                       .getLog(TopologyEventPublisher.class);
+
+       /**
+        * Publishes one {@link ServiceCreatedEvent} per cartridge in the list.
+        * Each event carries the cartridge type, its tenancy model and a
+        * {@link Port} for every port mapping of the cartridge.
+        *
+        * @param cartridgeList cartridges to announce
+        */
+       public static void sendServiceCreateEvent(List<Cartridge> cartridgeList) {
+               for (Cartridge cartridge : cartridgeList) {
+                       ServiceCreatedEvent createdEvent =
+                                       new ServiceCreatedEvent(cartridge.getType(),
+                                                               cartridge.isMultiTenant() ? ServiceType.MultiTenant
+                                                                                         : ServiceType.SingleTenant);
+
+                       // Add ports to the event
+                       for (PortMapping mapping : cartridge.getPortMappings()) {
+                               createdEvent.addPort(new Port(mapping.getProtocol(),
+                                                             Integer.parseInt(mapping.getPort()),
+                                                             Integer.parseInt(mapping.getProxyPort())));
+                       }
+
+                       if (log.isInfoEnabled()) {
+                               String summary = String.format("Publishing service created event: [service] %s",
+                                                              cartridge.getType());
+                               log.info(summary);
+                       }
+                       publishEvent(createdEvent);
+               }
+       }
+
+       /**
+        * Publishes one {@link ServiceRemovedEvent} per cartridge in the list,
+        * built from the cartridge type.
+        *
+        * @param cartridgeList cartridges whose services are being removed
+        */
+       public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) {
+               for (Cartridge cartridge : cartridgeList) {
+                       ServiceRemovedEvent removedEvent = new ServiceRemovedEvent(cartridge.getType());
+                       if (log.isInfoEnabled()) {
+                               String summary = String.format("Publishing service removed event: [service] %s",
+                                                              removedEvent.getServiceName());
+                               log.info(summary);
+                       }
+                       publishEvent(removedEvent);
+               }
+       }
+
+       /**
+        * Publishes a {@link ClusterCreatedEvent} for the given cluster.
+        *
+        * @param serviceName service name passed to the event
+        * @param clusterId   cluster id passed to the event
+        * @param cluster     cluster passed to the event; its string form is logged
+        */
+       public static void sendClusterCreatedEvent(String serviceName, String clusterId, Cluster cluster) {
+               ClusterCreatedEvent createdEvent = new ClusterCreatedEvent(serviceName, clusterId, cluster);
+
+               if (log.isInfoEnabled()) {
+                       String summary = "Publishing cluster created event: " + cluster.toString();
+                       log.info(summary);
+               }
+               publishEvent(createdEvent);
+       }
+
+       /**
+        * Publishes a {@link ClusterRemovedEvent} built from the cluster context.
+        *
+        * @param ctxt             supplies the cartridge type, cluster id and LB-cluster flag
+        * @param deploymentPolicy value handed to the event unchanged
+        */
+       public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) {
+               ClusterRemovedEvent removedEvent =
+                               new ClusterRemovedEvent(ctxt.getCartridgeType(), ctxt.getClusterId(),
+                                                       deploymentPolicy, ctxt.isLbCluster());
+
+               if (log.isInfoEnabled()) {
+                       String summary = String.format("Publishing cluster removed event: [service] %s [cluster] %s",
+                                                      ctxt.getCartridgeType(), ctxt.getClusterId());
+                       log.info(summary);
+               }
+               publishEvent(removedEvent);
+       }
+
+       /**
+        * Publishes a {@link ClusterMaintenanceModeEvent} for the cluster in the
+        * given context, with its status set to {@link ClusterStatus#In_Maintenance}.
+        *
+        * @param ctxt supplies the cartridge type and cluster id
+        */
+       public static void sendClusterMaintenanceModeEvent(ClusterContext ctxt) {
+               ClusterMaintenanceModeEvent maintenanceEvent =
+                               new ClusterMaintenanceModeEvent(ctxt.getCartridgeType(), ctxt.getClusterId());
+               maintenanceEvent.setStatus(ClusterStatus.In_Maintenance);
+
+               if (log.isInfoEnabled()) {
+                       String summary = String.format("Publishing cluster maintenance mode event: [service] %s [cluster] %s",
+                                                      maintenanceEvent.getServiceName(),
+                                                      maintenanceEvent.getClusterId());
+                       log.info(summary);
+               }
+               publishEvent(maintenanceEvent);
+       }
+
+       /**
+        * Publishes an {@link InstanceSpawnedEvent} for a newly spawned member.
+        * The event also carries the LB cluster id, both IP addresses and the
+        * member context properties converted by {@link CloudControllerUtil}.
+        *
+        * @param serviceName        service name passed to the event
+        * @param clusterId          cluster id passed to the event
+        * @param networkPartitionId network partition id passed to the event
+        * @param partitionId        partition id passed to the event
+        * @param memberId           member id passed to the event
+        * @param lbClusterId        LB cluster id set on the event
+        * @param publicIp           set as the member public IP
+        * @param privateIp          set as the member IP
+        * @param context            member context whose properties are copied to the event
+        */
+       public static void sendInstanceSpawnedEvent(String serviceName, String clusterId,
+                                                   String networkPartitionId, String partitionId,
+                                                   String memberId, String lbClusterId, String publicIp,
+                                                   String privateIp, MemberContext context) {
+               InstanceSpawnedEvent spawnedEvent =
+                               new InstanceSpawnedEvent(serviceName, clusterId, networkPartitionId,
+                                                        partitionId, memberId);
+               spawnedEvent.setLbClusterId(lbClusterId);
+               spawnedEvent.setMemberIp(privateIp);
+               spawnedEvent.setMemberPublicIp(publicIp);
+               spawnedEvent.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties()));
+
+               if (log.isInfoEnabled()) {
+                       String summary = String.format(
+                                       "Publishing instance spawned event: [service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s [lb-cluster-id] %s",
+                                       serviceName, clusterId, networkPartitionId, partitionId, memberId,
+                                       lbClusterId);
+                       log.info(summary);
+               }
+               publishEvent(spawnedEvent);
+       }
+
+       /**
+        * Builds a {@link MemberStartedEvent} from the identifiers of the given
+        * instance started event and publishes it.
+        *
+        * @param instanceStartedEvent source of the service, cluster, partition and member ids
+        */
+       public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) {
+               MemberStartedEvent startedEvent =
+                               new MemberStartedEvent(instanceStartedEvent.getServiceName(),
+                                                      instanceStartedEvent.getClusterId(),
+                                                      instanceStartedEvent.getNetworkPartitionId(),
+                                                      instanceStartedEvent.getPartitionId(),
+                                                      instanceStartedEvent.getMemberId());
+
+               if (log.isInfoEnabled()) {
+                       String summary = String.format(
+                                       "Publishing member started event: [service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s",
+                                       instanceStartedEvent.getServiceName(),
+                                       instanceStartedEvent.getClusterId(),
+                                       instanceStartedEvent.getNetworkPartitionId(),
+                                       instanceStartedEvent.getPartitionId(),
+                                       instanceStartedEvent.getMemberId());
+                       log.info(summary);
+               }
+               publishEvent(startedEvent);
+       }
+
+       /**
+        * Logs and publishes the given member activated event unchanged.
+        *
+        * @param memberActivatedEvent event to publish
+        */
+       public static void sendMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent) {
+               if (log.isInfoEnabled()) {
+                       String summary = String.format(
+                                       "Publishing member activated event: [service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s",
+                                       memberActivatedEvent.getServiceName(),
+                                       memberActivatedEvent.getClusterId(),
+                                       memberActivatedEvent.getNetworkPartitionId(),
+                                       memberActivatedEvent.getPartitionId(),
+                                       memberActivatedEvent.getMemberId());
+                       log.info(summary);
+               }
+               publishEvent(memberActivatedEvent);
+       }
+
+       /**
+        * Logs and publishes the given member ready-to-shutdown event unchanged.
+        *
+        * @param memberReadyToShutdownEvent event to publish
+        */
+       public static void sendMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+               if (log.isInfoEnabled()) {
+                       String summary = String.format(
+                                       "Publishing member Ready to shut down event: [service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s",
+                                       memberReadyToShutdownEvent.getServiceName(),
+                                       memberReadyToShutdownEvent.getClusterId(),
+                                       memberReadyToShutdownEvent.getNetworkPartitionId(),
+                                       memberReadyToShutdownEvent.getPartitionId(),
+                                       memberReadyToShutdownEvent.getMemberId());
+                       log.info(summary);
+               }
+               publishEvent(memberReadyToShutdownEvent);
+       }
+
+       /**
+        * Logs and publishes the given member maintenance mode event unchanged.
+        *
+        * @param memberMaintenanceModeEvent event to publish
+        */
+       public static void sendMemberMaintenanceModeEvent(MemberMaintenanceModeEvent memberMaintenanceModeEvent) {
+               if (log.isInfoEnabled()) {
+                       String summary = String.format(
+                                       "Publishing Maintenance mode event: [service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s",
+                                       memberMaintenanceModeEvent.getServiceName(),
+                                       memberMaintenanceModeEvent.getClusterId(),
+                                       memberMaintenanceModeEvent.getNetworkPartitionId(),
+                                       memberMaintenanceModeEvent.getPartitionId(),
+                                       memberMaintenanceModeEvent.getMemberId());
+                       log.info(summary);
+               }
+               publishEvent(memberMaintenanceModeEvent);
+       }
+
+       /**
+        * Publishes a {@link MemberTerminatedEvent} carrying the given identifiers
+        * and properties.
+        *
+        * @param serviceName        service name passed to the event
+        * @param clusterId          cluster id passed to the event
+        * @param networkPartitionId network partition id passed to the event
+        * @param partitionId        partition id passed to the event
+        * @param memberId           member id passed to the event
+        * @param properties         properties set on the event
+        */
+       public static void sendMemberTerminatedEvent(String serviceName, String clusterId,
+                                                    String networkPartitionId, String partitionId,
+                                                    String memberId, Properties properties) {
+               MemberTerminatedEvent terminatedEvent =
+                               new MemberTerminatedEvent(serviceName, clusterId, networkPartitionId,
+                                                         partitionId, memberId);
+               terminatedEvent.setProperties(properties);
+
+               if (log.isInfoEnabled()) {
+                       String summary = String.format(
+                                       "Publishing member terminated event: [service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s",
+                                       serviceName, clusterId, networkPartitionId, partitionId, memberId);
+                       log.info(summary);
+               }
+               publishEvent(terminatedEvent);
+       }
+
+       /**
+        * Wraps the topology in a {@link CompleteTopologyEvent} and publishes it.
+        *
+        * @param topology topology passed to the event
+        */
+       public static void sendCompleteTopologyEvent(Topology topology) {
+               CompleteTopologyEvent topologyEvent = new CompleteTopologyEvent(topology);
+
+               if (log.isDebugEnabled()) {
+                       // The message has no placeholders, so it is logged as a plain literal
+                       log.debug("Publishing complete topology event");
+               }
+               publishEvent(topologyEvent);
+       }
+
+       /**
+        * Publishes the event through the pooled publisher of the topic that
+        * {@link Util#getMessageTopicName} returns for it.
+        *
+        * @param event event to publish
+        */
+       public static void publishEvent(Event event) {
+               EventPublisher publisherForTopic =
+                               EventPublisherPool.getPublisher(Util.getMessageTopicName(event));
+               publisherForTopic.publish(event);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index b96ea28..af6e190 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -1,262 +1,274 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
  * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.stratos.cloud.controller.util;
 
-import org.wso2.carbon.utils.CarbonUtils;
-
 import java.io.File;
 
+import org.wso2.carbon.utils.CarbonUtils;
+
 public final class CloudControllerConstants {
 
-    /**
-     * cloud-controller XML file's elements
-     */
-    public static final String CLOUD_CONTROLLER_ELEMENT = "cloudController";
-    public static final String SERIALIZATION_DIR_ELEMENT = "serializationDir";
-    public static final String IAAS_PROVIDERS_ELEMENT = "iaasProviders";
-    public static final String IAAS_PROVIDER_ELEMENT = "iaasProvider";
-    public static final String PARTITION_ELEMENT = "partition";
-    public static final String PARTITIONS_ELEMENT = "partitions";
-    public static final String REGION_ELEMENT = "region";
-    public static final String ZONE_ELEMENT = "zone";
-    public static final String DEPLOYMENT_ELEMENT = "deployment";
-    public static final String PORT_MAPPING_ELEMENT = "portMapping";
-    public static final String APP_TYPES_ELEMENT = "appTypes";
-    public static final String TYPE_ATTR = "type";
-    public static final String HOST_ATTR = "host";
-    public static final String BASE_DIR_ATTR = "baseDir";
-    public static final String PROVIDER_ATTR = "provider";
-    public static final String VERSION_ATTR = "version";
-    public static final String MULTI_TENANT_ATTR = "multiTenant";
-    public static final String PORT_ATTR = "port";
-    public static final String PROXY_PORT_ATTR = "proxyPort";
-    public static final String NAME_ATTR = "name";
-    public static final String APP_SPECIFIC_MAPPING_ATTR = 
"appSpecificMapping";
-    
-    public static final String CARTRIDGES_ELEMENT = "cartridges";
-    public static final String CARTRIDGE_ELEMENT = "cartridge";
-    
-    public static final String DISPLAY_NAME_ELEMENT = "displayName";
-    public static final String DESCRIPTION_ELEMENT = "description";
-    public static final String PROPERTY_ELEMENT = "property";
-    public static final String PROPERTY_NAME_ATTR= "name";
-    public static final String PROPERTY_VALUE_ATTR = "value";
-    public static final String IMAGE_ID_ELEMENT = "imageId";
-    public static final String SCALE_DOWN_ORDER_ELEMENT = "scaleDownOrder";
-    public static final String SCALE_UP_ORDER_ELEMENT = "scaleUpOrder";
-    public static final String CLASS_NAME_ELEMENT = "className";
-    public static final String MAX_INSTANCE_LIMIT_ELEMENT = "maxInstanceLimit";
-    public static final String PROVIDER_ELEMENT = "provider";
-    public static final String IDENTITY_ELEMENT = "identity";
-    public static final String TYPE_ELEMENT = "type";
-    public static final String SCOPE_ELEMENT = "scope";
-    public static final String ID_ELEMENT = "id";
-    public static final String CREDENTIAL_ELEMENT = "credential";
-    public static final String DEFAULT_SERVICE_ELEMENT = "default";
-    public static final String SERVICE_ELEMENT = "service";
-    public static final String SERVICES_ELEMENT = "services";
-    public static final String DIRECTORY_ELEMENT = "dir";
-    public static final String HTTP_ELEMENT = "http";
-    public static final String HTTPS_ELEMENT = "https";
-    public static final String APP_TYPE_ELEMENT = "appType";
-    public static final String SERVICE_DOMAIN_ATTR = "domain";
-    public static final String SERVICE_SUB_DOMAIN_ATTR = "subDomain";
-    public static final String SERVICE_TENANT_RANGE_ATTR = "tenantRange";
-    public static final String POLICY_NAME = "policyName";
-    public static final String PAYLOAD_ELEMENT = "payload";
-    public static final String DATA_PUBLISHER_ELEMENT = "dataPublisher";
-    public static final String TOPOLOGY_SYNC_ELEMENT = "topologySync";
-    public static final String ENABLE_ATTR = "enable";
-    public static final String BAM_SERVER_ELEMENT = "bamServer";
-    public static final String CRON_ELEMENT = "cron";
-    public static final String BAM_SERVER_ADMIN_USERNAME_ELEMENT = 
"adminUserName";
-    public static final String BAM_SERVER_ADMIN_PASSWORD_ELEMENT = 
"adminPassword";
-    public static final String CASSANDRA_INFO_ELEMENT = "cassandraInfo";
-    public static final String HOST_ELEMENT = "host";
-    public static final String CONNECTION_URL_ELEMENT = "connectionUrl";
-    public static final String HOST_PORT_ELEMENT = "port";
-    public static final String USER_NAME_ELEMENT = "userName";
-    public static final String PASSWORD_ELEMENT = "password";
-    public static final String CLOUD_CONTROLLER_EVENT_STREAM = 
"org.apache.stratos.cloud.controller";
-    public static final String CLOUD_CONTROLLER_COL_FAMILY = 
CLOUD_CONTROLLER_EVENT_STREAM.replaceAll("[/.]", "_");
-    
-    
-    /**
-     * column names
-     */
-    public static final String PAYLOAD_PREFIX = "payload_";
-    public static final String MEMBER_ID_COL = "memberId";
-    public static final String CARTRIDGE_TYPE_COL = "cartridgeType";
-    public static final String CLUSTER_ID_COL = "clusterId";
-    public static final String PARTITION_ID_COL = "partitionId";
-    public static final String NETWORK_ID_COL = "networkId";
-    public static final String ALIAS_COL = "alias";
-    public static final String TENANT_RANGE_COL = "tenantRange";
-    public static final String IS_MULTI_TENANT_COL = "isMultiTenant";
-    public static final String IAAS_COL = "iaas";
-    public static final String STATUS_COL = "status";
-    public static final String HOST_NAME_COL = "hostName";
-    public static final String HYPERVISOR_COL = "hypervisor";
-    public static final String RAM_COL = "ram";
-    public static final String IMAGE_ID_COL = "imageId";
-    public static final String LOGIN_PORT_COL = "loginPort";
-    public static final String OS_NAME_COL = "osName";
-    public static final String OS_VERSION_COL = "osVersion";
-    public static final String OS_ARCH_COL = "osArch";
-    public static final String OS_BIT_COL = "is64bitOS";
-    public static final String PRIV_IP_COL = "privateIPAddresses";
-    public static final String PUB_IP_COL = "publicIPAddresses";
-    public static final String ALLOCATE_IP_COL = "allocateIPAddresses";
+       /**
+        * cloud-controller XML file's elements
+        */
+       public static final String CLOUD_CONTROLLER_ELEMENT = "cloudController";
+       public static final String SERIALIZATION_DIR_ELEMENT = 
"serializationDir";
+       public static final String IAAS_PROVIDERS_ELEMENT = "iaasProviders";
+       public static final String IAAS_PROVIDER_ELEMENT = "iaasProvider";
+       public static final String PARTITION_ELEMENT = "partition";
+       public static final String PARTITIONS_ELEMENT = "partitions";
+       public static final String REGION_ELEMENT = "region";
+       public static final String ZONE_ELEMENT = "zone";
+       public static final String DEPLOYMENT_ELEMENT = "deployment";
+       public static final String PORT_MAPPING_ELEMENT = "portMapping";
+       public static final String APP_TYPES_ELEMENT = "appTypes";
+       public static final String TYPE_ATTR = "type";
+       public static final String HOST_ATTR = "host";
+       public static final String BASE_DIR_ATTR = "baseDir";
+       public static final String PROVIDER_ATTR = "provider";
+       public static final String VERSION_ATTR = "version";
+       public static final String MULTI_TENANT_ATTR = "multiTenant";
+       public static final String PORT_ATTR = "port";
+       public static final String PROXY_PORT_ATTR = "proxyPort";
+       public static final String NAME_ATTR = "name";
+       public static final String APP_SPECIFIC_MAPPING_ATTR = 
"appSpecificMapping";
+
+       public static final String CARTRIDGES_ELEMENT = "cartridges";
+       public static final String CARTRIDGE_ELEMENT = "cartridge";
+
+       public static final String DISPLAY_NAME_ELEMENT = "displayName";
+       public static final String DESCRIPTION_ELEMENT = "description";
+       public static final String PROPERTY_ELEMENT = "property";
+       public static final String PROPERTY_NAME_ATTR = "name";
+       public static final String PROPERTY_VALUE_ATTR = "value";
+       public static final String IMAGE_ID_ELEMENT = "imageId";
+       public static final String SCALE_DOWN_ORDER_ELEMENT = "scaleDownOrder";
+       public static final String SCALE_UP_ORDER_ELEMENT = "scaleUpOrder";
+       public static final String CLASS_NAME_ELEMENT = "className";
+       public static final String MAX_INSTANCE_LIMIT_ELEMENT = 
"maxInstanceLimit";
+       public static final String PROVIDER_ELEMENT = "provider";
+       public static final String IDENTITY_ELEMENT = "identity";
+       public static final String TYPE_ELEMENT = "type";
+       public static final String SCOPE_ELEMENT = "scope";
+       public static final String ID_ELEMENT = "id";
+       public static final String CREDENTIAL_ELEMENT = "credential";
+       public static final String DEFAULT_SERVICE_ELEMENT = "default";
+       public static final String SERVICE_ELEMENT = "service";
+       public static final String SERVICES_ELEMENT = "services";
+       public static final String DIRECTORY_ELEMENT = "dir";
+       public static final String HTTP_ELEMENT = "http";
+       public static final String HTTPS_ELEMENT = "https";
+       public static final String APP_TYPE_ELEMENT = "appType";
+       public static final String SERVICE_DOMAIN_ATTR = "domain";
+       public static final String SERVICE_SUB_DOMAIN_ATTR = "subDomain";
+       public static final String SERVICE_TENANT_RANGE_ATTR = "tenantRange";
+       public static final String POLICY_NAME = "policyName";
+       public static final String PAYLOAD_ELEMENT = "payload";
+       public static final String DATA_PUBLISHER_ELEMENT = "dataPublisher";
+       public static final String TOPOLOGY_SYNC_ELEMENT = "topologySync";
+       public static final String ENABLE_ATTR = "enable";
+       public static final String BAM_SERVER_ELEMENT = "bamServer";
+       public static final String CRON_ELEMENT = "cron";
+       public static final String BAM_SERVER_ADMIN_USERNAME_ELEMENT = 
"adminUserName";
+       public static final String BAM_SERVER_ADMIN_PASSWORD_ELEMENT = 
"adminPassword";
+       public static final String CASSANDRA_INFO_ELEMENT = "cassandraInfo";
+       public static final String HOST_ELEMENT = "host";
+       public static final String CONNECTION_URL_ELEMENT = "connectionUrl";
+       public static final String HOST_PORT_ELEMENT = "port";
+       public static final String USER_NAME_ELEMENT = "userName";
+       public static final String PASSWORD_ELEMENT = "password";
+       public static final String CLOUD_CONTROLLER_EVENT_STREAM =
+                                                                  
"org.apache.stratos.cloud.controller";
+       public static final String CLOUD_CONTROLLER_COL_FAMILY =
+                                                                
CLOUD_CONTROLLER_EVENT_STREAM.replaceAll("[/.]",
+                                                                               
                          "_");
+
+       /**
+        * column names
+        */
+       public static final String PAYLOAD_PREFIX = "payload_";
+       public static final String MEMBER_ID_COL = "memberId";
+       public static final String CARTRIDGE_TYPE_COL = "cartridgeType";
+       public static final String CLUSTER_ID_COL = "clusterId";
+       public static final String PARTITION_ID_COL = "partitionId";
+       public static final String NETWORK_ID_COL = "networkId";
+       public static final String ALIAS_COL = "alias";
+       public static final String TENANT_RANGE_COL = "tenantRange";
+       public static final String IS_MULTI_TENANT_COL = "isMultiTenant";
+       public static final String IAAS_COL = "iaas";
+       public static final String STATUS_COL = "status";
+       public static final String HOST_NAME_COL = "hostName";
+       public static final String HYPERVISOR_COL = "hypervisor";
+       public static final String RAM_COL = "ram";
+       public static final String IMAGE_ID_COL = "imageId";
+       public static final String LOGIN_PORT_COL = "loginPort";
+       public static final String OS_NAME_COL = "osName";
+       public static final String OS_VERSION_COL = "osVersion";
+       public static final String OS_ARCH_COL = "osArch";
+       public static final String OS_BIT_COL = "is64bitOS";
+       public static final String PRIV_IP_COL = "privateIPAddresses";
+       public static final String PUB_IP_COL = "publicIPAddresses";
+       public static final String ALLOCATE_IP_COL = "allocateIPAddresses";
+
+       /**
+        * Properties
+        */
+       public static final String REGION_PROPERTY = "region";
+       public static final String TOPICS_PROPERTY = "topics";
+       public static final String PUBLIC_IP_PROPERTY = "public_ip";
+       public static final String TENANT_ID_PROPERTY = "tenant_id";
+       public static final String ALIAS_PROPERTY = "alias";
+       public static final String AUTO_ASSIGN_IP_PROPERTY = "autoAssignIp";
+       public static final String CRON_PROPERTY = "cron";
+       public static final String AMQP_CONNECTION_URL_PROPERTY = 
"amqpConnectionUrl";
+       public static final String AMQP_INITIAL_CONTEXT_FACTORY_PROPERTY = 
"amqpInitialContextFactory";
+       public static final String AMQP_TOPIC_CONNECTION_FACTORY_PROPERTY =
+                                                                           
"amqpTopicConnectionFactory";
+       public static final String INSTANCE_TOPIC = "instance/#";
+       // pre define a floating ip
+       public static final String FLOATING_IP_PROPERTY = "floatingIp";
+       public static final String DEFAULT_FLOATING_IP_POOL = 
"defaultFloatingIpPool";
+
+       /**
+        * XPath expressions
+        */
+       public static final String IAAS_PROVIDER_XPATH = "/" + 
CLOUD_CONTROLLER_ELEMENT + "/" +
+                                                        IAAS_PROVIDERS_ELEMENT 
+ "/" +
+                                                        IAAS_PROVIDER_ELEMENT;
+       public static final String PARTITION_XPATH = "/" + 
CLOUD_CONTROLLER_ELEMENT + "/" +
+                                                    PARTITIONS_ELEMENT + "/" + 
PARTITION_ELEMENT;
+       public static final String REGION_XPATH = "/" + 
CLOUD_CONTROLLER_ELEMENT + "/" +
+                                                 IAAS_PROVIDERS_ELEMENT + "/" 
+ IAAS_PROVIDER_ELEMENT +
+                                                 "/" + REGION_ELEMENT;
+       public static final String ZONE_XPATH = "/" + CLOUD_CONTROLLER_ELEMENT 
+ "/" +
+                                               IAAS_PROVIDERS_ELEMENT + "/" + 
IAAS_PROVIDER_ELEMENT +
+                                               "/" + REGION_ELEMENT + "/" + 
ZONE_ELEMENT;
+       public static final String HOST_XPATH = "/" + CLOUD_CONTROLLER_ELEMENT 
+ "/" +
+                                               IAAS_PROVIDERS_ELEMENT + "/" + 
IAAS_PROVIDER_ELEMENT +
+                                               "/" + REGION_ELEMENT + "/" + 
ZONE_ELEMENT + "/" +
+                                               HOST_ELEMENT;
+       public static final String PROPERTY_ELEMENT_XPATH = "/" + 
PROPERTY_ELEMENT;
+       public static final String IMAGE_ID_ELEMENT_XPATH = "/" + 
IMAGE_ID_ELEMENT;
+       public static final String SCALE_UP_ORDER_ELEMENT_XPATH = "/" + 
SCALE_UP_ORDER_ELEMENT;
+       public static final String SCALE_DOWN_ORDER_ELEMENT_XPATH = "/" + 
SCALE_DOWN_ORDER_ELEMENT;
+       public static final String PROVIDER_ELEMENT_XPATH = "/" + 
PROPERTY_ELEMENT;
+       public static final String IDENTITY_ELEMENT_XPATH = "/" + 
IDENTITY_ELEMENT;
+       public static final String CREDENTIAL_ELEMENT_XPATH = "/" + 
CREDENTIAL_ELEMENT;
+       public static final String SERVICES_ELEMENT_XPATH = "/" + 
SERVICES_ELEMENT + "/" +
+                                                           SERVICE_ELEMENT;
+       public static final String SERVICE_ELEMENT_XPATH = "/" + 
SERVICE_ELEMENT;
+       public static final String CARTRIDGE_ELEMENT_XPATH = "/" + 
CARTRIDGE_ELEMENT;
+       public static final String PAYLOAD_ELEMENT_XPATH = "/" + 
PAYLOAD_ELEMENT;
+       public static final String HOST_ELEMENT_XPATH = "/" + HOST_ELEMENT;
+       public static final String CARTRIDGES_ELEMENT_XPATH = "/" + 
CARTRIDGES_ELEMENT + "/" +
+                                                             CARTRIDGE_ELEMENT;
+       public static final String IAAS_PROVIDER_ELEMENT_XPATH = "/" + 
IAAS_PROVIDER_ELEMENT;
+       public static final String DEPLOYMENT_ELEMENT_XPATH = "/" + 
DEPLOYMENT_ELEMENT;
+       public static final String PORT_MAPPING_ELEMENT_XPATH = "/" + 
PORT_MAPPING_ELEMENT;
+       public static final String APP_TYPES_ELEMENT_XPATH = "/" + 
APP_TYPES_ELEMENT;
+
+       public static final String DATA_PUBLISHER_XPATH = "/" + 
CLOUD_CONTROLLER_ELEMENT + "/" +
+                                                         
DATA_PUBLISHER_ELEMENT;
+       public static final String TOPOLOGY_SYNC_XPATH = "/" + 
CLOUD_CONTROLLER_ELEMENT + "/" +
+                                                        TOPOLOGY_SYNC_ELEMENT;
+       public static final String DATA_PUBLISHER_CRON_XPATH = "/" + 
CLOUD_CONTROLLER_ELEMENT + "/" +
+                                                              CRON_ELEMENT;
+       public static final String BAM_SERVER_ADMIN_USERNAME_XPATH = "/" + 
CLOUD_CONTROLLER_ELEMENT +
+                                                                    "/" +
+                                                                    
BAM_SERVER_ADMIN_USERNAME_ELEMENT;
+       public static final String BAM_SERVER_ADMIN_PASSWORD_XPATH = "/" + 
CLOUD_CONTROLLER_ELEMENT +
+                                                                    "/" +
+                                                                    
BAM_SERVER_ADMIN_PASSWORD_ELEMENT;
+       // public static final String CASSANDRA_HOST_ADDRESS_XPATH =
+       // "/"+CLOUD_CONTROLLER_ELEMENT+
+       // "/"+CASSANDRA_HOST_ADDRESS;
+       // public static final String CASSANDRA_HOST_PORT_XPATH =
+       // "/"+CLOUD_CONTROLLER_ELEMENT+
+       // "/"+CASSANDRA_HOST_PORT;
+
+       /**
+        * Secret Manager related aliases.
+        */
+       public static final String ALIAS_ATTRIBUTE = "svns:secretAlias";
+
+       /**
+        * Payload related constants
+        */
+       public static final String PAYLOAD_NAME = "payload";
+       public static final String ENTRY_SEPARATOR = ",";
+
+       /**
+        * Publisher task related constants
+        */
+       public static final String DATA_PUB_TASK_TYPE = 
"CLOUD_CONTROLLER_DATA_PUBLISHER_TASK";
+       // default is : data publisher will run in first second of every minute
+       public static final String PUB_CRON_EXPRESSION = "1 * * * * ? *";
+       public static final String DATA_PUB_TASK_NAME = 
"CartridgeInstanceDataPublisher";
+       public static final String DEFAULT_BAM_SERVER_USER_NAME = "admin";
+       public static final String DEFAULT_BAM_SERVER_PASSWORD = "admin";
+       public static final String DEFAULT_CASSANDRA_URL = "localhost:9160";
+       public static final String DEFAULT_CASSANDRA_USER = "admin";
+       public static final String DEFAULT_CASSANDRA_PASSWORD = "admin";
+       public static final String DEFAULT_CASSANDRA_CLUSTER_NAME = "Test 
Cluster";
+       public static final String DEFAULT_CASSANDRA_KEY_SPACE = "EVENT_KS";
 
-    
-    
-    /**
-     * Properties
-     */
-    public static final String REGION_PROPERTY = "region";
-    public static final String TOPICS_PROPERTY = "topics";
-    public static final String PUBLIC_IP_PROPERTY = "public_ip";
-    public static final String TENANT_ID_PROPERTY = "tenant_id";
-    public static final String ALIAS_PROPERTY = "alias";
-    public static final String AUTO_ASSIGN_IP_PROPERTY = "autoAssignIp";
-    public static final String CRON_PROPERTY = "cron";
-    public static final String AMQP_CONNECTION_URL_PROPERTY = 
"amqpConnectionUrl";
-    public static final String AMQP_INITIAL_CONTEXT_FACTORY_PROPERTY = 
"amqpInitialContextFactory";
-    public static final String AMQP_TOPIC_CONNECTION_FACTORY_PROPERTY = 
"amqpTopicConnectionFactory";
-    public static final String INSTANCE_TOPIC = "instance-status";
-    // pre define a floating ip
-    public static final String FLOATING_IP_PROPERTY = "floatingIp";
-    public static final String DEFAULT_FLOATING_IP_POOL = 
"defaultFloatingIpPool";
-    
-    
-    /**
-     * XPath expressions
-     */
-    public static final String IAAS_PROVIDER_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+"/"+
-     IAAS_PROVIDERS_ELEMENT+"/"+IAAS_PROVIDER_ELEMENT;
-     public static final String PARTITION_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+"/"+
-     PARTITIONS_ELEMENT+"/"+PARTITION_ELEMENT;
-    public static final String REGION_XPATH = "/"+CLOUD_CONTROLLER_ELEMENT+"/"+
-     IAAS_PROVIDERS_ELEMENT+"/"+IAAS_PROVIDER_ELEMENT + "/" + REGION_ELEMENT;
-    public static final String ZONE_XPATH = "/"+CLOUD_CONTROLLER_ELEMENT+"/"+
-     IAAS_PROVIDERS_ELEMENT+"/"+IAAS_PROVIDER_ELEMENT + "/" + REGION_ELEMENT + 
"/" + ZONE_ELEMENT;
-     public static final String HOST_XPATH = "/"+CLOUD_CONTROLLER_ELEMENT+"/"+
-     IAAS_PROVIDERS_ELEMENT+"/"+IAAS_PROVIDER_ELEMENT + "/" + REGION_ELEMENT + 
"/" + ZONE_ELEMENT
-             + "/" + HOST_ELEMENT ;
-    public static final String PROPERTY_ELEMENT_XPATH = "/"+PROPERTY_ELEMENT;
-    public static final String IMAGE_ID_ELEMENT_XPATH = "/"+IMAGE_ID_ELEMENT;
-    public static final String SCALE_UP_ORDER_ELEMENT_XPATH = 
"/"+SCALE_UP_ORDER_ELEMENT;
-    public static final String SCALE_DOWN_ORDER_ELEMENT_XPATH = 
"/"+SCALE_DOWN_ORDER_ELEMENT;
-    public static final String PROVIDER_ELEMENT_XPATH = "/"+PROPERTY_ELEMENT;
-    public static final String IDENTITY_ELEMENT_XPATH = "/"+IDENTITY_ELEMENT;
-    public static final String CREDENTIAL_ELEMENT_XPATH = 
"/"+CREDENTIAL_ELEMENT;
-    public static final String SERVICES_ELEMENT_XPATH = 
"/"+SERVICES_ELEMENT+"/"+SERVICE_ELEMENT;
-    public static final String SERVICE_ELEMENT_XPATH = "/"+SERVICE_ELEMENT;
-    public static final String CARTRIDGE_ELEMENT_XPATH = "/"+CARTRIDGE_ELEMENT;
-    public static final String PAYLOAD_ELEMENT_XPATH = "/"+PAYLOAD_ELEMENT;
-    public static final String HOST_ELEMENT_XPATH = "/"+HOST_ELEMENT;
-    public static final String CARTRIDGES_ELEMENT_XPATH = 
"/"+CARTRIDGES_ELEMENT+"/"+CARTRIDGE_ELEMENT;
-    public static final String IAAS_PROVIDER_ELEMENT_XPATH = 
"/"+IAAS_PROVIDER_ELEMENT;
-    public static final String DEPLOYMENT_ELEMENT_XPATH = 
"/"+DEPLOYMENT_ELEMENT;
-    public static final String PORT_MAPPING_ELEMENT_XPATH = 
"/"+PORT_MAPPING_ELEMENT;
-    public static final String APP_TYPES_ELEMENT_XPATH = "/"+APP_TYPES_ELEMENT;
-    
-    public static final String DATA_PUBLISHER_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+
-            "/"+DATA_PUBLISHER_ELEMENT;
-    public static final String TOPOLOGY_SYNC_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+
-            "/"+TOPOLOGY_SYNC_ELEMENT;
-    public static final String DATA_PUBLISHER_CRON_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+
-            "/"+CRON_ELEMENT;
-    public static final String BAM_SERVER_ADMIN_USERNAME_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+
-            "/"+BAM_SERVER_ADMIN_USERNAME_ELEMENT;
-    public static final String BAM_SERVER_ADMIN_PASSWORD_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+
-            "/"+BAM_SERVER_ADMIN_PASSWORD_ELEMENT;
-//    public static final String CASSANDRA_HOST_ADDRESS_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+
-//            "/"+CASSANDRA_HOST_ADDRESS;
-//    public static final String CASSANDRA_HOST_PORT_XPATH = 
"/"+CLOUD_CONTROLLER_ELEMENT+
-//            "/"+CASSANDRA_HOST_PORT;
-    
-    
-    /**
-     * Secret Manager related aliases.
-     */
-    public static final String ALIAS_ATTRIBUTE = "svns:secretAlias";
-    
-    /**
-     * Payload related constants
-     */
-    public static final String PAYLOAD_NAME = "payload";
-    public static final String ENTRY_SEPARATOR = ",";
-    
-    /**
-     * Publisher task related constants
-     */
-    public static final String DATA_PUB_TASK_TYPE = 
"CLOUD_CONTROLLER_DATA_PUBLISHER_TASK";
-    // default is : data publisher will run in first second of every minute
-    public static final String PUB_CRON_EXPRESSION = "1 * * * * ? *";
-    public static final String DATA_PUB_TASK_NAME = 
"CartridgeInstanceDataPublisher";
-    public static final String DEFAULT_BAM_SERVER_USER_NAME = "admin";
-    public static final String DEFAULT_BAM_SERVER_PASSWORD = "admin";
-    public static final String DEFAULT_CASSANDRA_URL = "localhost:9160";
-    public static final String DEFAULT_CASSANDRA_USER = "admin";
-    public static final String DEFAULT_CASSANDRA_PASSWORD = "admin";
-    public static final String DEFAULT_CASSANDRA_CLUSTER_NAME = "Test Cluster";
-    public static final String DEFAULT_CASSANDRA_KEY_SPACE = "EVENT_KS";
-    
        /**
         * Directories
         */
-       public static final String SERVICES_DIR = 
CarbonUtils.getCarbonRepository()
-               + File.separator + "services"+File.separator;
-    
-    /**
-     * Topology sync related constants
-     */
-    public static final String TOPOLOGY_FILE_PATH = 
CarbonUtils.getCarbonConfigDirPath()+File.separator+"service-topology.conf";
+       public static final String SERVICES_DIR = 
CarbonUtils.getCarbonRepository() + File.separator +
+                                                 "services" + File.separator;
+
+       /**
+        * Topology sync related constants
+        */
+       public static final String TOPOLOGY_FILE_PATH = 
CarbonUtils.getCarbonConfigDirPath() +
+                                                       File.separator + 
"service-topology.conf";
        public static final String TOPOLOGY_SYNC_CRON = "1 * * * * ? *";
        public static final String TOPOLOGY_SYNC_TASK_NAME = 
"TOPOLOGY_SYNC_TASK";
        public static final String TOPOLOGY_SYNC_TASK_TYPE = 
"TOPOLOGY_SYNC_TASK_TYPE";
-       public static final String AMQP_CONNECTION_URL = 
"amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
-       public static final String AMQP_INITIAL_CONTEXT_FACTORY = 
"org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
+       public static final String AMQP_CONNECTION_URL =
+                                                        
"amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
+       public static final String AMQP_INITIAL_CONTEXT_FACTORY =
+                                                                 
"org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        public static final String AMQP_TOPIC_CONNECTION_FACTORY = 
"qpidConnectionfactory";
-    
+
        /**
         * Persistence
         */
        public static final String CLOUD_CONTROLLER_RESOURCE = 
"/cloud.controller";
        public static final String DATA_RESOURCE = "/data";
-    public static final String TOPOLOGY_RESOURCE = "/topology";
-    public static final String AVAILABILITY_ZONE = "availabilityZone";
-    public static final String KEY_PAIR = "keyPair";
-    public static final String HOST = "host";
-    public static final String SECURITY_GROUP_IDS = "securityGroupIds";
-    public static final String SECURITY_GROUPS = "securityGroups";
-    public static final String SUBNET_ID = "subnetId";
-    public static final String TAGS = "tags";
-    public static final String TAGS_AS_KEY_VALUE_PAIRS_PREFIX = "tag:";
-    public static final String AUTO_ASSIGN_IP = "autoAssignIp";
-    public static final String BLOCK_UNTIL_RUNNING = "blockUntilRunning";
-    public static final String INSTANCE_TYPE = "instanceType";
-    public static final String ASSOCIATE_PUBLIC_IP_ADDRESS = 
"associatePublicIpAddress";
-    public static final String LB_CLUSTER_ID_COL = "lbclusterId";
-    public static final String NETWORK_INTERFACES = "networkInterfaces";
-    public static final String NETWORK_FIXED_IP = "fixedIp";
-    public static final String NETWORK_PORT = "portUuid";
-    public static final String NETWORK_UUID = "networkUuid";
+       public static final String TOPOLOGY_RESOURCE = "/topology";
+       public static final String AVAILABILITY_ZONE = "availabilityZone";
+       public static final String KEY_PAIR = "keyPair";
+       public static final String HOST = "host";
+       public static final String SECURITY_GROUP_IDS = "securityGroupIds";
+       public static final String SECURITY_GROUPS = "securityGroups";
+       public static final String SUBNET_ID = "subnetId";
+       public static final String TAGS = "tags";
+       public static final String TAGS_AS_KEY_VALUE_PAIRS_PREFIX = "tag:";
+       public static final String AUTO_ASSIGN_IP = "autoAssignIp";
+       public static final String BLOCK_UNTIL_RUNNING = "blockUntilRunning";
+       public static final String INSTANCE_TYPE = "instanceType";
+       public static final String ASSOCIATE_PUBLIC_IP_ADDRESS = 
"associatePublicIpAddress";
+       public static final String LB_CLUSTER_ID_COL = "lbclusterId";
+       public static final String NETWORK_INTERFACES = "networkInterfaces";
+       public static final String NETWORK_FIXED_IP = "fixedIp";
+       public static final String NETWORK_PORT = "portUuid";
+       public static final String NETWORK_UUID = "networkUuid";
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/pom.xml b/components/org.apache.stratos.manager/pom.xml
index 258db8d..9e1386f 100644
--- a/components/org.apache.stratos.manager/pom.xml
+++ b/components/org.apache.stratos.manager/pom.xml
@@ -120,6 +120,12 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        
+         <dependency>
+                       <groupId>org.eclipse.paho</groupId>
+                       <artifactId>mqtt-client</artifactId>
+                       <version>0.4.0</version>
+                </dependency>
     </dependencies>
 
        <build>

Reply via email to