http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/CloudControllerService.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/CloudControllerService.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/CloudControllerService.java
deleted file mode 100644
index 1e71129..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/CloudControllerService.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.interfaces;
-
-import org.apache.stratos.cloud.controller.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.exception.*;
-import org.apache.stratos.cloud.controller.pojo.*;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-/**
- * This API provides a way to communicate with underline
- * Infrastructure which are supported by <i>jClouds</i>.
- * 
- */
-public interface CloudControllerService {
-    
-       /**
-        * Deploys a Cartridge configuration 
-        * @param cartridgeConfig cartridge configuration to be deployed
-        * @throws InvalidCartridgeDefinitionException if the cartridge 
configuration is not valid.
-        * @throws InvalidIaasProviderException if the iaas providers 
configured are not valid.
-        * @throws IllegalArgumentException  if the provided argument is not 
valid.
-        */
-    void deployCartridgeDefinition(CartridgeConfig cartridgeConfig)
-            throws InvalidCartridgeDefinitionException, 
InvalidIaasProviderException;
-    
-    /**
-     * Undeploys a Cartridge configuration which is already deployed.
-     * @param cartridgeType type of the cartridge to be undeployed.
-     * @throws InvalidCartridgeTypeException if the cartridge type specified 
is not a deployed cartridge.
-     */
-    public void undeployCartridgeDefinition(String cartridgeType) throws 
InvalidCartridgeTypeException;
-    
-    public void deployServiceGroup(ServiceGroup servicegroup) throws 
InvalidServiceGroupException;
-    
-    public void undeployServiceGroup(String name) throws 
InvalidServiceGroupException;
-    
-    public ServiceGroup getServiceGroup (String name) throws 
InvalidServiceGroupException;
-    
-    public String []getServiceGroupSubGroups (String name) throws 
InvalidServiceGroupException;
-    
-    public String [] getServiceGroupCartridges (String name) throws 
InvalidServiceGroupException;
-    
-    public Dependencies getServiceGroupDependencies (String name) throws 
InvalidServiceGroupException;
-
-    /**
-     * Validate a given {@link Partition} for basic property existence.
-     * @param partition partition to be validated.
-     * @return whether the partition is a valid one.
-     * @throws InvalidPartitionException if the partition is invalid.
-     */
-    boolean validatePartition(Partition partition) throws 
InvalidPartitionException;
-    
-    /**
-     * Validate a given deployment policy.
-     * @param cartridgeType type of the cartridge
-     * @param partitions partitions
-     * @return whether the policy is a valid one against the given Cartridge.
-     * @throws InvalidPartitionException if the policy contains at least one 
invalid partition.
-     * @throws InvalidCartridgeTypeException if the given Cartridge type is 
not a valid one.
-     */
-     boolean validateDeploymentPolicy(String cartridgeType, Partition[] 
partitions) 
-            throws InvalidPartitionException, InvalidCartridgeTypeException;
-
-    /**
-     * <p>
-     * Registers the details of a newly created service cluster. This will 
override an already
-     * present service cluster, if there is any. A service cluster is uniquely 
identified by its
-     * domain and sub domain combination.
-     * </p>
-     * @param registrant information about the new subscription.
-     * @return whether the registration is successful or not.
-     * 
-     * @throws UnregisteredCartridgeException
-     *             when the cartridge type requested by this service is
-     *             not a registered one.
-     */
-    boolean registerService(Registrant registrant) throws 
UnregisteredCartridgeException;
-
-    /**
-     * Calling this method will result in an instance startup, which is belong
-     * to the provided Cluster ID. Also note that the instance that is 
starting up
-     * belongs to the group whose name is derived from its Cluster ID, 
replacing <i>.</i>
-     * by a hyphen (<i>-</i>).
-     * @param member Context with cluster id, partition etc.
-     * @return updated {@link MemberContext}
-     * @throws UnregisteredCartridgeException if the requested Cartridge type 
is not a registered one.
-     * @throws InvalidIaasProviderException if the iaas requested is not valid.
-     */
-    MemberContext startInstance(MemberContext member) throws 
UnregisteredCartridgeException, InvalidIaasProviderException;
-    
-    /**
-     * Create a container cluster.
-     * @param {@link ContainerClusterContext} Context with cluster id, and 
host cluster details. 
-     * @return a list of {@link MemberContext}s correspond to each Pod created.
-     * @throws UnregisteredCartridgeException if the requested Cartridge type 
is not a registered one.
-     */
-    MemberContext[] startContainers(ContainerClusterContext clusterContext) 
throws UnregisteredCartridgeException;
-    
-    /**
-     * Calling this method will result in termination of the instance with 
given member id in the given Partition.
-     * 
-     * @param memberId
-     *            member ID of the instance to be terminated.
-     * @return whether an instance terminated successfully or not.
-     */
-    void terminateInstance(String memberId) throws InvalidMemberException, 
InvalidCartridgeTypeException;
-
-    /**
-     * Calling this method will result in termination of all instances belong
-     * to the provided cluster ID.
-     * 
-     * @param clusterId
-     *            cluster ID of the instance to be terminated.
-     * @return whether an instance terminated successfully or not.
-     */
-    void terminateAllInstances(String clusterId) throws 
InvalidClusterException;
-    
-    /**
-     * Terminate all containers of the given cluster.
-     * @param clusterId id of the subjected cluster.
-     * @return terminated {@link MemberContext}s
-     * @throws InvalidClusterException
-     */
-    MemberContext[] terminateAllContainers(String clusterId) throws 
InvalidClusterException;
-    
-    /**
-     * Terminate a given member/Kubernetes Pod.
-     * @param memberId member/Pod id to be terminated.
-     * @return terminated {@link MemberContext}
-     * @throws MemberTerminationFailedException
-     */
-    MemberContext terminateContainer(String memberId) throws 
MemberTerminationFailedException;
-    
-    /**
-     * Update the Kubernetes controller created for the given cluster with the 
specified number of replicas.
-     * @param clusterId id of the subjected cluster.
-     * @param replicas total number of replicas to be set to the controller.
-     * @return newly created Members if any / terminated {@link MemberContext} 
in scale down scenario.
-     * @throws InvalidClusterException
-     */
-    MemberContext[] updateContainers(String clusterId, int replicas) throws 
UnregisteredCartridgeException;
-
-    /**
-     * Update the topology with current cluster status.
-     * @param serviceName id of service which the cluster belongs to.
-     * @param clusterId id of the subjected cluster.
-     * @param instanceId id of the cluster instance.
-     * @param status total number of replicas to be set to the controller.
-     */
-    void updateClusterStatus(String serviceName, String clusterId, String 
instanceId, ClusterStatus status);
-    
-    /**
-     * Unregister a docker service identified by the given cluster id.
-     * @param clusterId service cluster id.
-     * @throws UnregisteredClusterException if the service cluster requested 
is not a registered one.
-     */
-    void unregisterDockerService(String clusterId) throws 
UnregisteredClusterException;
-
-    /**
-     * Unregister the service cluster identified by the given cluster id.
-     * @param clusterId service cluster id.
-     * @throws UnregisteredClusterException if the service cluster requested 
is not a registered one.
-     */
-    void unregisterService(String clusterId) throws 
UnregisteredClusterException;
-    
-    /**
-     * This method will return the information regarding the given cartridge, 
if present.
-     * Else this will return <code>null</code>.
-     * 
-     * @param cartridgeType
-     *            type of the cartridge.
-     * @return {@link org.apache.stratos.cloud.controller.pojo.CartridgeInfo} 
of the given cartridge type or <code>null</code>.
-     * @throws UnregisteredCartridgeException if there is no registered 
cartridge with this type.
-     */
-    CartridgeInfo getCartridgeInfo(String cartridgeType) throws 
UnregisteredCartridgeException;
-
-    /**
-     * Calling this method will result in returning the types of {@link 
org.apache.stratos.cloud.controller.pojo.Cartridge}s
-     * registered in Cloud Controller.
-     * 
-     * @return String array containing types of registered {@link 
org.apache.stratos.cloud.controller.pojo.Cartridge}s.
-     */
-    String[] getRegisteredCartridges();
-
-    /**
-     * Returns the {@link 
org.apache.stratos.cloud.controller.pojo.ClusterContext} object associated with 
the given cluster id, or null if not found
-     *
-     * @param clusterId cluster id
-     * @return {@link org.apache.stratos.cloud.controller.pojo.ClusterContext} 
object  associated with the given cluster id, or null
-     */
-    public ClusterContext getClusterContext (String clusterId);
-
-    /**
-     * Creates the clusters relevant to an application in the topology model
-     *
-     * @param appId application id
-     * @param appClustersContexts  cluster information holder object
-     * @throws ApplicationClusterRegistrationException if the cluster 
information are null/empty
-     */
-    public void createApplicationClusters(String appId, 
ApplicationClusterContextDTO[] appClustersContexts) throws
-            ApplicationClusterRegistrationException;
-
-    /**
-     * Creates a cluster instance with the given information
-     *
-     * @param serviceType serviceType
-     * @param clusterId cluster id
-     * @param alias alias provided in the subscription parameters
-     * @param instanceId instance id
-     * @throws ClusterInstanceCreationException if an y error occurs in 
cluster instance creation
-     */
-    public void createClusterInstance (String serviceType, String clusterId, 
String alias, String instanceId) throws
-            ClusterInstanceCreationException;
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/Iaas.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/Iaas.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/Iaas.java
deleted file mode 100644
index 00766de..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/Iaas.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.interfaces;
-
-import org.jclouds.compute.ComputeService;
-import org.jclouds.compute.domain.NodeMetadata;
-import org.jclouds.compute.domain.Template;
-import org.apache.stratos.cloud.controller.exception.InvalidHostException;
-import org.apache.stratos.cloud.controller.exception.InvalidRegionException;
-import org.apache.stratos.cloud.controller.exception.InvalidZoneException;
-import org.apache.stratos.cloud.controller.pojo.IaasProvider;
-import 
org.apache.stratos.cloud.controller.validate.interfaces.PartitionValidator;
-
-/**
- * All IaaSes that are going to support by Cloud Controller, should extend 
this abstract class.
- */
-public abstract class Iaas {
-       /**
-        * Reference to the corresponding {@link IaasProvider}
-        */
-       private IaasProvider iaasProvider;
-       
-       public Iaas(IaasProvider iaasProvider) {
-               this.setIaasProvider(iaasProvider);
-       }
-       
-       public IaasProvider getIaasProvider() {
-               return iaasProvider;
-       }
-
-       public void setIaasProvider(IaasProvider iaasProvider) {
-               this.iaasProvider = iaasProvider;
-       }
-       
-    /**
-     * This should build the {@link ComputeService} object and the {@link 
Template} object,
-     * using the information from {@link IaasProvider} and should set the 
built 
-     * {@link ComputeService} object in the {@link 
IaasProvider#setComputeService(ComputeService)}
-     * and also should set the built {@link Template} object in the 
-     * {@link IaasProvider#setTemplate(Template)}.
-     */
-    public abstract void buildComputeServiceAndTemplate();
-    
-    /**
-     * This method provides a way to set payload that can be obtained from 
{@link IaasProvider#getPayload()}
-     * in the {@link Template} of this IaaS.
-     */
-    public abstract void setDynamicPayload();
-    
-    /**
-     * This will obtain an IP address from the allocated list and associate 
that IP with this node.
-     * @param node Node to be associated with an IP.
-     * @return associated public IP.
-     */
-    public abstract String associateAddress(NodeMetadata node);
-    
-    /**
-     * This will obtain a predefined IP address and associate that IP with 
this node, if ip is already in use allocate ip from pool 
-     * (through associateAddress())
-     * @param node Node to be associated with an IP.
-     * @ip preallocated floating Ip
-     * @return associated public IP.
-     */
-    public abstract String associatePredefinedAddress(NodeMetadata node, 
String ip);
-    
-    /**
-     * This will deallocate/release the given IP address back to pool.
-     * @param iaasInfo corresponding {@link IaasProvider}
-     * @param ip public IP address to be released.
-     */
-    public abstract void releaseAddress(String ip);
-    
-    /**
-     * This method should create a Key Pair corresponds to a given public key 
in the respective region having the name given.
-     * Also should override the value of the key pair in the {@link Template} 
of this IaaS.
-     * @param region region that the key pair will get created.
-     * @param keyPairName name of the key pair. NOTE: Jclouds adds a prefix : 
<code>jclouds#</code>
-     * @param publicKey public key, from which the key pair will be created.
-     * @return whether the key pair creation is successful or not.
-     */
-    public abstract boolean createKeyPairFromPublicKey(String region, String 
keyPairName, String publicKey);
-    
-    /**
-     * Validate a given region name against a particular IaaS.
-     * If a particular IaaS doesn't have a concept called region, it can 
simply throw {@link InvalidRegionException}.
-     * @param region name of the region.
-     * @return whether the region is valid.
-     * @throws InvalidRegionException if the region is invalid.
-     */
-    public abstract boolean isValidRegion(String region) throws 
InvalidRegionException;
-    
-    /**
-     * Validate a given zone name against a particular region in an IaaS.
-     * If a particular IaaS doesn't have a concept called zone, it can simply 
throw {@link InvalidZoneException}.
-     * @param region region of the IaaS that the zone belongs to.
-     * @param zone 
-     * @return whether the zone is valid in the given region or not.
-     * @throws InvalidZoneException if the zone is invalid in a given region.
-     */
-    public abstract boolean isValidZone(String region, String zone) throws 
InvalidZoneException, InvalidRegionException;
-    
-    /**
-     * Validate a given host id against a particular zone in an IaaS.
-     * If a particular IaaS doesn't have a concept called hosts, it can simply 
throw {@link InvalidHostException}.
-     * @param zone zone of the IaaS that the host belongs to.
-     * @param host
-     * @return whether the host is valid in the given zone or not.
-     * @throws InvalidHostException if the host is invalid in a given zone.
-     */
-    public abstract boolean isValidHost(String zone, String host) throws 
InvalidHostException;
-    
-    /**
-     * provides the {@link PartitionValidator} corresponds to this particular 
IaaS.
-     * @return {@link PartitionValidator}
-     */
-    public abstract PartitionValidator getPartitionValidator();
-
-    /**
-     * Builds only the jclouds {@link Template}
-     */
-    public abstract void buildTemplate();
-    
-    /**
-     * Create a new volume in the respective Iaas.
-     * @param sizeGB size of the volume in Giga Bytes.
-     * @return Id of the created volume.
-     */
-    public abstract String createVolume(int sizeGB, String snapshotId);
-   
-    /**
-     * Attach a given volume to an instance at the specified device path.
-     * @param instanceId of the instance.
-     * @param volumeId volume id of the volume to be attached.
-     * @param deviceName name of the device that the volume would bind to.
-     * @return the status of the attachment.
-     */
-    public abstract String attachVolume(String instanceId, String volumeId, 
String deviceName);
-    
-    /**
-     * Detach a given volume from the given instance.
-     * @param instanceId of the instance.
-     * @param volumeId volume id of the volume to be detached.
-     */
-    public abstract void detachVolume(String instanceId, String volumeId);
-    
-    /**
-     * Delete a given volume.
-     * @param volumeId volume id of the volume to be detached.
-     */
-    public abstract void deleteVolume(String volumeId);
-
-    /**
-     * This returns the device of the volume specified by the user. This is 
depends on IAAS. 
-     * For an instance /dev/sdf maps to /dev/xvdf in EC2.
-     */
-    public abstract String getIaasDevice(String device);
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index b07a875..56f9d26 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -23,14 +23,13 @@ package org.apache.stratos.cloud.controller.internal;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.cloud.controller.receiver.application.ApplicationTopicReceiver;
-import 
org.apache.stratos.cloud.controller.receiver.cluster.status.ClusterStatusTopicReceiver;
+import 
org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
+import 
org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
-import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
-import 
org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
-import 
org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver;
-import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
+import org.apache.stratos.cloud.controller.services.CloudControllerServiceImpl;
+import org.apache.stratos.cloud.controller.services.CloudControllerService;
+import 
org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
+import 
org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.util.Util;
 import org.osgi.framework.BundleContext;

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
new file mode 100644
index 0000000..780722d
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.internal;
+
+import org.apache.axis2.engine.AxisConfiguration;
+import org.wso2.carbon.caching.impl.DistributedMapProvider;
+import org.wso2.carbon.ntask.core.service.TaskService;
+import org.wso2.carbon.registry.core.Registry;
+import org.wso2.carbon.registry.core.session.UserRegistry;
+
+/**
+ * Singleton class to hold all the service references.
+ */
+public class ServiceReferenceHolder {
+
+    private static ServiceReferenceHolder instance;
+    private TaskService taskService;
+    private Registry registry;
+    private AxisConfiguration axisConfiguration;
+    private DistributedMapProvider distributedMapProvider;
+
+    private ServiceReferenceHolder() {
+    }
+
+    public static ServiceReferenceHolder getInstance() {
+        if (instance == null) {
+            instance = new ServiceReferenceHolder();
+        }
+        return instance;
+    }
+    
+    public void setAxisConfiguration(AxisConfiguration axisConfiguration) {
+        this.axisConfiguration = axisConfiguration;
+    }
+    
+    public AxisConfiguration getAxisConfiguration() {
+        return axisConfiguration;
+    }
+    
+    public TaskService getTaskService() {
+        return taskService;
+    }
+
+    public void setTaskService(TaskService taskService) {
+        this.taskService = taskService;
+    }
+    
+       public void setRegistry(UserRegistry governanceSystemRegistry) {
+               registry = governanceSystemRegistry;
+    }
+
+       public Registry getRegistry() {
+           return registry;
+    }
+
+    public void setDistributedMapProvider(DistributedMapProvider 
distributedMapProvider) {
+        this.distributedMapProvider = distributedMapProvider;
+    }
+
+    public DistributedMapProvider getDistributedMapProvider() {
+        return distributedMapProvider;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/jcloud/ComputeServiceBuilderUtil.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/jcloud/ComputeServiceBuilderUtil.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/jcloud/ComputeServiceBuilderUtil.java
deleted file mode 100644
index d8bda9e..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/jcloud/ComputeServiceBuilderUtil.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.jcloud;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Module;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import org.apache.stratos.cloud.controller.pojo.IaasProvider;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.jclouds.ContextBuilder;
-import org.jclouds.compute.ComputeServiceContext;
-import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
-import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
-import org.jclouds.sshj.config.SshjSshClientModule;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * This class is responsible for creating a JClouds specific ComputeService 
object.
- */
-public class ComputeServiceBuilderUtil {
-    
-    private static final Log log = 
LogFactory.getLog(ComputeServiceBuilderUtil.class);
-    
-    public static byte[] getUserData(String payloadFileName) {
-        byte[] bytes = null;
-        try {
-            File file = new File(payloadFileName);
-            if (!file.exists()) {
-                handleException("Payload file " + payloadFileName + " does not 
exist");
-            }
-            if (!file.canRead()) {
-                handleException("Payload file " + payloadFileName + " does 
cannot be read");
-            }
-            bytes = getBytesFromFile(file);
-
-        } catch (IOException e) {
-            handleException("Cannot read data from payload file " + 
payloadFileName, e);
-        }
-        return bytes;
-    }
-
-    
-    public static void buildDefaultComputeService(IaasProvider iaas) {
-
-        Properties properties = new Properties();
-
-        // load properties
-        for (Map.Entry<String, String> entry : 
iaas.getProperties().entrySet()) {
-            properties.put(entry.getKey(), entry.getValue());
-        }
-
-        // set modules
-        Iterable<Module> modules =
-            ImmutableSet.<Module> of(new SshjSshClientModule(), new 
SLF4JLoggingModule(),
-                                     new EnterpriseConfigurationModule());
-
-        // build context
-        ContextBuilder builder =
-            ContextBuilder.newBuilder(iaas.getProvider())
-                          .credentials(iaas.getIdentity(), 
iaas.getCredential()).modules(modules)
-                          .overrides(properties);
-
-        // set the compute service object
-        
iaas.setComputeService(builder.buildView(ComputeServiceContext.class).getComputeService());
-    }
-    
-    public static String extractRegion(IaasProvider iaas) {
-        String region;
-        // try to find region
-        if ((region = 
iaas.getProperty(CloudControllerConstants.REGION_PROPERTY)) == null) {
-            // if the property, isn't specified, try to obtain from the image 
id
-            // image id can be in following format - {region}/{UUID}
-            region = iaas.getImage().contains("/") ? 
iaas.getImage().split("/")[0] : null;
-        }
-
-        return region;
-    }
-    
-       public static String extractZone(IaasProvider iaas) {
-
-               return 
iaas.getProperty(CloudControllerConstants.AVAILABILITY_ZONE);
-       }
-       
-    /** Returns the contents of the file in a byte array
-     *
-     * @param file
-     *            - Input File
-     * @return Bytes from the file
-     * @throws java.io.IOException
-     *             , if retrieving the file contents failed.
-     */
-    public static byte[] getBytesFromFile(File file) throws IOException {
-        if (!file.exists()) {
-            log.error("Payload file " + file.getAbsolutePath() + " does not 
exist");
-            return null;
-        }
-        InputStream is = new FileInputStream(file);
-        byte[] bytes;
-
-        try {
-            // Get the size of the file
-            long length = file.length();
-            
-            // You cannot create an array using a long type.
-            // It needs to be an int type.
-            // Before converting to an int type, check
-            // to ensure that file is not larger than Integer.MAX_VALUE.
-            if (length > Integer.MAX_VALUE) {
-                if (log.isDebugEnabled()) {
-                    log.debug("File is too large");
-                }
-            }
-
-            // Create the byte array to hold the data
-            bytes = new byte[(int) length];
-
-            // Read in the bytes
-            int offset = 0;
-            int numRead;
-            while (offset < bytes.length &&
-                (numRead = is.read(bytes, offset, bytes.length - offset)) >= 
0) {
-                offset += numRead;
-            }
-
-            // Ensure all the bytes have been read in
-            if (offset < bytes.length) {
-                throw new IOException("Could not completely read file " + 
file.getName());
-            }
-        } finally {
-            // Close the input stream and return bytes
-            is.close();
-         }
-        
-        return bytes;
-    }
-    
-    /**
-     * handles the exception
-     * 
-     * @param msg
-     *            exception message
-     */
-    private static void handleException(String msg) {
-        log.error(msg);
-        throw new CloudControllerException(msg);
-    }
-
-    /**
-     * handles the exception
-     * 
-     * @param msg
-     *            exception message
-     * @param e
-     *            exception
-     */
-    private static void handleException(String msg, Exception e) {
-        log.error(msg, e);
-        throw new CloudControllerException(msg, e);
-
-    }
-    
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/CartridgeInstanceDataPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/CartridgeInstanceDataPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/CartridgeInstanceDataPublisher.java
new file mode 100644
index 0000000..6ad0258
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/CartridgeInstanceDataPublisher.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.exception.CloudControllerException;
+import org.apache.stratos.cloud.controller.domain.Cartridge;
+import org.apache.stratos.cloud.controller.domain.MemberContext;
+import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.jclouds.compute.domain.NodeMetadata;
+import org.wso2.carbon.base.ServerConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+import org.wso2.carbon.utils.CarbonUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ *  This will publish the state changes of a node in the topology to a data 
receiver
+ */
+public class CartridgeInstanceDataPublisher {
+    
+    private static final Log log = 
LogFactory.getLog(CartridgeInstanceDataPublisher.class);
+    private static AsyncDataPublisher dataPublisher;
+    private static StreamDefinition streamDefinition;
+    private static final String cloudControllerEventStreamVersion = "1.0.0";
+
+    public static void publish(String memberId,
+                               String partitionId,
+                               String networkId,
+                               String clusterId,
+                               String serviceName,
+                               String status,
+                               NodeMetadata metadata) {
+        if(!FasterLookUpDataHolder.getInstance().getEnableBAMDataPublisher()){
+            return;
+        }
+        log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME+" cycle 
started.");
+
+        if(dataPublisher==null){
+            createDataPublisher();
+
+            //If we cannot create a data publisher we should give up
+            //this means data will not be published
+            if(dataPublisher == null){
+                log.error("Data Publisher cannot be created or found.");
+                release();
+                return;
+            }
+        }
+
+
+        MemberContext memberContext = 
FasterLookUpDataHolder.getInstance().getMemberContextOfMemberId(memberId);
+        String cartridgeType = memberContext.getCartridgeType();
+        Cartridge cartridge = 
FasterLookUpDataHolder.getInstance().getCartridge(cartridgeType);
+        
+        //Construct the data to be published
+        List<Object> payload = new ArrayList<Object>();
+        // Payload values
+        payload.add(memberId);
+        payload.add(serviceName);
+        payload.add(clusterId);
+        payload.add(handleNull(memberContext.getLbClusterId()));
+        payload.add(handleNull(partitionId));
+        payload.add(handleNull(networkId));
+               if (cartridge != null) {
+                       
payload.add(handleNull(String.valueOf(cartridge.isMultiTenant())));
+               } else {
+                       payload.add("");
+               }
+        payload.add(handleNull(memberContext.getPartition().getProvider()));
+        payload.add(handleNull(status));
+
+        if(metadata != null) {
+            payload.add(metadata.getHostname());
+            payload.add(metadata.getHardware().getHypervisor());
+            payload.add(String.valueOf(metadata.getHardware().getRam()));
+            payload.add(metadata.getImageId());
+            payload.add(metadata.getLoginPort());
+            payload.add(metadata.getOperatingSystem().getName());
+            payload.add(metadata.getOperatingSystem().getVersion());
+            payload.add(metadata.getOperatingSystem().getArch());
+            
payload.add(String.valueOf(metadata.getOperatingSystem().is64Bit()));
+        } else {
+            payload.add("");
+            payload.add("");
+            payload.add("");
+            payload.add("");
+            payload.add(0);
+            payload.add("");
+            payload.add("");
+            payload.add("");
+            payload.add("");
+        }
+
+        payload.add(handleNull(memberContext.getPrivateIpAddress()));
+        payload.add(handleNull(memberContext.getPublicIpAddress()));
+        payload.add(handleNull(memberContext.getAllocatedIpAddress()));
+
+        Event event = new Event();
+        event.setPayloadData(payload.toArray());
+        event.setArbitraryDataMap(new HashMap<String, String>());
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Publishing BAM event: [stream] %s 
[version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
+            }
+            dataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
+        } catch (AgentException e) {
+            if (log.isErrorEnabled()) {
+                log.error(String.format("Could not publish BAM event: [stream] 
%s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), 
e);
+            }
+        }
+    }
+    
+    private static void release(){
+        FasterLookUpDataHolder.getInstance().setPublisherRunning(false);
+    }
+    
+    private static StreamDefinition initializeStream() throws Exception {
+        streamDefinition = new StreamDefinition(
+                CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM,
+                cloudControllerEventStreamVersion);
+        streamDefinition.setNickName("cloud.controller");
+        streamDefinition.setDescription("Instances booted up by the Cloud 
Controller");
+        // Payload definition
+        List<Attribute> payloadData = new ArrayList<Attribute>();
+        payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, 
AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, 
AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, 
AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, 
AttributeType.INT));
+        payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, 
AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.ALLOCATE_IP_COL, AttributeType.STRING));
+        streamDefinition.setPayloadData(payloadData);
+        return streamDefinition;
+    }
+
+
+    private static void createDataPublisher(){
+        //creating the agent
+
+        ServerConfiguration serverConfig =  
CarbonUtils.getServerConfiguration();
+        String trustStorePath = 
serverConfig.getFirstProperty("Security.TrustStore.Location");
+        String trustStorePassword = 
serverConfig.getFirstProperty("Security.TrustStore.Password");
+        String bamServerUrl = serverConfig.getFirstProperty("BamServerURL");
+        String adminUsername = 
FasterLookUpDataHolder.getInstance().getDataPubConfig().getBamUsername();
+        String adminPassword = 
FasterLookUpDataHolder.getInstance().getDataPubConfig().getBamPassword();
+
+        System.setProperty("javax.net.ssl.trustStore", trustStorePath);
+        System.setProperty("javax.net.ssl.trustStorePassword", 
trustStorePassword);
+
+
+        try {
+            dataPublisher = new AsyncDataPublisher("tcp://" +  bamServerUrl + 
"", adminUsername, adminPassword);
+            
FasterLookUpDataHolder.getInstance().setDataPublisher(dataPublisher);
+            initializeStream();
+            dataPublisher.addStreamDefinition(streamDefinition);
+        } catch (Exception e) {
+            String msg = "Unable to create a data publisher to " + 
bamServerUrl +
+                    ". Usage Agent will not function properly. ";
+            log.error(msg, e);
+            throw new CloudControllerException(msg, e);
+        }
+    }
+    
+    private static String handleNull(String val) {
+        if (val == null) {
+            return "";
+        }
+        return val;
+    }
+
+    
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
new file mode 100644
index 0000000..c3f6da8
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.messaging.publisher;
+
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder;
+import 
org.apache.stratos.cloud.controller.messaging.topology.TopologySynchronizerTask;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.wso2.carbon.ntask.common.TaskException;
+import org.wso2.carbon.ntask.core.TaskInfo;
+import org.wso2.carbon.ntask.core.TaskManager;
+import org.wso2.carbon.ntask.core.service.TaskService;
+
+/**
+ * Topology synchronizer task scheduler for scheduling the topology 
synchronizer task
+ * using carbon task service.
+ */
+public class TopologySynchronizerTaskScheduler {
+
+    private static final Log log = 
LogFactory.getLog(TopologySynchronizerTaskScheduler.class);
+
+    private static final FasterLookUpDataHolder dataHolder = 
FasterLookUpDataHolder.getInstance();
+
+    public static void schedule(TaskService taskService) {
+        TaskManager taskManager = null;
+        try {
+
+            if 
(!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE))
 {
+                // Register task type
+                
taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
+
+                // Register task
+                taskManager = 
taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
+                String cronProp = 
dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_PROPERTY);
+                               String cron = cronProp != null ?  cronProp 
:CloudControllerConstants.TOPOLOGY_SYNC_CRON ;
+                TaskInfo.TriggerInfo triggerInfo = new 
TaskInfo.TriggerInfo(cron);
+                TaskInfo taskInfo = new 
TaskInfo(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
+                        TopologySynchronizerTask.class.getName(),
+                        new HashMap<String, String>(), triggerInfo);
+                taskManager.registerTask(taskInfo);
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Topology synchronization task 
scheduled: %s", CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME));
+                }
+            }
+
+        } catch (Exception e) {
+            if (taskManager != null) {
+                try {
+                    
taskManager.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
+                } catch (TaskException te) {
+                    if (log.isErrorEnabled()) {
+                        log.error(te);
+                    }
+                }
+            }
+            
+            String msg = String.format("Could not schedule topology 
synchronization task: %s", CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
+            log.error(msg, e);
+                       throw new RuntimeException(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
new file mode 100644
index 0000000..3e045a9
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.receiver.application;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
+import org.apache.stratos.messaging.event.Event;
+import 
org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent;
+import 
org.apache.stratos.messaging.listener.applications.ApplicationTerminatedEventListener;
+import 
org.apache.stratos.messaging.message.receiver.applications.ApplicationsEventReceiver;
+
+/**
+ * This is to receive the application topic messages.
+ */
+public class ApplicationTopicReceiver implements Runnable{
+    private static final Log log = 
LogFactory.getLog(ApplicationTopicReceiver.class);
+    private ApplicationsEventReceiver applicationsEventReceiver;
+    private boolean terminated;
+
+    public ApplicationTopicReceiver() {
+        this.applicationsEventReceiver = new ApplicationsEventReceiver();
+        addEventListeners();
+
+    }
+
+    
+    @Override
+    public void run() {
+
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller application status thread started");
+        }
+        Thread thread = new Thread(applicationsEventReceiver);
+        thread.start();
+
+        // Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller application status thread terminated");
+        }
+
+    }
+    private void addEventListeners() {
+        applicationsEventReceiver.addEventListener(new 
ApplicationTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                //Remove the application related data
+                ApplicationTerminatedEvent terminatedEvent = 
(ApplicationTerminatedEvent)event;
+                log.info("ApplicationTerminatedEvent received for 
[application] " + terminatedEvent.getAppId());
+                String appId = terminatedEvent.getAppId();
+                TopologyBuilder.handleApplicationClustersRemoved(appId, 
terminatedEvent.getClusterData());
+            }
+        });
+    }
+
+    public void setTerminated(boolean terminated) {
+        this.terminated = terminated;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
new file mode 100644
index 0000000..b4ff9f2
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.receiver.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.cluster.status.*;
+import org.apache.stratos.messaging.listener.cluster.status.*;
+import 
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
+
+public class ClusterStatusTopicReceiver implements Runnable{
+    private static final Log log = 
LogFactory.getLog(ClusterStatusTopicReceiver.class);
+
+    private ClusterStatusEventReceiver statusEventReceiver;
+    private boolean terminated;
+
+    public ClusterStatusTopicReceiver() {
+        this.statusEventReceiver = new ClusterStatusEventReceiver();
+        addEventListeners();
+    }
+
+    public void run() {
+        Thread thread = new Thread(statusEventReceiver);
+        thread.start();
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller Cluster status thread started");
+        }
+
+        // Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller application status thread terminated");
+        }
+
+    }
+    private void addEventListeners() {
+        // Listen to topology events that affect clusters
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterResetEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterInstanceCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent)
 event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent)
 event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent)
 event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatingEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent)
 event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterInactivateEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent)
 event);
+            }
+        });
+    }
+
+    public void setTerminated(boolean terminated) {
+        this.terminated = terminated;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
new file mode 100644
index 0000000..fd24c60
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.receiver.instance.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
+import org.apache.stratos.messaging.event.Event;
+import 
org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
+import 
org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
+import 
org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
+import 
org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener;
+import 
org.apache.stratos.messaging.listener.instance.status.InstanceMaintenanceListener;
+import 
org.apache.stratos.messaging.listener.instance.status.InstanceReadyToShutdownEventListener;
+import 
org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener;
+import 
org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver;
+
+/**
+ * This will handle the instance status events
+ */
+public class InstanceStatusTopicReceiver implements Runnable {
+    private static final Log log = 
LogFactory.getLog(InstanceStatusTopicReceiver.class);
+
+    private InstanceStatusEventReceiver statusEventReceiver;
+    private boolean terminated;
+
+    public InstanceStatusTopicReceiver() {
+        this.statusEventReceiver = new InstanceStatusEventReceiver();
+        addEventListeners();
+    }
+
+
+    @Override
+    public void run() {
+        Thread thread = new Thread(statusEventReceiver);
+        thread.start();
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller application status thread started");
+        }
+        ///* Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller application status thread terminated");
+        }
+    }
+
+    private void addEventListeners() {
+        statusEventReceiver.addEventListener(new 
InstanceActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) 
event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
InstanceStartedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                TopologyBuilder.handleMemberStarted((InstanceStartedEvent) 
event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
InstanceReadyToShutdownEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    
TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) 
event);
+                } catch (Exception e) {
+                    String error = "Failed to retrieve the instance status 
event message";
+                    log.error(error, e);
+                }
+            }
+        });
+
+        statusEventReceiver.addEventListener(new InstanceMaintenanceListener() 
{
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    
TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
+                } catch (Exception e) {
+                String error = "Failed to retrieve the instance status event 
message";
+                log.error(error, e);
+                }
+            }
+        });
+
+
+    }
+
+}

Reply via email to