http://git-wip-us.apache.org/repos/asf/stratos/blob/218b5fdc/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index 802dfc8..618f2f9 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -52,997 +52,997 @@ import java.util.*;
  * and build the complete topology with the events received
  */
 public class TopologyBuilder {
-       private static final Log log = LogFactory.getLog(TopologyBuilder.class);
-
-       public static void handleServiceCreated(List<Cartridge> cartridgeList) {
-               Service service;
-               Topology topology = TopologyManager.getTopology();
-               if (cartridgeList == null) {
-                       log.warn(String.format("Cartridge list is empty"));
-                       return;
-               }
-
-               try {
-
-                       TopologyManager.acquireWriteLock();
-                       for (Cartridge cartridge : cartridgeList) {
-                               if 
(!topology.serviceExists(cartridge.getType())) {
-                                       ServiceType serviceType =
-                                                       
cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant;
-                                       service = new 
Service(cartridge.getType(), serviceType);
-                                       Properties properties = new 
Properties();
-
-                                       try {
-                                               Property[] propertyArray = null;
-
-                                               if (cartridge.getProperties() 
!= null) {
-                                                       if 
(cartridge.getProperties().getProperties() != null) {
-                                                               propertyArray = 
cartridge.getProperties().getProperties();
-                                                       }
-                                               }
-
-                                               List<Property> propertyList = 
new ArrayList<Property>();
-                                               if (propertyArray != null) {
-                                                       propertyList = 
Arrays.asList(propertyArray);
-                                                       if (propertyList != 
null) {
-                                                               for (Property 
property : propertyList) {
-                                                                       
properties.setProperty(property.getName(), property.getValue());
-                                                               }
-                                                       }
-                                               }
-                                       } catch (Exception e) {
-                                               log.error(e);
-                                       }
-
-                                       service.setProperties(properties);
-                                       if (cartridge.getPortMappings() != 
null) {
-                                               List<PortMapping> portMappings 
= Arrays.asList(cartridge.getPortMappings());
-                                               Port port;
-                                               //adding ports to the event
-                                               for (PortMapping portMapping : 
portMappings) {
-                                                       port = new 
Port(portMapping.getProtocol(), portMapping.getPort(),
-                                                                       
portMapping.getProxyPort());
-                                                       service.addPort(port);
-                                               }
-                                       }
-
-                                       topology.addService(service);
-                                       
TopologyManager.updateTopology(topology);
-                               }
-                       }
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-               TopologyEventPublisher.sendServiceCreateEvent(cartridgeList);
-       }
-
-       public static void handleServiceRemoved(List<Cartridge> cartridgeList) {
-               Topology topology = TopologyManager.getTopology();
-
-               for (Cartridge cartridge : cartridgeList) {
-                       Service service = 
topology.getService(cartridge.getType());
-                       if (service == null) {
-                               log.warn("Cartridge does not exist [cartidge] " 
+ cartridge);
-                               return;
-                       }
-                       if (service.getClusters().size() == 0) {
-                               if 
(topology.serviceExists(cartridge.getType())) {
-                                       try {
-                                               
TopologyManager.acquireWriteLock();
-                                               
topology.removeService(cartridge.getType());
-                                               
TopologyManager.updateTopology(topology);
-                                       } finally {
-                                               
TopologyManager.releaseWriteLock();
-                                       }
-                                       
TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList);
-                               } else {
-                                       log.warn(String.format("Service %s does 
not exist..", cartridge.getType()));
-                               }
-                       } else {
-                               log.warn("Subscription already exists. Hence 
not removing the service:" + cartridge.getType() +
-                                        " from the topology");
-                       }
-               }
-       }
-
-       public static void 
handleClusterCreated(ClusterStatusClusterCreatedEvent event) {
-               TopologyManager.acquireWriteLock();
-               Cluster cluster;
-
-               try {
-                       Topology topology = TopologyManager.getTopology();
-                       Service service = 
topology.getService(event.getServiceName());
-                       if (service == null) {
-                               log.error("Service " + event.getServiceName() +
-                                         " not found in Topology, unable to 
update the cluster status to Created");
-                               return;
-                       }
-
-                       if (service.clusterExists(event.getClusterId())) {
-                               log.warn("Cluster " + event.getClusterId() + " 
is already in the Topology ");
-                               return;
-                       } else {
-                               cluster = new Cluster(event.getServiceName(), 
event.getClusterId(), event.getDeploymentPolicyName(),
-                                                     
event.getAutosScalePolicyName(), event.getAppId());
-                               //cluster.setStatus(Status.Created);
-                               cluster.setHostNames(event.getHostNames());
-                               cluster.setTenantRange(event.getTenantRange());
-                               service.addCluster(cluster);
-                               TopologyManager.updateTopology(topology);
-                       }
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-
-               TopologyEventPublisher.sendClusterCreatedEvent(cluster);
-       }
-
-       public static void handleApplicationClustersCreated(String appId, 
List<Cluster> appClusters) {
-
-               TopologyManager.acquireWriteLock();
-
-               try {
-                       Topology topology = TopologyManager.getTopology();
-                       for (Cluster cluster : appClusters) {
-                               Service service = 
topology.getService(cluster.getServiceName());
-                               if (service == null) {
-                                       log.error("Service " + 
cluster.getServiceName() +
-                                                 " not found in Topology, 
unable to create Application cluster");
-                               } else {
-                                       service.addCluster(cluster);
-                                       log.info("Application Cluster " + 
cluster.getClusterId() + " created in CC topology");
-                               }
-                       }
-                       TopologyManager.updateTopology(topology);
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-
-               log.debug("Creating cluster port mappings: [appication-id] " + 
appId);
-               for (Cluster cluster : appClusters) {
-                       String cartridgeType = cluster.getServiceName();
-                       Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(cartridgeType);
-                       if (cartridge == null) {
-                               throw new CloudControllerException("Cartridge 
not found: [cartridge-type] " + cartridgeType);
-                       }
-
-                       for (PortMapping portMapping : 
cartridge.getPortMappings()) {
-                               ClusterPortMapping clusterPortMapping =
-                                               new ClusterPortMapping(appId, 
cluster.getClusterId(), portMapping.getName(),
-                                                                      
portMapping.getProtocol(), portMapping.getPort(),
-                                                                      
portMapping.getProxyPort());
-                               
CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
-                               log.debug("Cluster port mapping created: " + 
clusterPortMapping.toString());
-                       }
-               }
-
-               // Persist cluster port mappings
-               CloudControllerContext.getInstance().persist();
-
-               // Send application clusters created event
-               TopologyEventPublisher.sendApplicationClustersCreated(appId, 
appClusters);
-       }
-
-       public static void handleApplicationClustersRemoved(String appId, 
Set<ClusterDataHolder> clusterData) {
-               TopologyManager.acquireWriteLock();
-
-               List<Cluster> removedClusters = new ArrayList<Cluster>();
-               CloudControllerContext context = 
CloudControllerContext.getInstance();
-               try {
-                       Topology topology = TopologyManager.getTopology();
-
-                       if (clusterData != null) {
-                               // remove clusters from CC topology model and 
remove runtime information
-                               for (ClusterDataHolder aClusterData : 
clusterData) {
-                                       Service aService = 
topology.getService(aClusterData.getServiceType());
-                                       if (aService != null) {
-                                               
removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
-                                       } else {
-                                               log.warn("Service " + 
aClusterData.getServiceType() + " not found, " +
-                                                        "unable to remove 
Cluster " + aClusterData.getClusterId());
-                                       }
-                                       // remove runtime data
-                                       
context.removeClusterContext(aClusterData.getClusterId());
-
-                                       log.info("Removed application [ " + 
appId + " ]'s Cluster " +
-                                                "[ " + 
aClusterData.getClusterId() + " ] from the topology");
-                               }
-                               // persist runtime data changes
-                               CloudControllerContext.getInstance().persist();
-                       } else {
-                               log.info("No cluster data found for application 
" + appId + " to remove");
-                       }
-
-                       TopologyManager.updateTopology(topology);
-
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-
-               // Remove cluster port mappings of application
-               
CloudControllerContext.getInstance().removeClusterPortMappings(appId);
-               CloudControllerContext.getInstance().persist();
-
-               TopologyEventPublisher.sendApplicationClustersRemoved(appId, 
clusterData);
-
-       }
-
-       public static void handleClusterReset(ClusterStatusClusterResetEvent 
event) {
-               TopologyManager.acquireWriteLock();
-
-               try {
-                       Topology topology = TopologyManager.getTopology();
-                       Service service = 
topology.getService(event.getServiceName());
-                       if (service == null) {
-                               log.error("Service " + event.getServiceName() +
-                                         " not found in Topology, unable to 
update the cluster status to Created");
-                               return;
-                       }
-
-                       Cluster cluster = 
service.getCluster(event.getClusterId());
-                       if (cluster == null) {
-                               log.error("Cluster " + event.getClusterId() + " 
not found in Topology, unable to update " +
-                                         "status to Created");
-                               return;
-                       }
-
-                       ClusterInstance context = 
cluster.getInstanceContexts(event.getInstanceId());
-                       if (context == null) {
-                               log.warn("Cluster Instance Context is not found 
for [cluster] " +
-                                        event.getClusterId() + " [instance-id] 
" +
-                                        event.getInstanceId());
-                               return;
-                       }
-                       ClusterStatus status = ClusterStatus.Created;
-                       if (context.isStateTransitionValid(status)) {
-                               context.setStatus(status);
-                               log.info("Cluster Created adding status started 
for" + cluster.getClusterId());
-                               TopologyManager.updateTopology(topology);
-                               //publishing data
-                               TopologyEventPublisher
-                                               
.sendClusterResetEvent(event.getAppId(), event.getServiceName(), 
event.getClusterId(),
-                                                                      
event.getInstanceId());
-                       } else {
-                               log.warn(String.format("Cluster state 
transition is not valid: [cluster-id] %s " +
-                                                      " [instance-id] %s 
[current-status] %s [status-requested] %s",
-                                                      event.getClusterId(), 
event.getInstanceId(), context.getStatus(), status));
-                       }
-
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-
-       }
-
-       public static void handleClusterInstanceCreated(String serviceType, 
String clusterId, String alias,
-                                                       String instanceId, 
String partitionId, String networkPartitionId) {
-
-               TopologyManager.acquireWriteLock();
-
-               try {
-                       Topology topology = TopologyManager.getTopology();
-                       Service service = topology.getService(serviceType);
-                       if (service == null) {
-                               log.error("Service " + serviceType +
-                                         " not found in Topology, unable to 
update the cluster status to Created");
-                               return;
-                       }
-
-                       Cluster cluster = service.getCluster(clusterId);
-                       if (cluster == null) {
-                               log.error("Cluster " + clusterId + " not found 
in Topology, unable to update " +
-                                         "status to Created");
-                               return;
-                       }
-
-                       if (cluster.getInstanceContexts(instanceId) != null) {
-                               log.warn("The Instance context for the cluster 
already exists for [cluster] " +
-                                        clusterId + " [instance-id] " + 
instanceId);
-                               return;
-                       }
-
-                       ClusterInstance clusterInstance = new 
ClusterInstance(alias, clusterId, instanceId);
-                       
clusterInstance.setNetworkPartitionId(networkPartitionId);
-                       clusterInstance.setPartitionId(partitionId);
-                       cluster.addInstanceContext(instanceId, clusterInstance);
-                       TopologyManager.updateTopology(topology);
-
-                       ClusterInstanceCreatedEvent clusterInstanceCreatedEvent 
=
-                                       new 
ClusterInstanceCreatedEvent(serviceType, clusterId, clusterInstance);
-                       clusterInstanceCreatedEvent.setPartitionId(partitionId);
-                       
TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
-
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-       }
-
-       public static void handleClusterRemoved(ClusterContext ctxt) {
-               Topology topology = TopologyManager.getTopology();
-               Service service = topology.getService(ctxt.getCartridgeType());
-               String deploymentPolicy;
-               if (service == null) {
-                       log.warn(String.format("Service %s does not exist", 
ctxt.getCartridgeType()));
-                       return;
-               }
-
-               if (!service.clusterExists(ctxt.getClusterId())) {
-                       log.warn(String.format("Cluster %s does not exist for 
service %s", ctxt.getClusterId(),
-                                              ctxt.getCartridgeType()));
-                       return;
-               }
-
-               try {
-                       TopologyManager.acquireWriteLock();
-                       Cluster cluster = 
service.removeCluster(ctxt.getClusterId());
-                       deploymentPolicy = cluster.getDeploymentPolicyName();
-                       TopologyManager.updateTopology(topology);
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-               TopologyEventPublisher.sendClusterRemovedEvent(ctxt, 
deploymentPolicy);
-       }
-
-       /**
-        * Add member object to the topology and publish member created event
-        *
-        * @param memberContext
-        */
-       public static void handleMemberCreatedEvent(MemberContext 
memberContext) {
-               Topology topology = TopologyManager.getTopology();
-
-               Service service = 
topology.getService(memberContext.getCartridgeType());
-               String clusterId = memberContext.getClusterId();
-               Cluster cluster = service.getCluster(clusterId);
-               String memberId = memberContext.getMemberId();
-               String clusterInstanceId = memberContext.getClusterInstanceId();
-               String networkPartitionId = 
memberContext.getNetworkPartitionId();
-               String partitionId = memberContext.getPartition().getId();
-               String lbClusterId = memberContext.getLbClusterId();
-               long initTime = memberContext.getInitTime();
-               String autoscalingReason =
-                               
memberContext.getProperties().getProperty(StratosConstants.SCALING_REASON).getValue();
-               long scalingTime =
-                               
Long.parseLong(memberContext.getProperties().getProperty(StratosConstants.SCALING_TIME).getValue());
-
-               if (cluster.memberExists(memberId)) {
-                       log.warn(String.format("Member %s already exists", 
memberId));
-                       return;
-               }
-
-               try {
-                       TopologyManager.acquireWriteLock();
-                       Member member =
-                                       new Member(service.getServiceName(), 
clusterId, memberId, clusterInstanceId, networkPartitionId,
-                                                  partitionId, 
memberContext.getLoadBalancingIPType(), initTime);
-                       member.setStatus(MemberStatus.Created);
-                       member.setLbClusterId(lbClusterId);
-                       
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
-                       cluster.addMember(member);
-                       TopologyManager.updateTopology(topology);
-                       //Member created time
-                       Long timeStamp = System.currentTimeMillis();
-                       //publishing to BAM
-                       
BAMUsageDataPublisher.publish(memberContext.getMemberId(), 
memberContext.getPartition().getId(),
-                                                     
memberContext.getNetworkPartitionId(), memberContext.getClusterId(),
-                                                     
memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
-                                                     
MemberStatus.Created.toString(), timeStamp, autoscalingReason, scalingTime,
-                                                     null);
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-
-               TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
-       }
-
-       /**
-        * Update member status to initialized and publish member initialized 
event
-        *
-        * @param memberContext
-        */
-       public static void handleMemberInitializedEvent(MemberContext 
memberContext) {
-               Topology topology = TopologyManager.getTopology();
-               Service service = 
topology.getService(memberContext.getCartridgeType());
-               if (service == null) {
-                       log.warn(String.format("Service %s does not exist", 
memberContext.getCartridgeType()));
-                       return;
-               }
-               if (!service.clusterExists(memberContext.getClusterId())) {
-                       log.warn(String.format("Cluster %s does not exist in 
service %s", memberContext.getClusterId(),
-                                              
memberContext.getCartridgeType()));
-                       return;
-               }
-
-               Member member = 
service.getCluster(memberContext.getClusterId()).
-                               getMember(memberContext.getMemberId());
-               if (member == null) {
-                       log.warn(String.format("Member %s does not exist", 
memberContext.getMemberId()));
-                       return;
-               }
-
-               try {
-                       TopologyManager.acquireWriteLock();
-
-                       // Set ip addresses
-                       
member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
-                       if (memberContext.getPrivateIPs() != null) {
-                               
member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
-                       }
-                       
member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
-                       if (memberContext.getPublicIPs() != null) {
-                               
member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
-                       }
-
-                       // try update lifecycle state
-                       if 
(!member.isStateTransitionValid(MemberStatus.Initialized)) {
-                               log.error("Invalid state transition from " + 
member.getStatus() + " to " +
-                                         MemberStatus.Initialized);
-                               return;
-                       } else {
-                               member.setStatus(MemberStatus.Initialized);
-                               log.info("Member status updated to 
initialized");
-
-                               TopologyManager.updateTopology(topology);
-                               //member initialized time
-                               Long timeStamp = System.currentTimeMillis();
-
-                               
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
-                               //publishing data
-                               
BAMUsageDataPublisher.publish(memberContext.getMemberId(), 
memberContext.getPartition().getId(),
-                                                             
memberContext.getNetworkPartitionId(), memberContext.getClusterId(),
-                                                             
memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
-                                                             
MemberStatus.Initialized.toString(), timeStamp, null, null, null);
-                       }
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-       }
-
-       private static int findKubernetesServicePort(String clusterId, 
List<KubernetesService> kubernetesServices,
-                                                    PortMapping portMapping) {
-               for (KubernetesService kubernetesService : kubernetesServices) {
-                       if 
(kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
-                               return kubernetesService.getPort();
-                       }
-               }
-               throw new RuntimeException(
-                               "Kubernetes service port not found: 
[cluster-id] " + clusterId + " [port] " + portMapping.getPort());
-       }
-
-       public static void handleMemberStarted(InstanceStartedEvent 
instanceStartedEvent) {
-               try {
-                       Topology topology = TopologyManager.getTopology();
-                       Service service = 
topology.getService(instanceStartedEvent.getServiceName());
-                       if (service == null) {
-                               log.warn(String.format("Service %s does not 
exist", instanceStartedEvent.getServiceName()));
-                               return;
-                       }
-                       if 
(!service.clusterExists(instanceStartedEvent.getClusterId())) {
-                               log.warn(String.format("Cluster %s does not 
exist in service %s", instanceStartedEvent.getClusterId(),
-                                                      
instanceStartedEvent.getServiceName()));
-                               return;
-                       }
-
-                       Cluster cluster = 
service.getCluster(instanceStartedEvent.getClusterId());
-                       Member member = 
cluster.getMember(instanceStartedEvent.getMemberId());
-                       if (member == null) {
-                               log.warn(String.format("Member %s does not 
exist", instanceStartedEvent.getMemberId()));
-                               return;
-                       }
-
-                       try {
-                               TopologyManager.acquireWriteLock();
-                               // try update lifecycle state
-                               if 
(!member.isStateTransitionValid(MemberStatus.Starting)) {
-                                       log.error("Invalid State Transition 
from " + member.getStatus() + " to " +
-                                                 MemberStatus.Starting);
-                                       return;
-                               } else {
-                                       member.setStatus(MemberStatus.Starting);
-                                       log.info("member started event adding 
status started");
-
-                                       
TopologyManager.updateTopology(topology);
-                                       //member starting time
-                                       Long timeStamp = 
System.currentTimeMillis();
-                                       //memberStartedEvent.
-                                       
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
-                                       //publishing data
-                                       BAMUsageDataPublisher
-                                                       
.publish(instanceStartedEvent.getMemberId(), 
instanceStartedEvent.getPartitionId(),
-                                                                
instanceStartedEvent.getNetworkPartitionId(), 
instanceStartedEvent.getClusterId(),
-                                                                
instanceStartedEvent.getClusterInstanceId(), 
instanceStartedEvent.getServiceName(),
-                                                                
MemberStatus.Starting.toString(), timeStamp, null, null, null);
-                               }
-                       } finally {
-                               TopologyManager.releaseWriteLock();
-                       }
-               } catch (Exception e) {
-                       String message = String.format(
-                                       "Could not handle member started event: 
[application-id] %s " + "[service-name] %s [member-id] %s",
-                                       
instanceStartedEvent.getApplicationId(), instanceStartedEvent.getServiceName(),
-                                       instanceStartedEvent.getMemberId());
-                       log.warn(message, e);
-               }
-       }
-
-       public static void handleMemberActivated(InstanceActivatedEvent 
instanceActivatedEvent) {
-               Topology topology = TopologyManager.getTopology();
-               Service service = 
topology.getService(instanceActivatedEvent.getServiceName());
-               if (service == null) {
-                       log.warn(String.format("Service %s does not exist", 
instanceActivatedEvent.getServiceName()));
-                       return;
-               }
-
-               Cluster cluster = 
service.getCluster(instanceActivatedEvent.getClusterId());
-               if (cluster == null) {
-                       log.warn(String.format("Cluster %s does not exist", 
instanceActivatedEvent.getClusterId()));
-                       return;
-               }
-
-               Member member = 
cluster.getMember(instanceActivatedEvent.getMemberId());
-               if (member == null) {
-                       log.warn(String.format("Member %s does not exist", 
instanceActivatedEvent.getMemberId()));
-                       return;
-               }
-
-               MemberActivatedEvent memberActivatedEvent =
-                               new 
MemberActivatedEvent(instanceActivatedEvent.getServiceName(), 
instanceActivatedEvent.getClusterId(),
-                                                        
instanceActivatedEvent.getClusterInstanceId(),
-                                                        
instanceActivatedEvent.getMemberId(),
-                                                        
instanceActivatedEvent.getNetworkPartitionId(),
-                                                        
instanceActivatedEvent.getPartitionId());
-
-               // grouping - set grouid
-               //TODO
-               memberActivatedEvent.setApplicationId(null);
-               try {
-                       TopologyManager.acquireWriteLock();
-                       // try update lifecycle state
-                       if 
(!member.isStateTransitionValid(MemberStatus.Active)) {
-                               log.error(
-                                               "Invalid state transition from 
[" + member.getStatus() + "] to [" + MemberStatus.Active + "]");
-                               return;
-                       } else {
-                               member.setStatus(MemberStatus.Active);
-
-                               // Set member ports
-                               try {
-                                       Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(service.getServiceName());
-                                       if (cartridge == null) {
-                                               throw new RuntimeException(
-                                                               
String.format("Cartridge not found: [cartridge-type] %s", 
service.getServiceName()));
-                                       }
-
-                                       Port port;
-                                       int portValue;
-                                       List<PortMapping> portMappings = 
Arrays.asList(cartridge.getPortMappings());
-                                       String clusterId = 
cluster.getClusterId();
-                                       ClusterContext clusterContext = 
CloudControllerContext.getInstance().getClusterContext(clusterId);
-                                       List<KubernetesService> 
kubernetesServices = clusterContext.getKubernetesServices();
-
-                                       for (PortMapping portMapping : 
portMappings) {
-                                               if (kubernetesServices != null) 
{
-                                                       portValue = 
findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
-                                               } else {
-                                                       portValue = 
portMapping.getPort();
-                                               }
-                                               port = new 
Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
-                                               member.addPort(port);
-                                               
memberActivatedEvent.addPort(port);
-                                       }
-                               } catch (Exception e) {
-                                       String message = String.format("Could 
not add member ports: [service-name] %s [member-id] %s",
-                                                                      
memberActivatedEvent.getServiceName(),
-                                                                      
memberActivatedEvent.getMemberId());
-                                       log.error(message, e);
-                               }
-
-                               // Set member ip addresses
-                               
memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
-                               
memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
-                               
memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
-                               
memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
-                               TopologyManager.updateTopology(topology);
-
-                               //member activated time
-                               Long timeStamp = System.currentTimeMillis();
-
-                               // Publish member activated event
-                               
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
-
-                               // Publish statistics data
-                               
BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), 
memberActivatedEvent.getPartitionId(),
-                                                             
memberActivatedEvent.getNetworkPartitionId(),
-                                                             
memberActivatedEvent.getClusterId(),
-                                                             
memberActivatedEvent.getClusterInstanceId(),
-                                                             
memberActivatedEvent.getServiceName(), MemberStatus.Active.toString(),
-                                                             timeStamp, null, 
null, null);
-                       }
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-       }
-
-       public static void 
handleMemberReadyToShutdown(InstanceReadyToShutdownEvent 
instanceReadyToShutdownEvent)
-                       throws InvalidMemberException, 
InvalidCartridgeTypeException {
-               Topology topology = TopologyManager.getTopology();
-               Service service = 
topology.getService(instanceReadyToShutdownEvent.getServiceName());
-               //update the status of the member
-               if (service == null) {
-                       log.warn(String.format("Service %s does not exist", 
instanceReadyToShutdownEvent.getServiceName()));
-                       return;
-               }
-
-               Cluster cluster = 
service.getCluster(instanceReadyToShutdownEvent.getClusterId());
-               if (cluster == null) {
-                       log.warn(String.format("Cluster %s does not exist", 
instanceReadyToShutdownEvent.getClusterId()));
-                       return;
-               }
-
-               Member member = 
cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
-               if (member == null) {
-                       log.warn(String.format("Member %s does not exist", 
instanceReadyToShutdownEvent.getMemberId()));
-                       return;
-               }
-               MemberReadyToShutdownEvent memberReadyToShutdownEvent =
-                               new 
MemberReadyToShutdownEvent(instanceReadyToShutdownEvent.getServiceName(),
-                                                              
instanceReadyToShutdownEvent.getClusterId(),
-                                                              
instanceReadyToShutdownEvent.getClusterInstanceId(),
-                                                              
instanceReadyToShutdownEvent.getMemberId(),
-                                                              
instanceReadyToShutdownEvent.getNetworkPartitionId(),
-                                                              
instanceReadyToShutdownEvent.getPartitionId());
-               //get the time of ReadyToShutdown state change
-               Long timeStamp = null;
-               try {
-                       TopologyManager.acquireWriteLock();
-
-                       if 
(!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
-                               log.error("Invalid State Transition from " + 
member.getStatus() + " to " +
-                                         MemberStatus.ReadyToShutDown);
-                               return;
-                       }
-                       member.setStatus(MemberStatus.ReadyToShutDown);
-                       log.info("Member Ready to shut down event adding status 
started");
-
-                       TopologyManager.updateTopology(topology);
-
-                       timeStamp = System.currentTimeMillis();
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-               
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
-               //publishing data
-               BAMUsageDataPublisher
-                               
.publish(instanceReadyToShutdownEvent.getMemberId(), 
instanceReadyToShutdownEvent.getPartitionId(),
-                                        
instanceReadyToShutdownEvent.getNetworkPartitionId(),
-                                        
instanceReadyToShutdownEvent.getClusterId(),
-                                        
instanceReadyToShutdownEvent.getClusterInstanceId(),
-                                        
instanceReadyToShutdownEvent.getServiceName(), 
MemberStatus.ReadyToShutDown.toString(),
-                                        timeStamp, null, null, null);
-               //termination of particular instance will be handled by 
autoscaler
-       }
-
-       public static void handleMemberMaintenance(InstanceMaintenanceModeEvent 
instanceMaintenanceModeEvent)
-                       throws InvalidMemberException, 
InvalidCartridgeTypeException {
-               Topology topology = TopologyManager.getTopology();
-               Service service = 
topology.getService(instanceMaintenanceModeEvent.getServiceName());
-               //update the status of the member
-               if (service == null) {
-                       log.warn(String.format("Service %s does not exist", 
instanceMaintenanceModeEvent.getServiceName()));
-                       return;
-               }
-
-               Cluster cluster = 
service.getCluster(instanceMaintenanceModeEvent.getClusterId());
-               if (cluster == null) {
-                       log.warn(String.format("Cluster %s does not exist", 
instanceMaintenanceModeEvent.getClusterId()));
-                       return;
-               }
-
-               Member member = 
cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
-               if (member == null) {
-                       log.warn(String.format("Member %s does not exist", 
instanceMaintenanceModeEvent.getMemberId()));
-                       return;
-               }
-
-               MemberMaintenanceModeEvent memberMaintenanceModeEvent =
-                               new 
MemberMaintenanceModeEvent(instanceMaintenanceModeEvent.getServiceName(),
-                                                              
instanceMaintenanceModeEvent.getClusterId(),
-                                                              
instanceMaintenanceModeEvent.getClusterInstanceId(),
-                                                              
instanceMaintenanceModeEvent.getMemberId(),
-                                                              
instanceMaintenanceModeEvent.getNetworkPartitionId(),
-                                                              
instanceMaintenanceModeEvent.getPartitionId());
-               try {
-                       TopologyManager.acquireWriteLock();
-                       // try update lifecycle state
-                       if 
(!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
-                               log.error("Invalid State Transition from " + 
member.getStatus() + " to " + MemberStatus.In_Maintenance);
-                               return;
-                       }
-                       member.setStatus(MemberStatus.In_Maintenance);
-                       log.info("member maintenance mode event adding status 
started");
-
-                       TopologyManager.updateTopology(topology);
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-               //publishing data
-               
TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
-
-       }
-
-       /**
-        * Remove member from topology and send member terminated event.
-        *
-        * @param serviceName
-        * @param clusterId
-        * @param networkPartitionId
-        * @param partitionId
-        * @param memberId
-        */
-       public static void handleMemberTerminated(String serviceName, String 
clusterId, String networkPartitionId,
-                                                 String partitionId, String 
memberId) {
-               Topology topology = TopologyManager.getTopology();
-               Service service = topology.getService(serviceName);
-               Properties properties;
-               if (service == null) {
-                       log.warn(String.format("Service %s does not exist", 
serviceName));
-                       return;
-               }
-               Cluster cluster = service.getCluster(clusterId);
-               if (cluster == null) {
-                       log.warn(String.format("Cluster %s does not exist", 
clusterId));
-                       return;
-               }
-
-               Member member = cluster.getMember(memberId);
-               if (member == null) {
-                       log.warn(String.format("Member %s does not exist", 
memberId));
-                       return;
-               }
-
-               String clusterInstanceId = member.getClusterInstanceId();
-
-               try {
-                       TopologyManager.acquireWriteLock();
-                       properties = member.getProperties();
-                       cluster.removeMember(member);
-                       TopologyManager.updateTopology(topology);
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-               /* @TODO leftover from grouping_poc*/
-               String groupAlias = null;
-               TopologyEventPublisher
-                               .sendMemberTerminatedEvent(serviceName, 
clusterId, memberId, clusterInstanceId, networkPartitionId,
-                                                          partitionId, 
properties, groupAlias);
-       }
-
-       public static void handleMemberSuspended() {
-               //TODO
-               try {
-                       TopologyManager.acquireWriteLock();
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-       }
-
-       public static void handleClusterActivatedEvent(
-                       ClusterStatusClusterActivatedEvent 
clusterStatusClusterActivatedEvent) {
-
-               Topology topology = TopologyManager.getTopology();
-               Service service = 
topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
-               //update the status of the cluster
-               if (service == null) {
-                       log.warn(String.format("Service %s does not exist", 
clusterStatusClusterActivatedEvent.getServiceName()));
-                       return;
-               }
-
-               Cluster cluster = 
service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
-               if (cluster == null) {
-                       log.warn(String.format("Cluster %s does not exist", 
clusterStatusClusterActivatedEvent.getClusterId()));
-                       return;
-               }
-
-               String clusterId = cluster.getClusterId();
-               ClusterContext clusterContext = 
CloudControllerContext.getInstance().getClusterContext(clusterId);
-               if (clusterContext == null) {
-                       log.warn("Cluster context not found: [cluster-id] " + 
clusterId);
-                       return;
-               }
-
-               ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
-                               new 
ClusterInstanceActivatedEvent(clusterStatusClusterActivatedEvent.getAppId(),
-                                                                 
clusterStatusClusterActivatedEvent.getServiceName(),
-                                                                 
clusterStatusClusterActivatedEvent.getClusterId(),
-                                                                 
clusterStatusClusterActivatedEvent.getInstanceId());
-               try {
-                       TopologyManager.acquireWriteLock();
-                       List<KubernetesService> kubernetesServices = 
clusterContext.getKubernetesServices();
-                       cluster.setKubernetesServices(kubernetesServices);
-                       
clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
-
-                       ClusterInstance context = 
cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
-                       if (context == null) {
-                               log.warn("Cluster instance context is not found 
for [cluster] " +
-                                        
clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " +
-                                        
clusterStatusClusterActivatedEvent.getInstanceId());
-                               return;
-                       }
-                       ClusterStatus status = ClusterStatus.Active;
-                       if (context.isStateTransitionValid(status)) {
-                               context.setStatus(status);
-                               log.info("Cluster activated adding status 
started for " + cluster.getClusterId());
-                               TopologyManager.updateTopology(topology);
-                               // publish event
-                               
TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
-                       } else {
-                               log.error(String.format("Cluster state 
transition is not valid: [cluster-id] %s " +
-                                                       " [instance-id] %s 
[current-status] %s [status-requested] %s",
-                                                       
clusterStatusClusterActivatedEvent.getClusterId(),
-                                                       
clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(),
-                                                       status));
-                               return;
-                       }
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-
-       }
-
-       public static void 
handleClusterInactivateEvent(ClusterStatusClusterInactivateEvent 
clusterInactivateEvent) {
-               Topology topology = TopologyManager.getTopology();
-               Service service = 
topology.getService(clusterInactivateEvent.getServiceName());
-               //update the status of the cluster
-               if (service == null) {
-                       log.warn(String.format("Service %s does not exist", 
clusterInactivateEvent.getServiceName()));
-                       return;
-               }
-
-               Cluster cluster = 
service.getCluster(clusterInactivateEvent.getClusterId());
-               if (cluster == null) {
-                       log.warn(String.format("Cluster %s does not exist", 
clusterInactivateEvent.getClusterId()));
-                       return;
-               }
-
-               ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
-                               new 
ClusterInstanceInactivateEvent(clusterInactivateEvent.getAppId(),
-                                                                  
clusterInactivateEvent.getServiceName(),
-                                                                  
clusterInactivateEvent.getClusterId(),
-                                                                  
clusterInactivateEvent.getInstanceId());
-               try {
-                       TopologyManager.acquireWriteLock();
-                       ClusterInstance context = 
cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
-                       if (context == null) {
-                               log.warn("Cluster Instance Context is not found 
for [cluster] " +
-                                        clusterInactivateEvent.getClusterId() 
+ " [instance-id] " +
-                                        
clusterInactivateEvent.getInstanceId());
-                               return;
-                       }
-                       ClusterStatus status = ClusterStatus.Inactive;
-                       if (context.isStateTransitionValid(status)) {
-                               context.setStatus(status);
-                               log.info("Cluster Inactive adding status 
started for" + cluster.getClusterId());
-                               TopologyManager.updateTopology(topology);
-                               //publishing data
-                               
TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
-                       } else {
-                               log.error(String.format("Cluster state 
transition is not valid: [cluster-id] %s " +
-                                                       " [instance-id] %s 
[current-status] %s [status-requested] %s",
-                                                       
clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(),
-                                                       context.getStatus(), 
status));
-                               return;
-                       }
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-       }
-
-       private static void 
deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) 
{
-               try {
-                       MetaDataServiceClient metadataClient = new 
DefaultMetaDataServiceClient();
-                       
metadataClient.deleteApplicationProperties(event.getAppId());
-               } catch (Exception e) {
-                       log.error("Error occurred while deleting the 
application resources frm metadata service ", e);
-               }
-       }
-
-       public static void 
handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
-
-               TopologyManager.acquireWriteLock();
-
-               try {
-                       Topology topology = TopologyManager.getTopology();
-                       Service service = 
topology.getService(event.getServiceName());
-
-                       //update the status of the cluster
-                       if (service == null) {
-                               log.warn(String.format("Service %s does not 
exist", event.getServiceName()));
-                               return;
-                       }
-
-                       Cluster cluster = 
service.getCluster(event.getClusterId());
-                       if (cluster == null) {
-                               log.warn(String.format("Cluster %s does not 
exist", event.getClusterId()));
-                               return;
-                       }
-
-                       ClusterInstance context = 
cluster.getInstanceContexts(event.getInstanceId());
-                       if (context == null) {
-                               log.warn("Cluster Instance Context is not found 
for [cluster] " +
-                                        event.getClusterId() + " [instance-id] 
" +
-                                        event.getInstanceId());
-                               return;
-                       }
-                       ClusterStatus status = ClusterStatus.Terminated;
-                       if (context.isStateTransitionValid(status)) {
-                               context.setStatus(status);
-                               log.info("Cluster Terminated adding status 
started for and removing the cluster instance" +
-                                        cluster.getClusterId());
-                               
cluster.removeInstanceContext(event.getInstanceId());
-                               TopologyManager.updateTopology(topology);
-                               //publishing data
-                               ClusterInstanceTerminatedEvent 
clusterTerminatedEvent =
-                                               new 
ClusterInstanceTerminatedEvent(event.getAppId(), event.getServiceName(),
-                                                                               
   event.getClusterId(), event.getInstanceId());
-
-                               
TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
-                       } else {
-                               log.error(String.format("Cluster state 
transition is not valid: [cluster-id] %s " +
-                                                       " [instance-id] %s 
[current-status] %s [status-requested] %s",
-                                                       event.getClusterId(), 
event.getInstanceId(), context.getStatus(), status));
-                               return;
-                       }
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-
-       }
-
-       public static void 
handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
-
-               TopologyManager.acquireWriteLock();
-
-               try {
-                       Topology topology = TopologyManager.getTopology();
-                       Cluster cluster = 
topology.getService(event.getServiceName()).
-                                       getCluster(event.getClusterId());
-
-                       if 
(!cluster.isStateTransitionValid(ClusterStatus.Terminating, 
event.getInstanceId())) {
-                               log.error("Invalid state transfer from " + 
cluster.getStatus(event.getInstanceId()) + " to " +
-                                         ClusterStatus.Terminating);
-                       }
-                       ClusterInstance context = 
cluster.getInstanceContexts(event.getInstanceId());
-                       if (context == null) {
-                               log.warn("Cluster Instance Context is not found 
for [cluster] " +
-                                        event.getClusterId() + " [instance-id] 
" +
-                                        event.getInstanceId());
-                               return;
-                       }
-                       ClusterStatus status = ClusterStatus.Terminating;
-                       if (context.isStateTransitionValid(status)) {
-                               context.setStatus(status);
-                               log.info("Cluster Terminating started for " + 
cluster.getClusterId());
-                               TopologyManager.updateTopology(topology);
-                               //publishing data
-                               ClusterInstanceTerminatingEvent 
clusterTerminaingEvent =
-                                               new 
ClusterInstanceTerminatingEvent(event.getAppId(), event.getServiceName(),
-                                                                               
    event.getClusterId(), event.getInstanceId());
-
-                               
TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
-
-                               // Remove kubernetes services if available
-                               ClusterContext clusterContext =
-                                               
CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
-                               if 
(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
-                                       
KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
-                               }
-                       } else {
-                               log.error(String.format("Cluster state 
transition is not valid: [cluster-id] %s " +
-                                                       " [instance-id] %s 
[current-status] %s [status-requested] %s",
-                                                       event.getClusterId(), 
event.getInstanceId(), context.getStatus(), status));
-                       }
-               } finally {
-                       TopologyManager.releaseWriteLock();
-               }
-       }
+    private static final Log log = LogFactory.getLog(TopologyBuilder.class);
+
+    /**
+     * Registers a service in the topology for each cartridge whose type is not yet present
+     * and then publishes a service created event for the whole cartridge list.
+     * <p>
+     * A new service gets the cartridge's properties and one port per cartridge port mapping.
+     * The topology is modified while holding the topology write lock; the event is published
+     * after the lock has been released.
+     *
+     * @param cartridgeList cartridges to register; if {@code null} a warning is logged and
+     *                      nothing is added or published
+     */
+    public static void handleServiceCreated(List<Cartridge> cartridgeList) {
+        Topology topology = TopologyManager.getTopology();
+        if (cartridgeList == null) {
+            log.warn("Cartridge list is empty");
+            return;
+        }
+
+        // Acquire the lock before entering the try block so that the finally clause never
+        // releases a lock that was not obtained.
+        TopologyManager.acquireWriteLock();
+        try {
+            for (Cartridge cartridge : cartridgeList) {
+                if (topology.serviceExists(cartridge.getType())) {
+                    // Service already registered, nothing to add for this cartridge
+                    continue;
+                }
+
+                ServiceType serviceType =
+                        cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant;
+                Service service = new Service(cartridge.getType(), serviceType);
+
+                // Copy the cartridge properties. A failure is logged and the service is still
+                // created with the properties copied so far.
+                Properties properties = new Properties();
+                try {
+                    if (cartridge.getProperties() != null
+                            && cartridge.getProperties().getProperties() != null) {
+                        for (Property property : cartridge.getProperties().getProperties()) {
+                            properties.setProperty(property.getName(), property.getValue());
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error(String.format("Could not copy cartridge properties: [cartridge-type] %s",
+                            cartridge.getType()), e);
+                }
+                service.setProperties(properties);
+
+                // Adding ports to the service
+                if (cartridge.getPortMappings() != null) {
+                    for (PortMapping portMapping : cartridge.getPortMappings()) {
+                        service.addPort(new Port(portMapping.getProtocol(), portMapping.getPort(),
+                                portMapping.getProxyPort()));
+                    }
+                }
+
+                topology.addService(service);
+                TopologyManager.updateTopology(topology);
+            }
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+        TopologyEventPublisher.sendServiceCreateEvent(cartridgeList);
+    }
+
+    /**
+     * Removes from the topology the service of each given cartridge that has no clusters
+     * left, publishing a service removed event (carrying the full cartridge list) after
+     * every removal.
+     * <p>
+     * Processing stops at the first cartridge whose service is not found in the topology.
+     * A service that still contains clusters is kept and a warning is logged.
+     *
+     * @param cartridgeList cartridges whose services should be removed; if {@code null} a
+     *                      warning is logged and nothing is removed or published
+     */
+    public static void handleServiceRemoved(List<Cartridge> cartridgeList) {
+        Topology topology = TopologyManager.getTopology();
+        if (cartridgeList == null) {
+            log.warn("Cartridge list is empty");
+            return;
+        }
+
+        for (Cartridge cartridge : cartridgeList) {
+            Service service = topology.getService(cartridge.getType());
+            if (service == null) {
+                log.warn("Cartridge does not exist [cartidge] " + cartridge);
+                return;
+            }
+
+            if (service.getClusters().size() != 0) {
+                log.warn("Subscription already exists. Hence not removing the service:" + cartridge.getType() +
+                        " from the topology");
+                continue;
+            }
+
+            if (!topology.serviceExists(cartridge.getType())) {
+                log.warn(String.format("Service %s does not exist..", cartridge.getType()));
+                continue;
+            }
+
+            // Acquire the lock before entering the try block so that the finally clause never
+            // releases a lock that was not obtained.
+            TopologyManager.acquireWriteLock();
+            try {
+                topology.removeService(cartridge.getType());
+                TopologyManager.updateTopology(topology);
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
+            TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList);
+        }
+    }
+
+    /**
+     * Creates a cluster described by the event inside its service and publishes a cluster
+     * created event once the topology write lock has been released.
+     * <p>
+     * Nothing is created or published when the service is unknown or when the cluster is
+     * already part of the service.
+     *
+     * @param event source of the service name, cluster id, policy names, application id,
+     *              host names and tenant range of the new cluster
+     */
+    public static void handleClusterCreated(ClusterStatusClusterCreatedEvent event) {
+        Cluster createdCluster;
+
+        TopologyManager.acquireWriteLock();
+        try {
+            Topology topology = TopologyManager.getTopology();
+
+            Service owningService = topology.getService(event.getServiceName());
+            if (owningService == null) {
+                log.error("Service " + event.getServiceName()
+                        + " not found in Topology, unable to update the cluster status to Created");
+                return;
+            }
+
+            if (owningService.clusterExists(event.getClusterId())) {
+                log.warn("Cluster " + event.getClusterId() + " is already in the Topology ");
+                return;
+            }
+
+            createdCluster = new Cluster(event.getServiceName(), event.getClusterId(),
+                    event.getDeploymentPolicyName(), event.getAutosScalePolicyName(), event.getAppId());
+            createdCluster.setHostNames(event.getHostNames());
+            createdCluster.setTenantRange(event.getTenantRange());
+            owningService.addCluster(createdCluster);
+            TopologyManager.updateTopology(topology);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+
+        TopologyEventPublisher.sendClusterCreatedEvent(createdCluster);
+    }
+
+    /**
+     * Adds the clusters of an application to their services in the topology, creates and
+     * persists a cluster port mapping for every cartridge port mapping of each cluster, and
+     * finally publishes an application clusters created event.
+     * <p>
+     * A cluster whose service is missing from the topology is logged and not added, but port
+     * mappings are still created for it as long as its cartridge is known.
+     *
+     * @param appId       id of the application owning the clusters
+     * @param appClusters clusters to add to the topology
+     * @throws CloudControllerException if the cartridge of a cluster cannot be found
+     */
+    public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) {
+
+        TopologyManager.acquireWriteLock();
+
+        try {
+            Topology topology = TopologyManager.getTopology();
+            for (Cluster cluster : appClusters) {
+                Service service = topology.getService(cluster.getServiceName());
+                if (service == null) {
+                    log.error("Service " + cluster.getServiceName() +
+                            " not found in Topology, unable to create Application cluster");
+                } else {
+                    service.addCluster(cluster);
+                    log.info("Application Cluster " + cluster.getClusterId() + " created in CC topology");
+                }
+            }
+            TopologyManager.updateTopology(topology);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+
+        log.debug("Creating cluster port mappings: [appication-id] " + appId);
+        for (Cluster cluster : appClusters) {
+            String cartridgeType = cluster.getServiceName();
+            Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
+            if (cartridge == null) {
+                throw new CloudControllerException("Cartridge not found: [cartridge-type] " + cartridgeType);
+            }
+
+            // A cartridge may carry no port mappings (handleServiceCreated guards against
+            // the same case); there is nothing to map for such a cluster.
+            if (cartridge.getPortMappings() == null) {
+                continue;
+            }
+
+            for (PortMapping portMapping : cartridge.getPortMappings()) {
+                ClusterPortMapping clusterPortMapping =
+                        new ClusterPortMapping(appId, cluster.getClusterId(), portMapping.getName(),
+                                portMapping.getProtocol(), portMapping.getPort(),
+                                portMapping.getProxyPort());
+                CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
+                if (log.isDebugEnabled()) {
+                    log.debug("Cluster port mapping created: " + clusterPortMapping.toString());
+                }
+            }
+        }
+
+        // Persist cluster port mappings
+        CloudControllerContext.getInstance().persist();
+
+        // Send application clusters created event
+        TopologyEventPublisher.sendApplicationClustersCreated(appId, appClusters);
+    }
+
+    /**
+     * Removes the clusters of an application from the topology together with their cluster
+     * contexts, removes the application's cluster port mappings, persists the cloud
+     * controller context and publishes an application clusters removed event.
+     *
+     * @param appId       id of the application whose clusters are removed
+     * @param clusterData service type / cluster id pairs to remove; if {@code null} only an
+     *                    info message is logged for the topology part, while the port
+     *                    mappings are still removed and the event is still published
+     */
+    public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData) {
+        // Resolve the context before taking the lock so that nothing can fail between the
+        // lock acquisition and the try block that guarantees its release.
+        CloudControllerContext context = CloudControllerContext.getInstance();
+
+        TopologyManager.acquireWriteLock();
+        try {
+            Topology topology = TopologyManager.getTopology();
+
+            if (clusterData != null) {
+                // remove clusters from CC topology model and remove runtime information
+                for (ClusterDataHolder aClusterData : clusterData) {
+                    Service aService = topology.getService(aClusterData.getServiceType());
+                    if (aService != null) {
+                        aService.removeCluster(aClusterData.getClusterId());
+                    } else {
+                        log.warn("Service " + aClusterData.getServiceType() + " not found, " +
+                                "unable to remove Cluster " + aClusterData.getClusterId());
+                    }
+                    // remove runtime data
+                    context.removeClusterContext(aClusterData.getClusterId());
+
+                    log.info("Removed application [ " + appId + " ]'s Cluster " +
+                            "[ " + aClusterData.getClusterId() + " ] from the topology");
+                }
+                // persist runtime data changes
+                context.persist();
+            } else {
+                log.info("No cluster data found for application " + appId + " to remove");
+            }
+
+            TopologyManager.updateTopology(topology);
+
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+
+        // Remove cluster port mappings of application
+        context.removeClusterPortMappings(appId);
+        context.persist();
+
+        TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
+
+    }
+
+    /**
+     * Sets the status of a cluster instance back to {@code Created} and publishes a cluster
+     * reset event, all while holding the topology write lock.
+     * <p>
+     * The request is logged and dropped when the service, the cluster or the cluster
+     * instance cannot be found, or when the instance rejects the state transition.
+     *
+     * @param event source of the application id, service name, cluster id and instance id
+     */
+    public static void handleClusterReset(ClusterStatusClusterResetEvent event) {
+        TopologyManager.acquireWriteLock();
+        try {
+            Topology topology = TopologyManager.getTopology();
+
+            Service owningService = topology.getService(event.getServiceName());
+            if (owningService == null) {
+                log.error("Service " + event.getServiceName()
+                        + " not found in Topology, unable to update the cluster status to Created");
+                return;
+            }
+
+            Cluster cluster = owningService.getCluster(event.getClusterId());
+            if (cluster == null) {
+                log.error("Cluster " + event.getClusterId()
+                        + " not found in Topology, unable to update status to Created");
+                return;
+            }
+
+            ClusterInstance clusterInstance = cluster.getInstanceContexts(event.getInstanceId());
+            if (clusterInstance == null) {
+                log.warn("Cluster Instance Context is not found for [cluster] " + event.getClusterId()
+                        + " [instance-id] " + event.getInstanceId());
+                return;
+            }
+
+            ClusterStatus requestedStatus = ClusterStatus.Created;
+            if (!clusterInstance.isStateTransitionValid(requestedStatus)) {
+                log.warn(String.format(
+                        "Cluster state transition is not valid: [cluster-id] %s  [instance-id] %s "
+                                + "[current-status] %s [status-requested] %s",
+                        event.getClusterId(), event.getInstanceId(), clusterInstance.getStatus(),
+                        requestedStatus));
+                return;
+            }
+
+            clusterInstance.setStatus(requestedStatus);
+            log.info("Cluster Created adding status started for" + cluster.getClusterId());
+            TopologyManager.updateTopology(topology);
+            // publishing data
+            TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
+                    event.getClusterId(), event.getInstanceId());
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+    }
+
+    /**
+     * Adds a new cluster instance to an existing cluster and publishes a cluster instance
+     * created event, all while holding the topology write lock.
+     * <p>
+     * The request is logged and dropped when the service or the cluster is unknown, or when
+     * the cluster already holds an instance context for the given instance id.
+     *
+     * @param serviceType        service the cluster belongs to
+     * @param clusterId          cluster receiving the new instance
+     * @param alias              alias passed to the new cluster instance
+     * @param instanceId         id of the new cluster instance
+     * @param partitionId        partition id set on the instance and on the event
+     * @param networkPartitionId network partition id set on the instance
+     */
+    public static void handleClusterInstanceCreated(String serviceType, String clusterId, String alias,
+                                                    String instanceId, String partitionId, String networkPartitionId) {
+        TopologyManager.acquireWriteLock();
+        try {
+            Topology topology = TopologyManager.getTopology();
+
+            Service owningService = topology.getService(serviceType);
+            if (owningService == null) {
+                log.error(String.format(
+                        "Service %s not found in Topology, unable to update the cluster status to Created",
+                        serviceType));
+                return;
+            }
+
+            Cluster targetCluster = owningService.getCluster(clusterId);
+            if (targetCluster == null) {
+                log.error(String.format(
+                        "Cluster %s not found in Topology, unable to update status to Created", clusterId));
+                return;
+            }
+
+            boolean instanceAlreadyKnown = targetCluster.getInstanceContexts(instanceId) != null;
+            if (instanceAlreadyKnown) {
+                log.warn(String.format(
+                        "The Instance context for the cluster already exists for [cluster] %s [instance-id] %s",
+                        clusterId, instanceId));
+                return;
+            }
+
+            ClusterInstance newInstance = new ClusterInstance(alias, clusterId, instanceId);
+            newInstance.setNetworkPartitionId(networkPartitionId);
+            newInstance.setPartitionId(partitionId);
+            targetCluster.addInstanceContext(instanceId, newInstance);
+            TopologyManager.updateTopology(topology);
+
+            ClusterInstanceCreatedEvent createdEvent =
+                    new ClusterInstanceCreatedEvent(serviceType, clusterId, newInstance);
+            createdEvent.setPartitionId(partitionId);
+            TopologyEventPublisher.sendClusterInstanceCreatedEvent(createdEvent);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+    }
+
+    /**
+     * Removes a cluster from its service in the topology and publishes a cluster removed
+     * event carrying the deployment policy name of the removed cluster.
+     * <p>
+     * A warning is logged and nothing is published when the service or the cluster does not
+     * exist.
+     *
+     * @param ctxt cluster context providing the cartridge type and the cluster id
+     */
+    public static void handleClusterRemoved(ClusterContext ctxt) {
+        Topology topology = TopologyManager.getTopology();
+        Service service = topology.getService(ctxt.getCartridgeType());
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", ctxt.getCartridgeType()));
+            return;
+        }
+
+        if (!service.clusterExists(ctxt.getClusterId())) {
+            log.warn(String.format("Cluster %s does not exist for service %s", ctxt.getClusterId(),
+                    ctxt.getCartridgeType()));
+            return;
+        }
+
+        String deploymentPolicy;
+        // Acquire the lock before entering the try block so that the finally clause never
+        // releases a lock that was not obtained.
+        TopologyManager.acquireWriteLock();
+        try {
+            Cluster cluster = service.removeCluster(ctxt.getClusterId());
+            if (cluster == null) {
+                // The existence check above ran without the lock, so the cluster may have
+                // been removed in the meantime.
+                // NOTE(review): assumes removeCluster returns null for an unknown id - confirm.
+                log.warn(String.format("Cluster %s does not exist for service %s", ctxt.getClusterId(),
+                        ctxt.getCartridgeType()));
+                return;
+            }
+            deploymentPolicy = cluster.getDeploymentPolicyName();
+            TopologyManager.updateTopology(topology);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+        TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
+    }
+
+    /**
+     * Add member object to the topology and publish member created event.
+     * <p>
+     * The member is created with status {@code Created}, added to its cluster under the
+     * topology write lock and reported to the usage data publisher. A warning is logged and
+     * nothing is added or published when the service or the cluster is not found, or when
+     * the member already exists.
+     *
+     * @param memberContext context of the new member; its properties must contain the
+     *                      scaling reason and a numeric scaling time
+     */
+    public static void handleMemberCreatedEvent(MemberContext memberContext) {
+        Topology topology = TopologyManager.getTopology();
+
+        Service service = topology.getService(memberContext.getCartridgeType());
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", memberContext.getCartridgeType()));
+            return;
+        }
+
+        String clusterId = memberContext.getClusterId();
+        Cluster cluster = service.getCluster(clusterId);
+        if (cluster == null) {
+            log.warn(String.format("Cluster %s does not exist in service %s", clusterId,
+                    memberContext.getCartridgeType()));
+            return;
+        }
+
+        String memberId = memberContext.getMemberId();
+        String clusterInstanceId = memberContext.getClusterInstanceId();
+        String networkPartitionId = memberContext.getNetworkPartitionId();
+        String partitionId = memberContext.getPartition().getId();
+        String lbClusterId = memberContext.getLbClusterId();
+        long initTime = memberContext.getInitTime();
+        String autoscalingReason =
+                memberContext.getProperties().getProperty(StratosConstants.SCALING_REASON).getValue();
+        long scalingTime =
+                Long.parseLong(memberContext.getProperties().getProperty(StratosConstants.SCALING_TIME).getValue());
+
+        if (cluster.memberExists(memberId)) {
+            log.warn(String.format("Member %s already exists", memberId));
+            return;
+        }
+
+        // Acquire the lock before entering the try block so that the finally clause never
+        // releases a lock that was not obtained.
+        TopologyManager.acquireWriteLock();
+        try {
+            Member member =
+                    new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, networkPartitionId,
+                            partitionId, memberContext.getLoadBalancingIPType(), initTime);
+            member.setStatus(MemberStatus.Created);
+            member.setLbClusterId(lbClusterId);
+            member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
+            cluster.addMember(member);
+            TopologyManager.updateTopology(topology);
+            //Member created time
+            Long timeStamp = System.currentTimeMillis();
+            //publishing to BAM
+            BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(),
+                    memberContext.getNetworkPartitionId(), memberContext.getClusterId(),
+                    memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
+                    MemberStatus.Created.toString(), timeStamp, autoscalingReason, scalingTime,
+                    null);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+
+        TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
+    }
+
+    /**
+     * Update member status to initialized and publish member initialized event.
+     * <p>
+     * The member's default and additional private/public IP addresses are copied from the
+     * member context, then the status is moved to {@code Initialized}, the topology is
+     * updated and both the topology event and the usage data are published while holding
+     * the topology write lock. A message is logged and nothing is published when the
+     * service, cluster or member is unknown or when the state transition is invalid.
+     *
+     * @param memberContext context identifying the member and carrying its IP addresses
+     */
+    public static void handleMemberInitializedEvent(MemberContext memberContext) {
+        Topology topology = TopologyManager.getTopology();
+        Service service = topology.getService(memberContext.getCartridgeType());
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", memberContext.getCartridgeType()));
+            return;
+        }
+        if (!service.clusterExists(memberContext.getClusterId())) {
+            log.warn(String.format("Cluster %s does not exist in service %s", memberContext.getClusterId(),
+                    memberContext.getCartridgeType()));
+            return;
+        }
+
+        Member member = service.getCluster(memberContext.getClusterId()).
+                getMember(memberContext.getMemberId());
+        if (member == null) {
+            log.warn(String.format("Member %s does not exist", memberContext.getMemberId()));
+            return;
+        }
+
+        // Acquire the lock before entering the try block so that the finally clause never
+        // releases a lock that was not obtained.
+        TopologyManager.acquireWriteLock();
+        try {
+            // Set ip addresses
+            // NOTE(review): the addresses are applied even when the state transition below
+            // turns out to be invalid - confirm this is intended.
+            member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
+            if (memberContext.getPrivateIPs() != null) {
+                member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
+            }
+            member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
+            if (memberContext.getPublicIPs() != null) {
+                member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
+            }
+
+            // try update lifecycle state
+            if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
+                log.error("Invalid state transition from " + member.getStatus() + " to " +
+                        MemberStatus.Initialized);
+                return;
+            }
+
+            member.setStatus(MemberStatus.Initialized);
+            log.info("Member status updated to initialized");
+
+            TopologyManager.updateTopology(topology);
+            //member initialized time
+            Long timeStamp = System.currentTimeMillis();
+
+            TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
+            //publishing data
+            BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(),
+                    memberContext.getNetworkPartitionId(), memberContext.getClusterId(),
+                    memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
+                    MemberStatus.Initialized.toString(), timeStamp, null, null, null);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+    }
+
+    /**
+     * Returns the port of the first kubernetes service whose protocol equals the protocol of
+     * the given port mapping.
+     *
+     * @param clusterId          cluster id, used only in the failure message
+     * @param kubernetesServices kubernetes services to search, in iteration order
+     * @param portMapping        port mapping whose protocol is looked up
+     * @return port of the first matching kubernetes service
+     * @throws RuntimeException if no kubernetes service uses the port mapping's protocol
+     */
+    private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices,
+                                                 PortMapping portMapping) {
+        Iterator<KubernetesService> candidates = kubernetesServices.iterator();
+        while (candidates.hasNext()) {
+            KubernetesService candidate = candidates.next();
+            boolean sameProtocol = candidate.getProtocol().equals(portMapping.getProtocol());
+            if (sameProtocol) {
+                return candidate.getPort();
+            }
+        }
+
+        String message =
+                "Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " + portMapping.getPort();
+        throw new RuntimeException(message);
+    }
+
+    /**
+     * Moves a member to the {@code Starting} status, updates the topology and publishes the
+     * member started event together with the usage data.
+     * <p>
+     * A message is logged and nothing is published when the service, cluster or member is
+     * unknown or when the state transition is invalid. Any exception raised while handling
+     * the event is caught and logged as a warning instead of being propagated.
+     *
+     * @param instanceStartedEvent event identifying the member that has started
+     */
+    public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
+        try {
+            Topology topology = TopologyManager.getTopology();
+            Service service = topology.getService(instanceStartedEvent.getServiceName());
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist", instanceStartedEvent.getServiceName()));
+                return;
+            }
+            if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
+                log.warn(String.format("Cluster %s does not exist in service %s", instanceStartedEvent.getClusterId(),
+                        instanceStartedEvent.getServiceName()));
+                return;
+            }
+
+            Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
+            Member member = cluster.getMember(instanceStartedEvent.getMemberId());
+            if (member == null) {
+                log.warn(String.format("Member %s does not exist", instanceStartedEvent.getMemberId()));
+                return;
+            }
+
+            // Acquire the lock before entering the try block so that the finally clause
+            // never releases a lock that was not obtained.
+            TopologyManager.acquireWriteLock();
+            try {
+                // try update lifecycle state
+                if (!member.isStateTransitionValid(MemberStatus.Starting)) {
+                    log.error("Invalid State Transition from " + member.getStatus() + " to " +
+                            MemberStatus.Starting);
+                    return;
+                }
+
+                member.setStatus(MemberStatus.Starting);
+                log.info("member started event adding status started");
+
+                TopologyManager.updateTopology(topology);
+                //member starting time
+                Long timeStamp = System.currentTimeMillis();
+                TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
+                //publishing data
+                BAMUsageDataPublisher
+                        .publish(instanceStartedEvent.getMemberId(), instanceStartedEvent.getPartitionId(),
+                                instanceStartedEvent.getNetworkPartitionId(), instanceStartedEvent.getClusterId(),
+                                instanceStartedEvent.getClusterInstanceId(), instanceStartedEvent.getServiceName(),
+                                MemberStatus.Starting.toString(), timeStamp, null, null, null);
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
+        } catch (Exception e) {
+            String message = String.format(
+                    "Could not handle member started event: [application-id] %s " + "[service-name] %s [member-id] %s",
+                    instanceStartedEvent.getApplicationId(), instanceStartedEvent.getServiceName(),
+                    instanceStartedEvent.getMemberId());
+            log.warn(message, e);
+        }
+    }
+
+    /**
+     * Moves a member to the {@code Active} status, attaches its ports and IP addresses to a
+     * member activated event, updates the topology and publishes the event together with
+     * the usage data, all while holding the topology write lock.
+     * <p>
+     * For every cartridge port mapping a port is added to the member and to the event. When
+     * the cluster context provides kubernetes services, the port value comes from the
+     * kubernetes service with the same protocol; otherwise the mapping's own port is used.
+     * A failure while adding ports is logged and does not prevent the event from being
+     * published. A message is logged and nothing is published when the service, cluster or
+     * member is unknown or when the state transition is invalid.
+     *
+     * @param instanceActivatedEvent event identifying the member that became active
+     */
+    public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
+        Topology topology = TopologyManager.getTopology();
+        Service service = topology.getService(instanceActivatedEvent.getServiceName());
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", instanceActivatedEvent.getServiceName()));
+            return;
+        }
+
+        Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
+        if (cluster == null) {
+            log.warn(String.format("Cluster %s does not exist", instanceActivatedEvent.getClusterId()));
+            return;
+        }
+
+        Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
+        if (member == null) {
+            log.warn(String.format("Member %s does not exist", instanceActivatedEvent.getMemberId()));
+            return;
+        }
+
+        MemberActivatedEvent memberActivatedEvent =
+                new MemberActivatedEvent(instanceActivatedEvent.getServiceName(), instanceActivatedEvent.getClusterId(),
+                        instanceActivatedEvent.getClusterInstanceId(),
+                        instanceActivatedEvent.getMemberId(),
+                        instanceActivatedEvent.getNetworkPartitionId(),
+                        instanceActivatedEvent.getPartitionId());
+
+        // grouping - set group id
+        //TODO
+        memberActivatedEvent.setApplicationId(null);
+
+        // Acquire the lock before entering the try block so that the finally clause never
+        // releases a lock that was not obtained.
+        TopologyManager.acquireWriteLock();
+        try {
+            // try update lifecycle state
+            if (!member.isStateTransitionValid(MemberStatus.Active)) {
+                log.error(
+                        "Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]");
+                return;
+            }
+
+            member.setStatus(MemberStatus.Active);
+
+            // Set member ports
+            try {
+                Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceName());
+                if (cartridge == null) {
+                    throw new RuntimeException(
+                            String.format("Cartridge not found: [cartridge-type] %s", service.getServiceName()));
+                }
+
+                Port port;
+                int portValue;
+                List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings());
+                String clusterId = cluster.getClusterId();
+                ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
+                List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+
+                for (PortMapping portMapping : portMappings) {
+                    if (kubernetesServices != null) {
+                        portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
+                    } else {
+                        portValue = portMapping.getPort();
+                    }
+                    port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
+                    member.addPort(port);
+                    memberActivatedEvent.addPort(port);
+                }
+            } catch (Exception e) {
+                // Best effort: the member is still activated without (some of) its ports
+                String message = String.format("Could not add member ports: [service-name] %s [member-id] %s",
+                        memberActivatedEvent.getServiceName(),
+                        memberActivatedEvent.getMemberId());
+                log.error(message, e);
+            }
+
+            // Set member ip addresses
+            memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
+            memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
+            memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
+            memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
+            TopologyManager.updateTopology(topology);
+
+            //member activated time
+            Long timeStamp = System.currentTimeMillis();
+
+            // Publish member activated event
+            TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
+
+            // Publish statistics data
+            BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), memberActivatedEvent.getPartitionId(),
+                    memberActivatedEvent.getNetworkPartitionId(),
+                    memberActivatedEvent.getClusterId(),
+                    memberActivatedEvent.getClusterInstanceId(),
+                    memberActivatedEvent.getServiceName(), MemberStatus.Active.toString(),
+                    timeStamp, null, null, null);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+    }
+
+    /**
+     * Moves a member to the {@code ReadyToShutDown} status under the topology write lock and,
+     * after releasing the lock, publishes the member ready to shutdown event together with
+     * the usage data. Terminating the instance itself is left to the autoscaler.
+     * <p>
+     * A message is logged and nothing is published when the service, cluster or member is
+     * unknown or when the state transition is invalid.
+     *
+     * @param instanceReadyToShutdownEvent event identifying the member that is ready to shut down
+     * @throws InvalidMemberException        declared for callers; not thrown by the code visible here
+     * @throws InvalidCartridgeTypeException declared for callers; not thrown by the code visible here
+     */
+    public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
+            throws InvalidMemberException, InvalidCartridgeTypeException {
+        Topology topology = TopologyManager.getTopology();
+        Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
+        //update the status of the member
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", instanceReadyToShutdownEvent.getServiceName()));
+            return;
+        }
+
+        Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId());
+        if (cluster == null) {
+            log.warn(String.format("Cluster %s does not exist", instanceReadyToShutdownEvent.getClusterId()));
+            return;
+        }
+
+        Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
+        if (member == null) {
+            log.warn(String.format("Member %s does not exist", instanceReadyToShutdownEvent.getMemberId()));
+            return;
+        }
+        MemberReadyToShutdownEvent memberReadyToShutdownEvent =
+                new MemberReadyToShutdownEvent(instanceReadyToShutdownEvent.getServiceName(),
+                        instanceReadyToShutdownEvent.getClusterId(),
+                        instanceReadyToShutdownEvent.getClusterInstanceId(),
+                        instanceReadyToShutdownEvent.getMemberId(),
+                        instanceReadyToShutdownEvent.getNetworkPartitionId(),
+                        instanceReadyToShutdownEvent.getPartitionId());
+        //get the time of ReadyToShutdown state change
+        Long timeStamp = null;
+
+        // Acquire the lock before entering the try block so that the finally clause never
+        // releases a lock that was not obtained.
+        TopologyManager.acquireWriteLock();
+        try {
+            if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
+                log.error("Invalid State Transition from " + member.getStatus() + " to " +
+                        MemberStatus.ReadyToShutDown);
+                return;
+            }
+            member.setStatus(MemberStatus.ReadyToShutDown);
+            log.info("Member Ready to shut down event adding status started");
+
+            TopologyManager.updateTopology(topology);
+
+            timeStamp = System.currentTimeMillis();
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+        TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+        //publishing data
+        BAMUsageDataPublisher
+                .publish(instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getPartitionId(),
+                        instanceReadyToShutdownEvent.getNetworkPartitionId(),
+                        instanceReadyToShutdownEvent.getClusterId(),
+                        instanceReadyToShutdownEvent.getClusterInstanceId(),
+                        instanceReadyToShutdownEvent.getServiceName(), MemberStatus.ReadyToShutDown.toString(),
+                        timeStamp, null, null, null);
+        //termination of particular instance will be handled by autoscaler
+    }
+
+    public static void handleMemberMaintenance(InstanceMaintenanceModeEvent 
instanceMaintenanceModeEvent)
+            throws InvalidMemberException, InvalidCartridgeTypeException {
+        Topology topology = TopologyManager.getTopology();
+        Service service = 
topology.getService(instanceMaintenanceModeEvent.getServiceName());
+        //update the status of the member
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", 
instanceMaintenanceModeEvent.getServiceName()));
+            return;
+        }
+
+        Cluster cluster = 
service.getCluster(instanceMaintenanceModeEvent.getClusterId());
+        if (cluster == null) {
+            log.warn(String.format("Cluster %s does not exist", 
instanceMaintenanceModeEvent.getClusterId()));
+            return;
+        }
+
+        Member member = 
cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
+        if (member == null) {
+            log.warn(String.format("Member %s does not exist", 
instanceMaintenanceModeEvent.getMemberId()));
+            return;
+        }
+
+        MemberMaintenanceModeEvent memberMaintenanceModeEvent =
+                new 
MemberMaintenanceModeEvent(instanceMaintenanceModeEvent.getServiceName(),
+                        instanceMaintenanceModeEvent.getClusterId(),
+                        instanceMaintenanceModeEvent.getClusterInstanceId(),
+                        instanceMaintenanceModeEvent.getMemberId(),
+                        instanceMaintenanceModeEvent.getNetworkPartitionId(),
+                        instanceMaintenanceModeEvent.getPartitionId());
+        try {
+            TopologyManager.acquireWriteLock();
+            // try update lifecycle state
+            if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
+                log.error("Invalid State Transition from " + 
member.getStatus() + " to " + MemberStatus.In_Maintenance);
+                return;
+            }
+            member.setStatus(MemberStatus.In_Maintenance);
+            log.info("member maintenance mode event adding status started");
+
+            TopologyManager.updateTopology(topology);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+        //publishing data
+        
TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
+
+    }
+
+    /**
+     * Remove member from topology and send member terminated event.
+     * <p>
+     * If the service, cluster or member cannot be found in the current topology, a warning is
+     * logged and no event is sent.
+     *
+     * @param serviceName        name of the service used to look up the owning {@link Service}
+     * @param clusterId          id of the cluster the member is removed from
+     * @param networkPartitionId network partition id, passed through to the published event
+     * @param partitionId        partition id, passed through to the published event
+     * @param memberId           id of the member to remove
+     */
+    public static void handleMemberTerminated(String serviceName, String clusterId, String networkPartitionId,
+                                              String partitionId, String memberId) {
+        Topology topology = TopologyManager.getTopology();
+        Service service = topology.getService(serviceName);
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", serviceName));
+            return;
+        }
+        Cluster cluster = service.getCluster(clusterId);
+        if (cluster == null) {
+            log.warn(String.format("Cluster %s does not exist", clusterId));
+            return;
+        }
+
+        Member member = cluster.getMember(memberId);
+        if (member == null) {
+            log.warn(String.format("Member %s does not exist", memberId));
+            return;
+        }
+
+        String clusterInstanceId = member.getClusterInstanceId();
+
+        // Read under the lock and handed to the event once the lock has been released.
+        Properties properties;
+
+        // Acquire the lock before entering the try block so that the finally clause can never
+        // release a lock this thread failed to obtain.
+        TopologyManager.acquireWriteLock();
+        try {
+            properties = member.getProperties();
+            cluster.removeMember(member);
+            TopologyManager.updateTopology(topology);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+        /* @TODO leftover from grouping_poc*/
+        String groupAlias = null;
+        TopologyEventPublisher
+                .sendMemberTerminatedEvent(serviceName, clusterId, memberId, clusterInstanceId, networkPartitionId,
+                        partitionId, properties, groupAlias);
+    }
+
+    /**
+     * Placeholder for member-suspended handling. It currently only takes and releases the
+     * topology write lock without touching the topology.
+     */
+    public static void handleMemberSuspended() {
+        //TODO
+        // Acquire the lock before entering the try block so that the finally clause can never
+        // release a lock this thread failed to obtain.
+        TopologyManager.acquireWriteLock();
+        try {
+            // Nothing to do yet.
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+    }
+
+    public static void handleClusterActivatedEvent(
+            ClusterStatusClusterActivatedEvent 
clusterStatusClusterActivatedEvent) {
+
+        Topology topology = TopologyManager.getTopology();
+        Service service = 
topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
+        //update the status of the cluster
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", 
clusterStatusClusterActivatedEvent.getServiceName()));
+            return;
+        }
+
+        Cluster cluster = 
service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
+        if (cluster == null) {
+            log.warn(String.format("Cluster %s does not exist", 
clusterStatusClusterActivatedEvent.getClusterId()));
+            return;
+        }
+
+        String clusterId = cluster.getClusterId();
+        ClusterContext clusterContext = 
CloudControllerContext.getInstance().getClusterContext(clusterId);
+        if (clusterContext == null) {
+            log.warn("Cluster context not found: [cluster-id] " + clusterId);
+            return;
+        }
+
+        ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
+                new 
ClusterInstanceActivatedEvent(clusterStatusClusterActivatedEvent.getAppId(),
+                        clusterStatusClusterActivatedEvent.getServiceName(),
+                        clusterStatusClusterActivatedEvent.getClusterId(),
+                        clusterStatusClusterActivatedEvent.getInstanceId());
+        try {
+            TopologyManager.acquireWriteLock();
+            List<KubernetesService> kubernetesServices = 
clusterContext.getKubernetesServices();
+            cluster.setKubernetesServices(kubernetesServices);
+            
clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
+
+            ClusterInstance context = 
cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
+            if (context == null) {
+                log.warn("Cluster instance context is not found for [cluster] 
" +
+                        clusterStatusClusterActivatedEvent.getClusterId() + " 
[instance-id] " +
+                        clusterStatusClusterActivatedEvent.getInstanceId());
+                return;
+            }
+            ClusterStatus status = ClusterStatus.Active;
+            if (context.isStateTransitionValid(status)) {
+                context.setStatus(status);
+                log.info("Cluster activated adding status started for " + 
cluster.getClusterId());
+                TopologyManager.updateTopology(topology);
+                // publish event
+                
TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
+            } else {
+                log.error(String.format("Cluster state transition is not 
valid: [cluster-id] %s " +
+                                " [instance-id] %s [current-status] %s 
[status-requested] %s",
+                        clusterStatusClusterActivatedEvent.getClusterId(),
+                        clusterStatusClusterActivatedEvent.getInstanceId(), 
context.getStatus(),
+                        status));
+                return;
+            }
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+
+    }
+
+    public static void 
handleClusterInactivateEvent(ClusterStatusClusterInactivateEvent 
clusterInactivateEvent) {
+        Topology topology = TopologyManager.getTopology();
+        Service service = 
topology.getService(clusterInactivateEvent.getServiceName());
+        //update the status of the cluster
+        if (service == null) {
+            log.warn(String.format("Service %s does not exist", 
clusterInactivateEvent.getServiceName()));
+            return;
+        }
+
+        Cluster cluster = 
service.getCluster(clusterInactivateEvent.getClusterId());
+        if (cluster == null) {
+            log.warn(String.format("Cluster %s does not exist", 
clusterInactivateEvent.getClusterId()));
+            return;
+        }
+
+        ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
+                new 
ClusterInstanceInactivateEvent(clusterInactivateEvent.getAppId(),
+                        clusterInactivateEvent.getServiceName(),
+                        clusterInactivateEvent.getClusterId(),
+                        clusterInactivateEvent.getInstanceId());
+        try {
+            TopologyManager.acquireWriteLock();
+            ClusterInstance context = 
cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
+            if (context == null) {
+                log.warn("Cluster Instance Context is not found for [cluster] 
" +
+                        clusterInactivateEvent.getClusterId() + " 
[instance-id] " +
+                        clusterInactivateEvent.getInstanceId());
+                return;
+            }
+            ClusterStatus status = ClusterStatus.Inactive;
+            if (context.isStateTransitionValid(status)) {
+                context.setStatus(status);
+                log.info("Cluster Inactive adding status started for" + 
cluster.getClusterId());
+                TopologyManager.updateTopology(topology);
+                //publishing data
+                
TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
+            } else {
+                log.error(String.format("Cluster state transition is not 
valid: [cluster-id] %s " +
+                                " [instance-id] %s [current-status] %s 
[status-requested] %s",
+                        clusterInactivateEvent.getClusterId(), 
clusterInactivateEvent.getInstanceId(),
+                        context.getStatus(), status));
+                return;
+            }
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+    }
+
+    /**
+     * Asks the metadata service to delete the properties stored for the application named by
+     * the event. Any exception is logged and not rethrown.
+     *
+     * @param event application instance terminated event supplying the application id
+     */
+    private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) {
+        try {
+            MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient();
+            metadataClient.deleteApplicationProperties(event.getAppId());
+        } catch (Exception e) {
+            // The failure is only logged, so the caller is not interrupted by it.
+            log.error("Error occurred while deleting the application resources from metadata service ", e);
+        }
+    }
+
+    public static void 
handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
+
+        TopologyManager.acquireWriteLock();
+
+        try {
+            Topology topology = TopologyManager.getTopology();
+            Service service = topology.getService(event.getServiceName());
+
+            //update the status of the cluster
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist", 
event.getServiceName()));
+                return;
+            }
+
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                log.warn(String.format("Cluster %s does not exist", 
event.getClusterId()));
+                return;
+            }
+
+            ClusterInstance context = 
cluster.getInstanceContexts(event.getInstanceId());
+            if (context == null) {
+                log.warn("Cluster Instance Context is not found for [cluster] 
" +
+                        event.getClusterId() + " [instance-id] " +
+                        event.getInstanceId());
+                return;
+            }
+            ClusterStatus status = ClusterStatus.Terminated;
+            if (context.isStateTransitionValid(status)) {
+                context.setStatus(status);
+                log.info("Cluster Terminated adding status started for and 
removing the cluster instance" +
+                        cluster.getClusterId());
+                cluster.removeInstanceContext(event.getInstanceId());
+                TopologyManager.updateTopology(topology);
+                //publishing data
+                ClusterInstanceTerminatedEvent clusterTerminatedEvent =
+                        new ClusterInstanceTerminatedEvent(event.getAppId(), 
event.getServiceName(),
+                                event.getClusterId(), event.getInstanceId());
+
+                
TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
+            } else {
+                log.error(String.format("Cluster state transition is not 
valid: [cluster-id] %s " +
+                                " [instance-id] %s [current-status] %s 
[status-requested] %s",
+                        event.getClusterId(), event.getInstanceId(), 
context.getStatus(), status));
+                return;
+            }
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+
+    }
+
+    public static void 
handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
+
+        TopologyManager.acquireWriteLock();
+
+        try {
+            Topology topology = TopologyManager.getTopology();
+            Cluster cluster = topology.getService(event.getServiceName()).
+                    getCluster(event.getClusterId());
+
+            if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, 
event.getInstanceId())) {
+                log.error("Invalid state transfer from " + 
cluster.getStatus(event.getInstanceId()) + " to " +
+                        ClusterStatus.Terminating);
+            }
+            ClusterInstance context = 
cluster.getInstanceContexts(event.getInstanceId());
+            if (context == null) {
+                log.warn("Cluster Instance Context is not found for [cluster] 
" +
+                        event.getClusterId() + " [instance-id] " +
+                        event.getInstanceId());
+                return;
+            }
+            ClusterStatus status = ClusterStatus.Terminating;
+            if (context.isStateTransitionValid(status)) {
+                context.setStatus(status);
+                log.info("Cluster Terminating started for " + 
cluster.getClusterId());
+                TopologyManager.updateTopology(topology);
+                //publishing data
+                ClusterInstanceTerminatingEvent clusterTerminaingEvent =
+                        new ClusterInstanceTerminatingEvent(event.getAppId(), 
event.getServiceName(),
+                                event.getClusterId(), event.getInstanceId());
+
+                
TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
+
+                // Remove kubernetes services if available
+                ClusterContext clusterContext =
+                        
CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
+                if 
(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
+                    KubernetesIaas.removeKubernetesServices(event.getAppId(

<TRUNCATED>

Reply via email to