http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
deleted file mode 100644
index 861728f..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
+++ /dev/null
@@ -1,2149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.impl;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.net.InetAddresses;
-import org.apache.commons.collections.ListUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.cloud.controller.concurrent.PartitionValidatorCallable;
-import org.apache.stratos.cloud.controller.concurrent.ScheduledThreadExecutor;
-import org.apache.stratos.cloud.controller.concurrent.ThreadExecutor;
-import org.apache.stratos.cloud.controller.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.exception.*;
-import 
org.apache.stratos.cloud.controller.functions.ContainerClusterContextToKubernetesService;
-import 
org.apache.stratos.cloud.controller.functions.ContainerClusterContextToReplicationController;
-import org.apache.stratos.cloud.controller.functions.PodToMemberContext;
-import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
-import org.apache.stratos.cloud.controller.interfaces.Iaas;
-import org.apache.stratos.cloud.controller.persist.Deserializer;
-import org.apache.stratos.cloud.controller.pojo.*;
-import org.apache.stratos.cloud.controller.pojo.Cartridge;
-import org.apache.stratos.cloud.controller.pojo.Dependencies;
-import 
org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisher;
-import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
-import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.topology.TopologyEventPublisher;
-import org.apache.stratos.cloud.controller.topology.TopologyManager;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.cloud.controller.util.PodActivationWatcher;
-import 
org.apache.stratos.cloud.controller.validate.interfaces.PartitionValidator;
-import org.apache.stratos.common.*;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.kubernetes.client.KubernetesApiClient;
-import 
org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException;
-import org.apache.stratos.kubernetes.client.model.Label;
-import org.apache.stratos.kubernetes.client.model.Pod;
-import org.apache.stratos.kubernetes.client.model.ReplicationController;
-import org.apache.stratos.kubernetes.client.model.Service;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.jclouds.compute.ComputeService;
-import org.jclouds.compute.domain.NodeMetadata;
-import org.jclouds.compute.domain.NodeMetadataBuilder;
-import org.jclouds.compute.domain.Template;
-import org.jclouds.rest.ResourceNotFoundException;
-import org.wso2.carbon.registry.core.exceptions.RegistryException;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-
-/**
- * Cloud Controller Service is responsible for starting up new server 
instances,
- * terminating already started instances, providing pending instance count etc.
- */
-public class CloudControllerServiceImpl implements CloudControllerService {
-
-       private static final Log LOG = 
LogFactory.getLog(CloudControllerServiceImpl.class);
-       public static final String IS_LOAD_BALANCER = "load.balancer";
-
-    private FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder
-            .getInstance();
-
-    public CloudControllerServiceImpl() {
-        // acquire serialized data from registry
-        acquireData();
-    }
-
-    private void acquireData() {
-
-        Object obj = RegistryManager.getInstance().retrieve();
-        if (obj != null) {
-            try {
-                Object dataObj = Deserializer
-                        .deserializeFromByteArray((byte[]) obj);
-                if (dataObj instanceof FasterLookUpDataHolder) {
-                    FasterLookUpDataHolder serializedObj = 
(FasterLookUpDataHolder) dataObj;
-                    FasterLookUpDataHolder currentData = FasterLookUpDataHolder
-                            .getInstance();
-
-                    // assign necessary data
-                    
currentData.setClusterIdToContext(serializedObj.getClusterIdToContext());
-                    
currentData.setMemberIdToContext(serializedObj.getMemberIdToContext());
-                    
currentData.setClusterIdToMemberContext(serializedObj.getClusterIdToMemberContext());
-                    currentData.setCartridges(serializedObj.getCartridges());
-                    
currentData.setKubClusterIdToKubClusterContext(serializedObj.getKubClusterIdToKubClusterContext());
-                    
currentData.setServiceGroups(serializedObj.getServiceGroups());
-
-                    if (LOG.isDebugEnabled()) {
-
-                        LOG.debug("Cloud Controller Data is retrieved from 
registry.");
-                    }
-                } else {
-                    if (LOG.isDebugEnabled()) {
-
-                        LOG.debug("Cloud Controller Data cannot be found in 
registry.");
-                    }
-                }
-            } catch (Exception e) {
-
-                String msg = "Unable to acquire data from Registry. Hence, any 
historical data will not get reflected.";
-                LOG.warn(msg, e);
-            }
-
-        }
-    }
-
-    public void deployCartridgeDefinition(CartridgeConfig cartridgeConfig) 
throws InvalidCartridgeDefinitionException,
-            InvalidIaasProviderException {
-
-        handleNullObject(cartridgeConfig, "Invalid Cartridge Definition: 
Definition is null.");
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Cartridge definition: " + cartridgeConfig.toString());
-        }
-
-        Cartridge cartridge = null;
-        try {
-            // cartridge can never be null
-            cartridge = CloudControllerUtil.toCartridge(cartridgeConfig);
-        } catch (Exception e) {
-            String msg =
-                    "Invalid Cartridge Definition: Cartridge Type: " +
-                            cartridgeConfig.getType() +
-                            ". Cause: Cannot instantiate a Cartridge Instance 
with the given Config. " + e.getMessage();
-            LOG.error(msg, e);
-            throw new InvalidCartridgeDefinitionException(msg, e);
-        }
-
-        List<IaasProvider> iaases = cartridge.getIaases();
-
-        if 
(!StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType()))
 {
-            if (iaases == null || iaases.isEmpty()) {
-                String msg = "Invalid Cartridge Definition: Cartridge Type: "
-                        + cartridgeConfig.getType()
-                        + ". Cause: Iaases of this Cartridge is null or 
empty.";
-                LOG.error(msg);
-                throw new InvalidCartridgeDefinitionException(msg);
-            }
-
-            if (iaases == null || iaases.isEmpty()) {
-                String msg = "Invalid Cartridge Definition: Cartridge Type: " +
-                        cartridgeConfig.getType() +
-                        ". Cause: Iaases of this Cartridge is null or empty.";
-                LOG.error(msg);
-                throw new InvalidCartridgeDefinitionException(msg);
-            }
-
-            for (IaasProvider iaasProvider : iaases) {
-                CloudControllerUtil.getIaas(iaasProvider);
-            }
-        }
-
-        // TODO transaction begins
-        String cartridgeType = cartridge.getType();
-        if (dataHolder.getCartridge(cartridgeType) != null) {
-            Cartridge cartridgeToBeRemoved = 
dataHolder.getCartridge(cartridgeType);
-            // undeploy
-            try {
-                undeployCartridgeDefinition(cartridgeToBeRemoved.getType());
-            } catch (InvalidCartridgeTypeException e) {
-                //ignore
-            }
-            populateNewCartridge(cartridge, cartridgeToBeRemoved);
-        }
-
-        dataHolder.addCartridge(cartridge);
-
-        // persist
-        persist();
-
-        List<Cartridge> cartridgeList = new ArrayList<Cartridge>();
-        cartridgeList.add(cartridge);
-
-        TopologyBuilder.handleServiceCreated(cartridgeList);
-        // transaction ends
-
-        LOG.info("Successfully deployed the Cartridge definition: " + 
cartridgeType);
-    }
-
-    private void populateNewCartridge(Cartridge cartridge,
-                                      Cartridge cartridgeToBeRemoved) {
-
-        List<IaasProvider> newIaasProviders = cartridge.getIaases();
-        Map<String, IaasProvider> oldPartitionToIaasMap = 
cartridgeToBeRemoved.getPartitionToIaasProvider();
-
-        for (Entry<String, IaasProvider> entry : 
oldPartitionToIaasMap.entrySet()) {
-            if (entry == null) {
-                continue;
-            }
-            String partitionId = entry.getKey();
-            IaasProvider oldIaasProvider = entry.getValue();
-            if (newIaasProviders.contains(oldIaasProvider)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Copying a partition from the Cartridge that is 
undeployed, to the new Cartridge. "
-                            + "[partition id] : " + partitionId + " [cartridge 
type] " + cartridge.getType());
-                }
-                cartridge.addIaasProvider(partitionId, 
newIaasProviders.get(newIaasProviders.indexOf(oldIaasProvider)));
-            }
-        }
-
-    }
-
-    public void undeployCartridgeDefinition(String cartridgeType) throws 
InvalidCartridgeTypeException {
-
-        Cartridge cartridge = null;
-        if ((cartridge = dataHolder.getCartridge(cartridgeType)) != null) {
-            if (dataHolder.getCartridges().remove(cartridge)) {
-                // invalidate partition validation cache
-                
dataHolder.removeFromCartridgeTypeToPartitionIds(cartridgeType);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Partition cache invalidated for cartridge " + 
cartridgeType);
-                }
-
-                persist();
-
-                // sends the service removed event
-                List<Cartridge> cartridgeList = new ArrayList<Cartridge>();
-                cartridgeList.add(cartridge);
-                TopologyBuilder.handleServiceRemoved(cartridgeList);
-
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("Successfully undeployed the Cartridge 
definition: " + cartridgeType);
-                }
-                return;
-            }
-        }
-        String msg = "Cartridge [type] " + cartridgeType + " is not a deployed 
Cartridge type.";
-        LOG.error(msg);
-        throw new InvalidCartridgeTypeException(msg);
-    }
-
-    public void deployServiceGroup(ServiceGroup servicegroup) throws 
InvalidServiceGroupException {
-
-        if (servicegroup == null) {
-            String msg = "Invalid ServiceGroup Definition: Definition is 
null.";
-            LOG.error(msg);
-            throw new IllegalArgumentException(msg);
-
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("CloudControllerServiceImpl:deployServiceGroup:" + 
servicegroup.getName());
-        }
-
-        String[] subGroups = servicegroup.getCartridges();
-
-
-        if (LOG.isDebugEnabled()) {
-            
LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups" + 
subGroups);
-            if (subGroups != null) {
-                
LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups:size" + 
subGroups.length);
-            } else {
-                
LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups: is null");
-            }
-        }
-
-
-        Dependencies dependencies = servicegroup.getDependencies();
-
-        if (LOG.isDebugEnabled()) {
-            
LOG.debug("CloudControllerServiceImpl:deployServiceGroup:dependencies" + 
dependencies);
-        }
-
-        if (dependencies != null) {
-            String[] startupOrders = dependencies.getStartupOrders();
-
-            if (LOG.isDebugEnabled()) {
-                
LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrders" + 
startupOrders);
-
-                if (startupOrders != null) {
-                    
LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder:size" + 
startupOrders.length);
-                } else {
-                    
LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder: is 
null");
-                }
-            }
-        }
-
-        dataHolder.addServiceGroup(servicegroup);
-
-        this.persist();
-
-    }
-
-    public void undeployServiceGroup(String name) throws 
InvalidServiceGroupException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("CloudControllerServiceImpl:undeployServiceGroup: " + 
name);
-        }
-
-        ServiceGroup serviceGroup = null;
-
-        serviceGroup = dataHolder.getServiceGroup(name);
-
-        if (serviceGroup != null) {
-            if (dataHolder.getServiceGroups().remove(serviceGroup)) {
-                persist();
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("Successfully undeployed the Service Group 
definition: " + serviceGroup);
-                }
-                return;
-            }
-        }
-
-        String msg = "ServiceGroup " + name + " is not a deployed Service 
Group definition";
-        LOG.error(msg);
-        throw new InvalidServiceGroupException(msg);
-
-    }
-
-    @Override
-    public ServiceGroup getServiceGroup(String name) throws 
InvalidServiceGroupException {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("getServiceGroupDefinition:" + name);
-        }
-
-        ServiceGroup serviceGroup = this.dataHolder.getServiceGroup(name);
-
-        if (serviceGroup == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("getServiceGroupDefinition: no entry found for 
service group " + name);
-            }
-            String msg = "ServiceGroup " + name + " is not a deployed Service 
Group definition";
-            throw new InvalidServiceGroupException(msg);
-        }
-
-        return serviceGroup;
-    }
-
-    public String[] getServiceGroupSubGroups(String name) throws 
InvalidServiceGroupException {
-        ServiceGroup serviceGroup = this.getServiceGroup(name);
-        if (serviceGroup == null) {
-            throw new InvalidServiceGroupException("Invalid ServiceGroup " + 
serviceGroup);
-        }
-
-        return serviceGroup.getSubGroups();
-    }
-
-    /**
-     *
-     */
-    public String[] getServiceGroupCartridges(String name) throws 
InvalidServiceGroupException {
-        ServiceGroup serviceGroup = this.getServiceGroup(name);
-        if (serviceGroup == null) {
-            throw new InvalidServiceGroupException("Invalid ServiceGroup " + 
serviceGroup);
-        }
-        String[] cs = serviceGroup.getCartridges();
-        return cs;
-
-    }
-
-    public Dependencies getServiceGroupDependencies(String name) throws 
InvalidServiceGroupException {
-        ServiceGroup serviceGroup = this.getServiceGroup(name);
-        if (serviceGroup == null) {
-            throw new InvalidServiceGroupException("Invalid ServiceGroup " + 
serviceGroup);
-        }
-        return serviceGroup.getDependencies();
-    }
-
-    @Override
-    public MemberContext startInstance(MemberContext memberContext) throws
-            UnregisteredCartridgeException, InvalidIaasProviderException {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("CloudControllerServiceImpl:startInstance");
-        }
-
-        handleNullObject(memberContext, "Instance start-up failed. Member is 
null.");
-
-        String clusterId = memberContext.getClusterId();
-        Partition partition = memberContext.getPartition();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Received an instance spawn request : " + memberContext);
-        }
-
-        Template template = null;
-
-        handleNullObject(partition, "Instance start-up failed. Specified 
Partition is null. " +
-                memberContext);
-
-        String partitionId = partition.getId();
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
-
-        handleNullObject(ctxt, "Instance start-up failed. Invalid cluster id. 
" + memberContext);
-
-        String cartridgeType = ctxt.getCartridgeType();
-
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
-
-        if (cartridge == null) {
-            String msg =
-                    "Instance start-up failed. No matching Cartridge found 
[type] " + cartridgeType + ". " +
-                            memberContext.toString();
-            LOG.error(msg);
-            throw new UnregisteredCartridgeException(msg);
-        }
-
-        memberContext.setCartridgeType(cartridgeType);
-
-
-        IaasProvider iaasProvider = 
cartridge.getIaasProviderOfPartition(partitionId);
-        if (iaasProvider == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("IaasToPartitionMap " + cartridge.hashCode()
-                        + " for cartridge " + cartridgeType + " and for 
partition: " + partitionId);
-            }
-            String msg = "Instance start-up failed. "
-                    + "There's no IaaS provided for the partition: "
-                    + partitionId
-                    + " and for the Cartridge type: "
-                    + cartridgeType
-                    + ". Only following "
-                    + "partitions can be found in this Cartridge: "
-                    + cartridge.getPartitionToIaasProvider().keySet()
-                    .toString() + ". " + memberContext.toString()
-                    + ". ";
-            LOG.fatal(msg);
-            throw new InvalidIaasProviderException(msg);
-        }
-        String type = iaasProvider.getType();
-        try {
-            // generating the Unique member ID...
-            String memberID = generateMemberId(clusterId);
-            memberContext.setMemberId(memberID);
-            // have to add memberID to the payload
-            StringBuilder payload = new StringBuilder(ctxt.getPayload());
-            addToPayload(payload, "MEMBER_ID", memberID);
-            addToPayload(payload, "LB_CLUSTER_ID", 
memberContext.getLbClusterId());
-            addToPayload(payload, "NETWORK_PARTITION_ID", 
memberContext.getNetworkPartitionId());
-            addToPayload(payload, "PARTITION_ID", partitionId);
-            if (memberContext.getProperties() != null) {
-                org.apache.stratos.common.Properties properties = 
memberContext.getProperties();
-                if (properties != null) {
-                    for (Property prop : properties.getProperties()) {
-                        addToPayload(payload, prop.getName(), prop.getValue());
-                    }
-                }
-            }
-
-            Iaas iaas = iaasProvider.getIaas();
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Payload: " + payload.toString());
-            }
-
-            if (iaas == null) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Iaas is null of Iaas Provider: " + type + ". 
Trying to build IaaS...");
-                }
-                try {
-                    iaas = CloudControllerUtil.getIaas(iaasProvider);
-                } catch (InvalidIaasProviderException e) {
-                    String msg = "Instance start up failed. " + 
memberContext.toString() +
-                            "Unable to build Iaas of this IaasProvider 
[Provider] : " + type + ". Cause: " + e.getMessage();
-                    LOG.error(msg, e);
-                    throw new InvalidIaasProviderException(msg, e);
-                }
-
-            }
-
-            if (ctxt.isVolumeRequired()) {
-                if (ctxt.getVolumes() != null) {
-                    for (Volume volume : ctxt.getVolumes()) {
-
-                        if (volume.getId() == null) {
-                            // create a new volume
-                            createVolumeAndSetInClusterContext(volume, 
iaasProvider);
-                        }
-                    }
-                }
-            }
-
-            if (ctxt.isVolumeRequired()) {
-                addToPayload(payload, "PERSISTENCE_MAPPING", 
getPersistencePayload(ctxt, iaas).toString());
-            }
-            iaasProvider.setPayload(payload.toString().getBytes());
-            iaas.setDynamicPayload();
-
-            template = iaasProvider.getTemplate();
-
-            if (template == null) {
-                String msg =
-                        "Failed to start an instance. " +
-                                memberContext.toString() +
-                                ". Reason : Jclouds Template is null for iaas 
provider [type]: " + iaasProvider.getType();
-                LOG.error(msg);
-                throw new InvalidIaasProviderException(msg);
-            }
-
-            //Start instance start up in a new thread
-            ThreadExecutor exec = ThreadExecutor.getInstance();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller is starting the instance start up 
thread.");
-            }
-            exec.execute(new JcloudsInstanceCreator(memberContext, 
iaasProvider, cartridgeType));
-
-            LOG.info("Instance is successfully starting up. " + 
memberContext.toString());
-
-            return memberContext;
-
-        } catch (Exception e) {
-            String msg = "Failed to start an instance. " + 
memberContext.toString() + " Cause: " + e.getMessage();
-            LOG.error(msg, e);
-            throw new IllegalStateException(msg, e);
-        }
-
-    }
-
-    private void createVolumeAndSetInClusterContext(Volume volume,
-                                                    IaasProvider iaasProvider) 
{
-        // iaas cannot be null at this state #startInstance method
-        Iaas iaas = iaasProvider.getIaas();
-        int sizeGB = volume.getSize();
-        String snapshotId = volume.getSnapshotId();
-        if (StringUtils.isNotEmpty(volume.getVolumeId())) {
-            // volumeID is specified, so not creating additional volumes
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Volume creation is skipping since a volume ID is 
specified. [Volume ID]" + volume.getVolumeId());
-            }
-            volume.setId(volume.getVolumeId());
-        } else {
-            String volumeId = iaas.createVolume(sizeGB, snapshotId);
-            volume.setId(volumeId);
-        }
-
-        volume.setIaasType(iaasProvider.getType());
-    }
-
-
-    private StringBuilder getPersistencePayload(ClusterContext ctx, Iaas iaas) 
{
-        StringBuilder persistencePayload = new StringBuilder();
-        if (isPersistenceMappingAvailable(ctx)) {
-            for (Volume volume : ctx.getVolumes()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Adding persistence mapping " + 
volume.toString());
-                }
-                if (persistencePayload.length() != 0) {
-                    persistencePayload.append("|");
-                }
-
-                
persistencePayload.append(iaas.getIaasDevice(volume.getDevice()));
-                persistencePayload.append("|");
-                persistencePayload.append(volume.getId());
-                persistencePayload.append("|");
-                persistencePayload.append(volume.getMappingPath());
-            }
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Persistence payload is" + 
persistencePayload.toString());
-        }
-        return persistencePayload;
-    }
-
-    private boolean isPersistenceMappingAvailable(ClusterContext ctx) {
-        return ctx.getVolumes() != null && ctx.isVolumeRequired();
-    }
-
-    private void addToPayload(StringBuilder payload, String name, String 
value) {
-        payload.append(",");
-        payload.append(name + "=" + value);
-    }
-
-    /**
-     * Persist data in registry.
-     */
-    private void persist() {
-        try {
-            RegistryManager.getInstance().persist(
-                    dataHolder);
-        } catch (RegistryException e) {
-
-            String msg = "Failed to persist the Cloud Controller data in 
registry. Further, transaction roll back also failed.";
-            LOG.fatal(msg);
-            throw new CloudControllerException(msg, e);
-        }
-    }
-
-    private String generateMemberId(String clusterId) {
-        UUID memberId = UUID.randomUUID();
-        return clusterId + memberId.toString();
-    }
-
-    @Override
-    public void terminateInstance(String memberId) throws 
InvalidMemberException, InvalidCartridgeTypeException {
-
-        handleNullObject(memberId, "Termination failed. Null member id.");
-
-        MemberContext ctxt = dataHolder.getMemberContextOfMemberId(memberId);
-
-        if (ctxt == null) {
-            String msg = "Termination failed. Invalid Member Id: " + memberId;
-            LOG.error(msg);
-            throw new InvalidMemberException(msg);
-        }
-
-        if (ctxt.getNodeId() == null && ctxt.getInstanceId() == null) {
-            // sending member terminated since this instance isn't reachable.
-            if (LOG.isInfoEnabled()){
-                LOG.info(String.format(
-                        "Member cannot be terminated because it is not 
reachable. [member] %s [nodeId] %s [instanceId] %s. Removing member from 
topology.",
-                        ctxt.getMemberId(),
-                        ctxt.getNodeId(),
-                        ctxt.getInstanceId()));
-            }
-
-            logTermination(ctxt);
-        }
-
-        // check if status == active, if true, then this is a termination on 
member faulty
-        Topology topology;
-        try {
-            TopologyManager.acquireReadLock();
-            topology = TopologyManager.getTopology();
-        } finally {
-            TopologyManager.releaseReadLock();
-        }
-
-        org.apache.stratos.messaging.domain.topology.Service service = 
topology.getService(ctxt.getCartridgeType());
-
-        if (service != null) {
-            Cluster cluster = service.getCluster(ctxt.getClusterId());
-
-            if (cluster != null) {
-                Member member = cluster.getMember(memberId);
-
-                if (member != null) {
-                    // change member status if termination on a faulty member
-                    if(fixMemberStatus(member, topology)){
-                        // set the time this member was added to 
ReadyToShutdown status
-                        ctxt.setObsoleteInitTime(System.currentTimeMillis());
-                    }
-
-                    // check if ready to shutdown member is expired and send
-                    // member terminated if it is.
-                    if (isMemberExpired(member, ctxt.getObsoleteInitTime(), 
ctxt.getObsoleteExpiryTime())) {
-                        if (LOG.isInfoEnabled()) {
-                            LOG.info(String.format(
-                                    "Member pending termination in 
ReadyToShutdown state exceeded expiry time. This member has to be manually 
deleted: %s",
-                                    ctxt.getMemberId()));
-                        }
-
-                        logTermination(ctxt);
-                        return;
-                    }
-                }
-            }
-        }
-
-        ThreadExecutor exec = ThreadExecutor.getInstance();
-        exec.execute(new InstanceTerminator(ctxt));
-
-    }
-
-    /**
-     * Check if a member has been in the ReadyToShutdown status for a 
specified expiry time
-     *
-     * @param member
-     * @param initTime
-     * @param expiryTime
-     * @return
-     */
-    private boolean isMemberExpired(Member member, long initTime, long 
expiryTime) {
-        if (member.getStatus() == MemberStatus.ReadyToShutDown) {
-            if (initTime == 0){
-                // obsolete init time hasn't been set, i.e. not a member 
detected faulty.
-                // this is a graceful shutdown
-                return false;
-            }
-
-            // member detected faulty, calculate ready to shutdown waiting 
period
-            long timeInReadyToShutdownStatus = System.currentTimeMillis() - 
initTime;
-            return timeInReadyToShutdownStatus >= expiryTime;
-        }
-
-        return false;
-    }
-
-
-    /**
-     * Corrects the member status upon termination call if the member is in an Active state.
-     * <p>
-     * When the member is {@link MemberStatus#Activated} its status is switched to
-     * {@link MemberStatus#ReadyToShutDown} while holding the topology write lock, the topology
-     * is updated, a {@link MemberReadyToShutdownEvent} is sent and the new status is passed to
-     * the {@link CartridgeInstanceDataPublisher}. A member in any other status is left untouched.
-     *
-     * @param member   The {@link org.apache.stratos.messaging.domain.topology.Member} object that is being
-     *                 checked for status
-     * @param topology The {@link org.apache.stratos.messaging.domain.topology.Topology} object to update
-     *                 the topology if needed.
-     * @return {@code true} if the member was Activated and has been moved to ReadyToShutDown,
-     *         {@code false} if nothing was changed
-     */
-    private boolean fixMemberStatus(Member member, Topology topology) {
-        if (member.getStatus() != MemberStatus.Activated) {
-            return false;
-        }
-
-        MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
-                member.getServiceName(),
-                member.getClusterId(),
-                member.getNetworkPartitionId(),
-                member.getPartitionId(),
-                member.getMemberId(),
-                member.getInstanceId());
-
-        // Acquire the lock BEFORE entering the try block: if the acquisition itself fails,
-        // the finally clause must not release a lock this thread never obtained.
-        TopologyManager.acquireWriteLock();
-        try {
-            member.setStatus(MemberStatus.ReadyToShutDown);
-            LOG.info("Member Ready to shut down event adding status started");
-
-            TopologyManager.updateTopology(topology);
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-
-        // the event and the statistics are published outside the lock
-        TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
-        //publishing data
-        CartridgeInstanceDataPublisher.publish(member.getMemberId(),
-                member.getPartitionId(),
-                member.getNetworkPartitionId(),
-                member.getClusterId(),
-                member.getServiceName(),
-                MemberStatus.ReadyToShutDown.toString(),
-                null);
-
-        return true;
-    }
-
-
-    /**
-     * Background task that terminates the instance behind a single {@link MemberContext}.
-     * Failures raised while terminating are logged and rethrown wrapped in a
-     * {@link CloudControllerException}.
-     */
-    private class InstanceTerminator implements Runnable {
-
-        private final MemberContext ctxt;
-
-        public InstanceTerminator(MemberContext ctxt) {
-            this.ctxt = ctxt;
-        }
-
-        @Override
-        public void run() {
-            final String memberId = ctxt.getMemberId();
-            final String clusterId = ctxt.getClusterId();
-            final String partitionId = ctxt.getPartition().getId();
-            final String cartridgeType = ctxt.getCartridgeType();
-            final String nodeId = ctxt.getNodeId();
-
-            try {
-                Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
-
-                LOG.info("Starting to terminate an instance with member id : " + memberId +
-                        " in partition id: " + partitionId + " of cluster id: " + clusterId +
-                        " and of cartridge type: " + cartridgeType);
-
-                // the IaaS provider is looked up through the cartridge, so it must exist
-                if (cartridge == null) {
-                    String noCartridge = "Termination of Member Id: " + memberId + " failed. " +
-                            "Cannot find a matching Cartridge for type: " + cartridgeType;
-                    LOG.error(noCartridge);
-                    throw new InvalidCartridgeTypeException(noCartridge);
-                }
-
-                // no node to destroy: run the termination bookkeeping anyway, then fail
-                if (nodeId == null) {
-                    String noNode = "Termination failed. Cannot find a node id for Member Id: " + memberId;
-                    logTermination(ctxt);
-                    LOG.error(noNode);
-                    throw new InvalidMemberException(noNode);
-                }
-
-                // terminate it, then record the termination
-                terminate(cartridge.getIaasProviderOfPartition(partitionId), nodeId, ctxt);
-                logTermination(ctxt);
-
-            } catch (Exception e) {
-                String failure = "Instance termination failed. " + ctxt.toString();
-                LOG.error(failure, e);
-                throw new CloudControllerException(failure, e);
-            }
-        }
-    }
-
-    /**
-     * Background task that starts one instance through jclouds for a {@link MemberContext},
-     * optionally attaches the cluster's volumes, resolves the instance's IP addresses and then
-     * records the new member (data holder, registry, topology, data publisher).
-     */
-    private class JcloudsInstanceCreator implements Runnable {
-
-        private final MemberContext memberContext;
-        private final IaasProvider iaasProvider;
-        private final String cartridgeType;
-
-        public JcloudsInstanceCreator(MemberContext memberContext, IaasProvider iaasProvider,
-                                      String cartridgeType) {
-            this.memberContext = memberContext;
-            this.iaasProvider = iaasProvider;
-            this.cartridgeType = cartridgeType;
-        }
-
-        @Override
-        public void run() {
-            String clusterId = memberContext.getClusterId();
-            Partition partition = memberContext.getPartition();
-            ClusterContext clusterContext = dataHolder.getClusterContext(clusterId);
-            Iaas iaas = iaasProvider.getIaas();
-
-            // generate the group id from domain name and sub domain name.
-            // Should have lower-case ASCII letters, numbers, or dashes.
-            // Should have a length between 3-15
-            String prefix = clusterId.substring(0, Math.min(clusterId.length(), 10));
-            String group = prefix.replaceAll("[^a-z0-9-]", "");
-
-            NodeMetadata node = startNode(group, clusterContext, iaas);
-            allocateIpAndPublish(node, clusterId, partition, iaas);
-        }
-
-        /**
-         * Creates the node in the given jclouds group, stores its node id in the member context
-         * and attaches the cluster's volumes when the cluster requires them.
-         * Any failure is rethrown as an {@link IllegalStateException}.
-         */
-        private NodeMetadata startNode(String group, ClusterContext clusterContext, Iaas iaas) {
-            try {
-                ComputeService computeService = iaasProvider.getComputeService();
-                Template template = iaasProvider.getTemplate();
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cloud Controller is delegating request to start an instance for "
-                            + memberContext + " to Jclouds layer.");
-                }
-                // create and start a node
-                NodeMetadata node = computeService.createNodesInGroup(group, 1, template).iterator().next();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cloud Controller received a response for the request to start "
-                            + memberContext + " from Jclouds layer.");
-                }
-
-                if (node == null) {
-                    String nullNode = "Null response received for instance start-up request to Jclouds.\n"
-                            + memberContext.toString();
-                    LOG.error(nullNode);
-                    throw new IllegalStateException(nullNode);
-                }
-
-                String nodeId = node.getId();
-                if (nodeId == null) {
-                    String nullNodeId = "Node id of the starting instance is null.\n"
-                            + memberContext.toString();
-                    LOG.fatal(nullNodeId);
-                    throw new IllegalStateException(nullNodeId);
-                }
-
-                memberContext.setNodeId(nodeId);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Node id was set. " + memberContext.toString());
-                }
-
-                if (clusterContext.isVolumeRequired()) {
-                    // remove region prefix
-                    int slash = nodeId.indexOf('/');
-                    String instanceId = slash != -1 ? nodeId.substring(slash + 1) : nodeId;
-                    memberContext.setInstanceId(instanceId);
-                    if (clusterContext.getVolumes() != null) {
-                        for (Volume volume : clusterContext.getVolumes()) {
-                            try {
-                                iaas.attachVolume(instanceId, volume.getId(), volume.getDevice());
-                            } catch (Exception e) {
-                                // continue without throwing an exception, since
-                                // there is an instance already running
-                                LOG.error("Attaching Volume to Instance [ "
-                                        + instanceId + " ] failed!", e);
-                            }
-                        }
-                    }
-                }
-
-                return node;
-
-            } catch (Exception e) {
-                String failure = "Failed to start an instance. " + memberContext.toString()
-                        + " Cause: " + e.getMessage();
-                LOG.error(failure, e);
-                throw new IllegalStateException(failure, e);
-            }
-        }
-
-        /**
-         * Resolves the IP addresses of the started node, stores them in the member context and
-         * records the member in the data holder, registry, topology and data publisher.
-         * Any failure is rethrown as a {@link CloudControllerException}.
-         */
-        private void allocateIpAndPublish(NodeMetadata node, String clusterId, Partition partition,
-                                          Iaas iaas) {
-            String publicIp = null;
-
-            try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("IP allocation process started for " + memberContext);
-                }
-                String autoAssignIpProp =
-                        iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY);
-                String predefinedIp =
-                        iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY);
-
-                // reset ip
-                String ip = "";
-
-                // default behavior is autoIpAssign=false
-                if (autoAssignIpProp == null || "false".equals(autoAssignIpProp)) {
-
-                    if (predefinedIp == null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, "
-                                    + "selecting available one from pool");
-                        }
-                        // allocate an IP address - manual IP assigning mode
-                        ip = iaas.associateAddress(node);
-
-                        if (ip != null) {
-                            memberContext.setAllocatedIpAddress(ip);
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Allocated an ip address: "
-                                        + memberContext.toString());
-                            } else if (LOG.isInfoEnabled()) {
-                                LOG.info("Allocated ip address [ " + memberContext.getAllocatedIpAddress() +
-                                        " ] to member with id: " + memberContext.getMemberId());
-                            }
-                        }
-                    } else if (!isValidIpAddress(predefinedIp)) {
-                        // a floating ip is configured in the cartridge definition but is malformed
-                        String invalidIp = "Invalid floating ip address configured: " + predefinedIp +
-                                " - terminating node:" + memberContext.toString();
-                        LOG.error(invalidIp);
-                        // terminate instance
-                        terminate(iaasProvider, node.getId(), memberContext);
-                        throw new CloudControllerException(invalidIp);
-                    } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + predefinedIp);
-                        }
-                        ip = iaas.associatePredefinedAddress(node, predefinedIp);
-
-                        if (ip == null || "".equals(ip) || !predefinedIp.equals(ip)) {
-                            // throw exception and stop instance creation
-                            String notAllocated = "Error occurred while allocating predefined floating ip address: " + predefinedIp +
-                                    " / allocated ip:" + ip +
-                                    " - terminating node:" + memberContext.toString();
-                            LOG.error(notAllocated);
-                            // terminate instance
-                            terminate(iaasProvider, node.getId(), memberContext);
-                            throw new CloudControllerException(notAllocated);
-                        }
-                    }
-
-                    if (ip == null) {
-                        String noIp = "No IP address found. IP allocation failed for " + memberContext;
-                        LOG.error(noIp);
-                        throw new CloudControllerException(noIp);
-                    }
-
-                    // build the node with the new ip
-                    node = NodeMetadataBuilder.fromNodeMetadata(node)
-                            .publicAddresses(ImmutableSet.of(ip)).build();
-                }
-
-                // public ip
-                if (node.getPublicAddresses() != null &&
-                        node.getPublicAddresses().iterator().hasNext()) {
-                    ip = node.getPublicAddresses().iterator().next();
-                    publicIp = ip;
-                    memberContext.setPublicIpAddress(ip);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Retrieving Public IP Address : " + memberContext.toString());
-                    } else if (LOG.isInfoEnabled()) {
-                        LOG.info("Retrieving Public IP Address: " + memberContext.getPublicIpAddress() +
-                                ", member id: " + memberContext.getMemberId());
-                    }
-                }
-
-                // private IP; from here on "ip" holds the private address when one exists
-                if (node.getPrivateAddresses() != null &&
-                        node.getPrivateAddresses().iterator().hasNext()) {
-                    ip = node.getPrivateAddresses().iterator().next();
-                    memberContext.setPrivateIpAddress(ip);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Retrieving Private IP Address. " + memberContext.toString());
-                    } else if (LOG.isInfoEnabled()) {
-                        LOG.info("Retrieving Private IP Address: " + memberContext.getPrivateIpAddress() +
-                                ", member id: " + memberContext.getMemberId());
-                    }
-                }
-
-                dataHolder.addMemberContext(memberContext);
-
-                // persist in registry
-                persist();
-
-                // trigger topology
-                TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId,
-                        partition.getId(), ip, publicIp, memberContext);
-
-                // publish data
-                CartridgeInstanceDataPublisher.publish(memberContext.getMemberId(),
-                        memberContext.getPartition().getId(),
-                        memberContext.getNetworkPartitionId(),
-                        memberContext.getClusterId(),
-                        cartridgeType,
-                        MemberStatus.Created.toString(),
-                        node);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Node details: " + node.toString());
-                    LOG.debug("IP allocation process ended for " + memberContext);
-                }
-
-            } catch (Exception e) {
-                String failure = "Error occurred while allocating an ip address. " + memberContext.toString();
-                LOG.error(failure, e);
-                throw new CloudControllerException(failure, e);
-            }
-        }
-    }
-
-    /**
-     * Tells whether the given string is accepted by {@link InetAddresses#isInetAddress(String)}.
-     *
-     * @param ip the candidate IP address string
-     * @return {@code true} if Guava recognises {@code ip} as an IP address literal
-     */
-    private boolean isValidIpAddress(String ip) {
-        return InetAddresses.isInetAddress(ip);
-    }
-
-    /**
-     * Submits one {@link InstanceTerminator} per known member of the given cluster to the
-     * shared {@link ThreadExecutor}. When the data holder returns no member list for the
-     * cluster a warning is logged and nothing is submitted.
-     *
-     * @param clusterId id of the cluster whose instances should be terminated
-     */
-    @Override
-    public void terminateAllInstances(String clusterId) throws InvalidClusterException {
-
-        LOG.info("Starting to terminate all instances of cluster : " + clusterId);
-
-        handleNullObject(clusterId, "Instance termination failed. Cluster id is null.");
-
-        List<MemberContext> memberContexts = dataHolder.getMemberContextsOfClusterId(clusterId);
-        if (memberContexts == null) {
-            LOG.warn("Instance termination failed. No members found for cluster id: " + clusterId);
-            return;
-        }
-
-        // each member is terminated by its own task
-        ThreadExecutor executor = ThreadExecutor.getInstance();
-        for (MemberContext context : memberContexts) {
-            executor.execute(new InstanceTerminator(context));
-        }
-    }
-
-
-    /**
-     * A helper method to terminate an instance: detaches the member's volumes, destroys the
-     * node and releases the IP address allocated to the member, if any.
-     *
-     * @param iaasProvider provider whose compute service destroys the node
-     * @param nodeId       id of the node to destroy
-     * @param ctxt         context of the member being terminated
-     * @return the {@code iaasProvider} that was passed in
-     * @throws CloudControllerException if the provider has no Iaas and one cannot be built
-     */
-    private IaasProvider terminate(IaasProvider iaasProvider,
-                                   String nodeId, MemberContext ctxt) {
-        Iaas iaas = iaasProvider.getIaas();
-        if (iaas == null) {
-            // the provider has no Iaas yet, try to build one
-            try {
-                iaas = CloudControllerUtil.getIaas(iaasProvider);
-            } catch (InvalidIaasProviderException e) {
-                String failure = "Instance termination failed. " + ctxt.toString() +
-                        ". Cause: Unable to build Iaas of this " + iaasProvider.toString();
-                LOG.error(failure, e);
-                throw new CloudControllerException(failure, e);
-            }
-        }
-
-        //detach volumes if any
-        detachVolume(iaasProvider, ctxt);
-
-        // destroy the node
-        iaasProvider.getComputeService().destroyNode(nodeId);
-
-        // release allocated IP address
-        String allocatedIp = ctxt.getAllocatedIpAddress();
-        if (allocatedIp != null) {
-            iaas.releaseAddress(allocatedIp);
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Member is terminated: " + ctxt.toString());
-        } else if (LOG.isInfoEnabled()) {
-            LOG.info("Member with id " + ctxt.getMemberId() + " is terminated");
-        }
-        return iaasProvider;
-    }
-
-    /**
-     * Detaches every volume of the member's cluster that has a volume id from the member's
-     * instance. Volumes without an id are skipped, and a volume that the IaaS no longer
-     * knows about ({@link ResourceNotFoundException}) is only logged at debug level.
-     *
-     * @param iaasProvider provider whose Iaas performs the detachment
-     * @param ctxt         context of the member whose instance the volumes are detached from
-     */
-    private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) {
-        ClusterContext clusterCtxt = dataHolder.getClusterContext(ctxt.getClusterId());
-        // the cluster context may be missing (see the null checks in unregisterService);
-        // in that case there is nothing to detach
-        if (clusterCtxt == null || clusterCtxt.getVolumes() == null) {
-            return;
-        }
-
-        for (Volume volume : clusterCtxt.getVolumes()) {
-            String volumeId = volume.getId();
-            if (volumeId == null) {
-                // skip only this volume; the remaining volumes still have to be detached
-                continue;
-            }
-            try {
-                Iaas iaas = iaasProvider.getIaas();
-                iaas.detachVolume(ctxt.getInstanceId(), volumeId);
-            } catch (ResourceNotFoundException ignored) {
-                // best effort: a volume that is already gone does not block termination
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(ignored);
-                }
-            }
-        }
-    }
-
-    /**
-     * Records the termination of a member: notifies the topology, publishes the
-     * {@code Terminated} status, removes the member context from the data holder and
-     * persists the result. Does nothing when {@code memberContext} is {@code null}.
-     *
-     * @param memberContext context of the terminated member, may be {@code null}
-     */
-    private void logTermination(MemberContext memberContext) {
-        if (memberContext == null) {
-            return;
-        }
-
-        // the partition may be absent, in which case a null partition id is reported
-        String partitionId = null;
-        if (memberContext.getPartition() != null) {
-            partitionId = memberContext.getPartition().getId();
-        }
-
-        //updating the topology
-        TopologyBuilder.handleMemberTerminated(
-                memberContext.getCartridgeType(),
-                memberContext.getClusterId(),
-                memberContext.getNetworkPartitionId(),
-                partitionId,
-                memberContext.getMemberId());
-
-        //publishing data
-        CartridgeInstanceDataPublisher.publish(
-                memberContext.getMemberId(),
-                partitionId,
-                memberContext.getNetworkPartitionId(),
-                memberContext.getClusterId(),
-                memberContext.getCartridgeType(),
-                MemberStatus.Terminated.toString(),
-                null);
-
-        // update data holders
-        dataHolder.removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId());
-
-        // persist
-        persist();
-    }
-
-    /**
-     * Registers a service cluster: checks that the registrant carries a cartridge type,
-     * cluster id, payload and host name, checks that the cartridge type is known, and then
-     * hands the registrant to {@link TopologyBuilder#handleClusterCreated} and persists.
-     *
-     * @param registrant description of the cluster being registered
-     * @return {@code true} when the registration completed
-     * @throws UnregisteredCartridgeException if no cartridge exists for the registrant's type
-     */
-    @Override
-    public boolean registerService(Registrant registrant)
-            throws UnregisteredCartridgeException {
-
-        String cartridgeType = registrant.getCartridgeType();
-        handleNullObject(cartridgeType, "Service registration failed. Cartridge Type is null.");
-
-        String clusterId = registrant.getClusterId();
-        handleNullObject(clusterId, "Service registration failed. Cluster id is null.");
-
-        String payload = registrant.getPayload();
-        handleNullObject(payload, "Service registration failed. Payload is null.");
-
-        String hostName = registrant.getHostName();
-        handleNullObject(hostName, "Service registration failed. Hostname is null.");
-
-        if (dataHolder.getCartridge(cartridgeType) == null) {
-            String unknownCartridge = "Registration of cluster: " + clusterId +
-                    " failed. - Unregistered Cartridge type: " + cartridgeType;
-            LOG.error(unknownCartridge);
-            throw new UnregisteredCartridgeException(unknownCartridge);
-        }
-
-        Properties props = CloudControllerUtil.toJavaUtilProperties(registrant.getProperties());
-        // parseBoolean yields false for a missing (null) property
-        boolean isLb = Boolean.parseBoolean(props.getProperty(IS_LOAD_BALANCER));
-
-        //TODO fix the properties issue
-        // Until then no ClusterContext is built or added to the data holder here:
-        //   ClusterContext ctxt = buildClusterContext(cartridge, clusterId,
-        //           payload, hostName, props, isLb, registrant.getPersistence());
-        //   dataHolder.addClusterContext(ctxt);
-        TopologyBuilder.handleClusterCreated(registrant, isLb);
-
-        persist();
-
-        LOG.info("Successfully registered: " + registrant);
-
-        return true;
-    }
-
-    /**
-     * Placeholder for building the {@link ClusterContext} of a cluster.
-     * <p>
-     * A context is instantiated from the arguments but it is never returned: while the
-     * timeout/persistence handling below stays disabled this method always returns
-     * {@code null}.
-     *
-     * @return always {@code null}
-     */
-    private ClusterContext buildClusterContext(Cartridge cartridge,
-                                               String clusterId, String payload, String hostName,
-                                               org.apache.stratos.common.Properties props, boolean isLb,
-                                               Persistence persistence) {
-        //TODO fix properties issue
-        // initialize ClusterContext
-        ClusterContext clusterContext = new ClusterContext(clusterId, cartridge.getType(), payload,
-                hostName, isLb, props);
-
-        /* Disabled until the properties issue is fixed:
-
-        String property;
-        property = props.get(Constants.GRACEFUL_SHUTDOWN_TIMEOUT);
-        long timeout = property != null ? Long.parseLong(property) : 30000;
-
-        boolean persistanceRequired = false;
-        if(persistence != null){
-              persistanceRequired = persistence.isPersistanceRequired();
-        }
-
-        if(persistanceRequired){
-            ctxt.setVolumes(persistence.getVolumes());
-            ctxt.setVolumeRequired(true);
-        }else{
-            ctxt.setVolumeRequired(false);
-        }
-        ctxt.setTimeoutInMillis(timeout);
-        return ctxt;
-        */
-        return null;
-    }
-
-    /**
-     * Lists the types of all cartridges held by the data holder.
-     *
-     * @return one type per cartridge, in the data holder's list order; an empty array when
-     *         the data holder returns no list
-     */
-    @Override
-    public String[] getRegisteredCartridges() {
-        // get the list of cartridges registered
-        List<Cartridge> registered = dataHolder.getCartridges();
-        if (registered == null) {
-            LOG.info("No registered Cartridge found.");
-            return new String[0];
-        }
-
-        String[] types = new String[registered.size()];
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Registered Cartridges : \n");
-        }
-
-        int next = 0;
-        for (Cartridge registeredCartridge : registered) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(registeredCartridge);
-            }
-            types[next++] = registeredCartridge.getType();
-        }
-
-        return types;
-    }
-
-    /**
-     * Looks up a cartridge by type and converts it to a {@link CartridgeInfo}.
-     *
-     * @param cartridgeType type of the cartridge to describe
-     * @return the information of the matching cartridge
-     * @throws UnregisteredCartridgeException if the data holder has no cartridge of that type
-     */
-    @Override
-    public CartridgeInfo getCartridgeInfo(String cartridgeType)
-            throws UnregisteredCartridgeException {
-        Cartridge found = dataHolder.getCartridge(cartridgeType);
-
-        if (found == null) {
-            String notFound = "Cannot find a Cartridge having a type of "
-                    + cartridgeType + ". Hence unable to find information.";
-            LOG.error(notFound);
-            throw new UnregisteredCartridgeException(notFound);
-        }
-
-        return CloudControllerUtil.toCartridgeInfo(found);
-    }
-
-    /**
-     * Unregisters a service cluster.
-     * <p>
-     * Clusters whose cartridge uses the Kubernetes deployer are delegated to
-     * {@link #unregisterDockerService(String)}. For every other cluster two threads are
-     * started: one waits for a timeout and then terminates the cluster's members, the other
-     * waits until the member collection is empty, deletes removable volumes and finally
-     * removes the cluster.
-     *
-     * @param clusterId id of the cluster to unregister
-     * @throws UnregisteredClusterException if the cluster's cartridge cannot be found
-     */
-    @Override
-    public void unregisterService(final String clusterId) throws UnregisteredClusterException {
-
-        ClusterContext clusterContext = dataHolder.getClusterContext(clusterId);
-
-        handleNullObject(clusterContext, "Service unregistration failed. Invalid cluster id: " + clusterId);
-
-        String cartridgeType = clusterContext.getCartridgeType();
-
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
-
-        if (cartridge == null) {
-            String noCartridge =
-                    "Service unregistration failed. No matching Cartridge found [type] " + cartridgeType + ". ";
-            LOG.error(noCartridge);
-            throw new UnregisteredClusterException(noCartridge);
-        }
-
-        // if it's a kubernetes cluster
-        if (StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) {
-            unregisterDockerService(clusterId);
-            return;
-        }
-
-        // Waits ctxt.getTimeoutInMillis() per responding member, then terminates all members.
-        Runnable terminateInTimeout = new Runnable() {
-            @Override
-            public void run() {
-                ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
-                if (ctxt == null) {
-                    LOG.error("Service unregistration failed. Cluster not found: " + clusterId);
-                    return;
-                }
-                Collection<Member> members = TopologyManager.getTopology()
-                        .getService(ctxt.getCartridgeType()).getCluster(clusterId).getMembers();
-
-                //finding the responding members from the existing members in the topology.
-                int respondingMembers = 0;
-                for (Member member : members) {
-                    if (member.getStatus().getCode() >= MemberStatus.Activated.getCode()) {
-                        respondingMembers++;
-                    }
-                }
-
-                long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * respondingMembers;
-                while (System.currentTimeMillis() < endTime) {
-                    CloudControllerUtil.sleep(1000);
-                }
-
-                // if there're still alive members, forcefully terminate them
-                if (!members.isEmpty()) {
-                    for (Member member : members) {
-                        try {
-                            terminateInstance(member.getMemberId());
-                        } catch (Exception e) {
-                            // we are not gonna stop the execution due to errors.
-                            LOG.warn("Instance termination failed of member [id] " + member.getMemberId(), e);
-                        }
-                    }
-                }
-            }
-        };
-
-        // Waits for the members to disappear, then deletes volumes and removes the cluster.
-        Runnable unregister = new Runnable() {
-            public void run() {
-                ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
-                if (ctxt == null) {
-                    LOG.error("Service unregistration failed. Cluster not found: " + clusterId);
-                    return;
-                }
-                Collection<Member> members = TopologyManager.getTopology()
-                        .getService(ctxt.getCartridgeType()).getCluster(clusterId).getMembers();
-                // TODO why end time is needed?
-                // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size();
-
-                // NOTE(review): "members" is read only once and there is no timeout here; this loop
-                // ends only if the collection returned by getMembers() reflects later removals -
-                // confirm against Cluster.getMembers().
-                while (members.size() > 0) {
-                    //waiting until all the members got removed from the Topology
-                    CloudControllerUtil.sleep(1000);
-                }
-
-                LOG.info("Unregistration of service cluster: " + clusterId);
-                deleteVolumes(ctxt);
-                onClusterRemoval(clusterId);
-            }
-
-            private void deleteVolumes(ClusterContext ctxt) {
-                if (!ctxt.isVolumeRequired()) {
-                    return;
-                }
-                Cartridge volumeCartridge = dataHolder.getCartridge(ctxt.getCartridgeType());
-                if (volumeCartridge == null || volumeCartridge.getIaases() == null
-                        || ctxt.getVolumes() == null) {
-                    return;
-                }
-                for (Volume volume : ctxt.getVolumes()) {
-                    if (volume.getId() == null) {
-                        continue;
-                    }
-                    String iaasType = volume.getIaasType();
-                    //Iaas iaas = dataHolder.getIaasProvider(iaasType).getIaas();
-                    Iaas iaas = volumeCartridge.getIaasProvider(iaasType).getIaas();
-                    if (iaas == null) {
-                        continue;
-                    }
-                    try {
-                        // delete the volumes if remove on unsubscription is true.
-                        if (volume.isRemoveOntermination()) {
-                            iaas.deleteVolume(volume.getId());
-                            volume.setId(null);
-                        }
-                    } catch (Exception ignore) {
-                        if (LOG.isErrorEnabled()) {
-                            LOG.error("Error while deleting volume [id] " + volume.getId(), ignore);
-                        }
-                    }
-                }
-            }
-        };
-
-        new Thread(terminateInTimeout).start();
-        new Thread(unregister).start();
-    }
-
-    /**
-     * Unregisters a Docker/Kubernetes based cluster: terminates all of its containers and
-     * then runs the cluster removal handling.
-     *
-     * @param clusterId id of the cluster to unregister
-     * @throws UnregisteredClusterException if terminating the containers fails with an
-     *                                      {@link InvalidClusterException}
-     */
-    @Override
-    public void unregisterDockerService(String clusterId)
-            throws UnregisteredClusterException {
-
-        try {
-            // terminate all kubernetes units
-            terminateAllContainers(clusterId);
-        } catch (InvalidClusterException e) {
-            String failure = "Docker instance termination fails for cluster: " + clusterId;
-            LOG.error(failure, e);
-            throw new UnregisteredClusterException(failure, e);
-        }
-
-        // send cluster removal notifications and update the state
-        onClusterRemoval(clusterId);
-    }
-
-
-    /**
-     * Validates the given partitions against a cartridge.
-     * <p>
-     * Partitions already recorded for the cartridge type in the validation cache are skipped.
-     * Every other partition is checked concurrently by a {@link PartitionValidatorCallable};
-     * each successful result is added to the cache and the resulting IaaS providers are
-     * added to the cartridge before the state is persisted.
-     *
-     * @param cartridgeType type of the cartridge the partitions are validated against
-     * @param partitions    partitions to validate
-     * @return {@code true} when all partitions were validated
-     * @throws InvalidPartitionException     if validating a partition fails, or if the calling
-     *                                       thread is interrupted while waiting for a result
-     * @throws InvalidCartridgeTypeException if no cartridge exists for {@code cartridgeType}
-     */
-    @Override
-    public boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions)
-            throws InvalidPartitionException, InvalidCartridgeTypeException {
-
-        // A single get() replaces containsKey()+get(), so a null list can never be used below.
-        List<String> validatedPartitions = dataHolder.getCartridgeTypeToPartitionIds().get(cartridgeType);
-        if (validatedPartitions == null) {
-            validatedPartitions = new ArrayList<String>();
-        } else if (LOG.isDebugEnabled()) {
-            // cache hit for this cartridge
-            LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType);
-        }
-
-        Map<String, IaasProvider> partitionToIaasProviders =
-                new ConcurrentHashMap<String, IaasProvider>();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType);
-        }
-
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
-
-        if (cartridge == null) {
-            String msg = "Invalid Cartridge Type: " + cartridgeType;
-            LOG.error(msg);
-            throw new InvalidCartridgeTypeException(msg);
-        }
-
-        Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>();
-
-        for (Partition partition : partitions) {
-
-            if (validatedPartitions.contains(partition.getId())) {
-                // partition cache hit
-                continue;
-            }
-
-            Callable<IaasProvider> worker = new PartitionValidatorCallable(
-                    partition, cartridge);
-            Future<IaasProvider> job = FasterLookUpDataHolder.getInstance()
-                    .getExecutor().submit(worker);
-            jobList.put(partition.getId(), job);
-        }
-
-        // Retrieve the results of the concurrently performed sanity checks.
-        for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) {
-            String partitionId = entry.getKey();
-            Future<IaasProvider> job = entry.getValue();
-            try {
-                // add to a temporary Map
-                partitionToIaasProviders.put(partitionId, job.get());
-
-                // add to cache
-                this.dataHolder.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType);
-                }
-            } catch (InterruptedException e) {
-                // restore the interrupt status so the caller can still observe the interruption
-                Thread.currentThread().interrupt();
-                LOG.error(e.getMessage(), e);
-                throw new InvalidPartitionException(e.getMessage(), e);
-            } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
-                throw new InvalidPartitionException(e.getMessage(), e);
-            }
-        }
-
-        // if and only if the deployment policy valid
-        cartridge.addIaasProviders(partitionToIaasProviders);
-
-        // persist data
-        persist();
-
-        LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) +
-                " were validated successfully, against the Cartridge: " + cartridgeType);
-
-        return true;
-    }
-
-    private void onClusterRemoval(final String clusterId) {
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
-        TopologyBuilder.handleClusterRemoved(ctxt);
-        dataHolder.removeClusterContext(clusterId);
-        dataHolder.removeMemberContextsOfCluster(clusterId);
-        persist();
-    }
-
-    @Override
-    public boolean validatePartition(Partition partition) throws 
InvalidPartitionException {
-        handleNullObject(partition, "Partition validation failed. Partition is 
null.");
-        String provider = partition.getProvider();
-        handleNullObject(provider, "Partition [" + partition.getId() + "] 
validation failed. Partition provider is null.");
-        IaasProvider iaasProvider = dataHolder.getIaasProvider(provider);
-
-        if (iaasProvider == null) {
-            String msg =
-                    "Invalid Partition - " + partition.toString() + ". Cause: 
Iaas Provider " +
-                            "is null for Partition Provider: " + provider;
-            LOG.error(msg);
-            throw new InvalidPartitionException(msg);
-        }
-
-        Iaas iaas = iaasProvider.getIaas();
-
-        if (iaas == null) {
-
-            try {
-                iaas = CloudControllerUtil.getIaas(iaasProvider);
-            } catch (InvalidIaasProviderException e) {
-                String msg =
-                        "Invalid Partition - " + partition.toString() +
-                                ". Cause: Unable to build Iaas of this 
IaasProvider [Provider] : " + provider + ". " + e.getMessage();
-                LOG.error(msg, e);
-                throw new InvalidPartitionException(msg, e);
-            }
-
-        }
-
-        PartitionValidator validator = iaas.getPartitionValidator();
-        validator.setIaasProvider(iaasProvider);
-        validator.validate(partition.getId(),
-                
CloudControllerUtil.toJavaUtilProperties(partition.getProperties()));
-
-        return true;
-    }
-
-    /**
-     * Returns whatever the data holder has registered for the given cluster id.
-     *
-     * @param clusterId id of the cluster to look up
-     * @return the result of the data holder lookup
-     */
-    public ClusterContext getClusterContext(String clusterId) {
-        final ClusterContext registeredContext = dataHolder.getClusterContext(clusterId);
-        return registeredContext;
-    }
-
-    /**
-     * Starts the Kubernetes entities of a container cluster and returns the member contexts
-     * generated for its pods.
-     * <p>
-     * In order: the mandatory Kubernetes properties are read, a replication controller and a
-     * service are created through the Kubernetes API client, the allocated service port is
-     * stored on the cluster context, and the pods labelled with the cluster id are polled
-     * until their count matches the configured minimum replica count. A member context is
-     * registered for each pod and a {@link PodActivationWatcher} is scheduled for it.
-     * If no pod shows up at all, everything is torn down again via
-     * {@link #terminateAllContainers(String)} and an empty array is returned.
-     *
-     * @param containerClusterContext description of the container cluster to start
-     * @return member contexts of the pods found, or an empty array if none were created
-     * @throws UnregisteredCartridgeException if the cartridge type of the cluster is unknown
-     * @throws IllegalArgumentException       if the context, its cluster id or the cluster
-     *                                        context is missing
-     * @throws IllegalStateException          if anything fails once the cartridge is resolved
-     */
-    @Override
-    public MemberContext[] startContainers(ContainerClusterContext containerClusterContext)
-            throws UnregisteredCartridgeException {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("CloudControllerServiceImpl:startContainers");
-        }
-
-        handleNullObject(containerClusterContext, "Container start-up failed. ContainerClusterContext is null.");
-
-        String clusterId = containerClusterContext.getClusterId();
-        handleNullObject(clusterId, "Container start-up failed. Cluster id is null.");
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Received a container spawn request : " + containerClusterContext.toString());
-        }
-
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
-        handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString());
-
-        String cartridgeType = ctxt.getCartridgeType();
-
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
-
-        if (cartridge == null) {
-            String msg =
-                    "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " +
-                            containerClusterContext.toString();
-            LOG.error(msg);
-            throw new UnregisteredCartridgeException(msg);
-        }
-
-        try {
-            String minReplicas = validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt);
-            String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
-            String kubernetesMasterIp =
-                    validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext);
-            String kubernetesPortRange =
-                    validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext);
-
-            // parse up front: a malformed replica count must fail before any Kubernetes
-            // entity is created, otherwise the controller and the service would be left behind
-            int expectedCount = Integer.parseInt(minReplicas);
-
-            KubernetesClusterContext kubClusterContext =
-                    getKubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp, kubernetesPortRange);
-
-            KubernetesApiClient kubApi = kubClusterContext.getKubApi();
-
-            // first let's create a replication controller.
-            ContainerClusterContextToReplicationController controllerFunction =
-                    new ContainerClusterContextToReplicationController();
-            ReplicationController controller = controllerFunction.apply(containerClusterContext);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller is delegating request to start a replication controller " + controller +
-                        " for " + containerClusterContext + " to Kubernetes layer.");
-            }
-
-            kubApi.createReplicationController(controller);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller successfully started the controller "
-                        + controller + " via Kubernetes layer.");
-            }
-
-            // secondly let's create a kubernetes service proxy to load balance these containers
-            ContainerClusterContextToKubernetesService serviceFunction =
-                    new ContainerClusterContextToKubernetesService();
-            Service service = serviceFunction.apply(containerClusterContext);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller is delegating request to start a service " + service +
-                        " for " + containerClusterContext + " to Kubernetes layer.");
-            }
-
-            kubApi.createService(service);
-
-            // set host port and update
-            Property allocatedServiceHostPortProp = new Property();
-            allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
-            allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort()));
-            ctxt.getProperties().addProperty(allocatedServiceHostPortProp);
-            dataHolder.addClusterContext(ctxt);
-
-            if (LOG.isDebugEnabled()) {
-                // report the service here, not the replication controller
-                LOG.debug("Cloud Controller successfully started the service "
-                        + service + " via Kubernetes layer.");
-            }
-
-            // create a label query
-            Label l = new Label();
-            l.setName(clusterId);
-            // execute the label query
-            Pod[] newlyCreatedPods = new Pod[0];
-
-            // poll at most expectedCount times, pausing 10s between two polls
-            for (int i = 0; i < expectedCount; i++) {
-                newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l});
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Pods Count: " + newlyCreatedPods.length + " for cluster: " + clusterId);
-                }
-                if (newlyCreatedPods.length == expectedCount) {
-                    break;
-                }
-                if (i < expectedCount - 1) {
-                    // no point in sleeping after the final poll, nothing is re-read afterwards
-                    Thread.sleep(10000);
-                }
-            }
-
-            if (newlyCreatedPods.length == 0) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(String.format("Pods are not created for cluster : %s, hence deleting the service", clusterId));
-                }
-                terminateAllContainers(clusterId);
-                return new MemberContext[0];
-            }
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(String.format("Pods created : %s for cluster : %s", newlyCreatedPods.length, clusterId));
-            }
-
-            List<MemberContext> memberContexts = new ArrayList<MemberContext>();
-
-            PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
-            // generate Member Contexts
-            for (Pod pod : newlyCreatedPods) {
-                MemberContext context = podToMemberContextFunc.apply(pod);
-                context.setCartridgeType(cartridgeType);
-                context.setClusterId(clusterId);
-
-                context.setProperties(CloudControllerUtil.addProperty(context
-                                .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
-                        String.valueOf(service.getPort())));
-
-                dataHolder.addMemberContext(context);
-
-                // wait till Pod status turns to running and send member spawned.
-                ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cloud Controller is starting the instance start up thread.");
-                }
-                dataHolder.addScheduledFutureJob(context.getMemberId(),
-                        exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
-
-                memberContexts.add(context);
-            }
-
-            // persist in registry
-            persist();
-
-            LOG.info("Kubernetes entities are successfully starting up: " + memberContexts);
-
-            return memberContexts.toArray(new MemberContext[0]);
-
-        } catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                // the wait between polls was interrupted: keep the flag visible to callers
-                Thread.currentThread().interrupt();
-            }
-            String msg = "Failed to start an instance. " + containerClusterContext.toString() + " Cause: " + e.getMessage();
-            LOG.error(msg, e);
-            throw new IllegalStateException(msg, e);
-        }
-    }
-
-    private String validateProperty(String property, ClusterContext ctxt) {
-
-        String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), 
property);
-        handleNullObject(propVal, "Property validation failed. Cannot find '" 
+ property + "' in " + ctxt);
-        return propVal;
-    }
-
-    private String validateProperty(String property, ContainerClusterContext 
ctxt) {
-
-        String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), 
property);
-        handleNullObject(propVal, "Property validation failed. '" + property + 
"' in " + ctxt);
-        return propVal;
-
-    }
-
-    private KubernetesClusterContext getKubernetesClusterContext(
-            String kubernetesClusterId, String kubernetesMasterIp,
-            String kubernetesPortRange) {
-
-        KubernetesClusterContext origCtxt = 
dataHolder.getKubernetesClusterContext(kubernetesClusterId);
-        KubernetesClusterContext newCtxt = new 
KubernetesClusterContext(kubernetesClusterId, kubernetesPortRange, 
kubernetesMasterIp);
-
-        if (origCtxt == null) {
-            dataHolder.addKubernetesClusterContext(newCtxt);
-            return newCtxt;
-        }
-
-        if (!origCtxt.equals(newCtxt)) {
-            // if for some reason master IP etc. have changed
-            newCtxt.setAvailableHostPorts(origCtxt.getAvailableHostPorts());
-            dataHolder.addKubernetesClusterContext(newCtxt);
-            return newCtxt;
-        } else {
-            return origCtxt;
-        }
-    }
-
-    @Override
-    public MemberContext[] terminateAllContainers(String clusterId)
-            throws InvalidClusterException {
-
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
-        handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid 
cluster id. " + clusterId);
-
-        String kubernetesClusterId = 
CloudControllerUtil.getProperty(ctxt.getProperties(),
-                StratosConstants.KUBERNETES_CLUSTER_ID);
-        handleNullObject(kubernetesClusterId, "Kubernetes units termination 
failed. Cannot find '" +
-                StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt);
-
-        KubernetesClusterContext kubClusterContext = 
dataHolder.getKubernetesClusterContext(kubernetesClusterId);
-        handleNullObject(kubClusterContext, "Kubernetes units termination 
failed. Cannot find a matching Kubernetes Cluster for cluster id: "
-                + kubernetesClusterId);
-
-        KubernetesApiClient kubApi = kubClusterContext.getKubApi();
-        // delete the service
-        try {
-            
kubApi.deleteService(CloudControllerUtil.getCompatibleId(clusterId));
-        } catch (KubernetesClientException e) {
-            // we're not going to throw this error, but proceed with other 
deletions
-            LOG.error("Failed to delete Kubernetes service with id: " + 
clusterId, e);
-        }
-
-        // set replicas=0 for the replication controller
-        try {
-            kubApi.updateReplicationController(clusterId, 0);
-        } catch (KubernetesClientException e) {
-            // we're not going to throw this error, but proceed with other 
deletions
-            LOG.error("Failed to update Kubernetes Controller with id: " + 
clusterId, e);
-        }
-
-        // delete pods forcefully
-        try {
-            // create a label query
-            Label l = new Label();
-            l.setName(clusterId);
-            // execute the label query
-            Pod[] pods = kubApi.getSelectedPods(new Label[]{l});
-
-            for (Pod pod : pods) {
-                try {
-                    // delete pods forcefully
-                    kubApi.deletePod(pod.getId());
-                } catch (KubernetesClientException ignore) {
-                    // we can't do nothing here
-                    LOG.warn(String.format("Failed to delete Pod [%s] 
forcefully!", pod.getId()));
-                }
-            }
-        } catch (KubernetesClientException e) {
-            // we're not going to throw this error, but proceed with other 
deletions
-            LOG.error("Failed to delete pods forcefully for cluster: " + 
clusterId, e);
-        }
-
-        // delete the replication controller.
-        try {
-            kubApi.deleteReplicationController(clusterId);
-        } catch (KubernetesClientException e) {
-            String msg = "Failed to delete Kubernetes Controller with id: " + 
clusterId;
-            LOG.error(msg, e);
-            throw new InvalidClusterException(msg, e);
-        }
-
-        String allocatedPort = 
CloudControllerUtil.getProperty(ctxt.getProperties(),
-                StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
-
-        if (allocatedPort != null) {
-            kubClusterContext.deallocateHostPort(Integer
-                    .parseInt(allocatedPort));
-        } else {
-            LOG.warn("Host port dealloacation failed due to a missing 
property: "
-                    + StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
-        }
-
-        List<MemberContext> membersToBeRemoved = 
dataHolder.getMemberContextsOfClusterId(clusterId);
-
-        for (MemberContext memberContext : membersToBeRemoved) {
-            logTermination(memberContext);
-        }
-
-        // persist
-        persist();
-
-        return membersToBeRemoved.toArray(new MemberContext[0]);
-    }
-
-    @Override
-    public MemberContext[] updateContainers(String clusterId, int replicas)
-            throws UnregisteredCartridgeException {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("CloudControllerServiceImpl:updateContainers for cluster 
: " + clusterId);
-        }
-
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
-        handleNullObject(ctxt, "Container update failed. Invalid cluster id. " 
+ clusterId);
-
-        String cartridgeType = ctxt.getCartridgeType();
-
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
-
-        if (cartridge == null) {
-            String msg =
-                    "Container update failed. No matching Cartridge found 
[type] " + cartridgeType
-                            + ". [cluster id] " + clusterId;
-            LOG.error(msg);
-            throw new UnregisteredCartridgeException(msg);
-        }
-
-        try {
-            String kubernetesClusterId = 
validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
-
-            KubernetesClusterContext kubClusterContext = 
dataHolder.getKubernetesClusterContext(kubernetesClusterId);
-
-            if (kubClusterContext == null) {
-                String msg =
-                        "Instance start-up failed. No matching Kubernetes 
Context Found for [id] " + kubernetesClusterId
-                                + ". [cluster id] " + clusterId;
-                LOG.error(msg);
-                throw new UnregisteredCartridgeException(msg);
-            }
-
-            KubernetesApiClient kubApi = kubClusterContext.getKubApi();
-            // create a label query
-            Label l = new Label();
-            l.setName(clusterId);
-
-            // get the current pods - useful when scale down
-            Pod[] previousStatePods = kubApi.getSelectedPods(new Label[]{l});
-
-            // update the replication controller - cluster id = replication 
controller id
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller is delegating request to update a 
replication controller " + clusterId +
-                        " to Kubernetes layer.");
-            }
-
-            kubApi.updateReplicationController(clusterId, replicas);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller successfully updated the 
controller "
-                        + clusterId + " via Kubernetes layer.");
-            }
-
-            // execute the label query
-            Pod[] allPods = new Pod[0];
-
-            // wait replicas*5s time in the worst case ; best case = 0s
-            for (int i = 0; i < (replicas * previousStatePods.length + 1); 
i++) {
-                allPods = kubApi.getSelectedPods(new Label[]{l});
-
-                if (LOG.isDebugEnabled()) {
-
-                    LOG.debug("Pods Count: " + allPods.length + " for cluster: 
" + clusterId);
-                }
-                if (allPods.length == replicas) {
-                    break;
-                }
-                Thread.sleep(10000);
-            }
-
-            if (LOG.isDebugEnabled()) {
-
-                LOG.debug(String.format("Pods created : %s for cluster : %s", 
allPods.length, clusterId));
-            }
-
-            List<MemberContext> memberContexts = new 
ArrayList<MemberContext>();
-
-            PodToMemberContext podToMemberContextFunc = new 
PodToMemberContext();
-            // generate Member Contexts
-            for (Pod pod : allPods) {
-                MemberContext context;
-                // if member context does not exist -> a new member (scale up)
-                if ((context = 
dataHolder.getMemberContextOfMemberId(pod.getId())) == null) {
-
-                    context = podToMemberContextFunc.apply(pod);
-                    context.setCartridgeType(cartridgeType);
-                    context.setClusterId(clusterId);
-
-                    
context.setProperties(CloudControllerUtil.addProperty(context
-                                    .getProperties(), 
StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
-                            
CloudControllerUtil.getProperty(ctxt.getProperties(),
-                                    
StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
-
-                    // wait till Pod status turns to running and send member 
spawned.
-                    ScheduledThreadExecutor exec = 
ScheduledThreadExecutor.getInstance();
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Cloud Controller is starting the instance 
start up thread.");
-                    }
-                    dataHolder.addScheduledFutureJob(context.getMemberId(), 
exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
-
-                    memberContexts.add(context);
-
-                }
-                // publish data
-                // TODO
-//                
CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, 
context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node);
-
-            }
-
-            if (memberContexts.isEmpty()) {
-                // terminated members
-                @SuppressWarnings("unchecked")
-                List<Pod> difference = 
ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods));
-                for (Pod pod : difference) {
-                    if (pod != null) {
-                        MemberContext context = 
dataHolder.getMemberContextOfMemberId(pod.getId());
-                        logTermination(context);
-                        memberContexts.add(context);
-                    }
-                }
-            }
-
-
-            // persist in registry
-            persist();
-
-            LOG.info("Kubernetes entities are successfully starting up. " + 
memberContexts);
-
-            return memberContexts.toArray(new MemberContext[0]);
-
-        } catch (Exception e) {
-            String msg = "Failed to update containers belong to cluster " + 
clusterId + ". Cause: " + e.getMessage();
-            LOG.error(msg, e);
-            throw new IllegalStateException(msg, e);
-        }
-    }
-
-    /**
-     * Not implemented yet: the call currently does nothing with its arguments.
-     *
-     * @param serviceName name of the service
-     * @param clusterId   id of the cluster
-     * @param instanceId  id of the instance
-     * @param status      cluster status
-     */
-    @Override
-    public void updateClusterStatus(String serviceName, String clusterId, String instanceId, ClusterStatus status) {
-        // TODO: implementation pending, intentionally left as a no-op
-    }
-
-    @Override
-    public MemberContext terminateContainer(String memberId) throws 
MemberTerminationFailedException {
-
-        handleNullObject(memberId, "Failed to terminate member. Invalid Member 
id. [Member id] " + memberId);
-
-        MemberContext memberContext = 
dataHolder.getMemberContextOfMemberId(memberId);
-
-        handleNullObject(memberContext, "Failed to terminate member. Member id 
not found. [Member id] " + memberId);
-
-        String clusterId = memberContext.getClusterId();
-
-        handleNullObject(clusterId, "Failed to terminate member. Cluster id is 
null. [Member id] " + memberId);
-
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
-
-        handleNullObject(ctxt,
-                String.format("Failed to terminate member [Member id] %s. 
Invalid cluster id %s ", memberId, clusterId));
-
-        String kubernetesClusterId = 
CloudControllerUtil.getProperty(ctxt.getProperties(),
-                StratosConstants.KUBERNETES_CLUSTER_ID);
-
-        handleNullObject(kubernetesClusterId, String.format("Failed to 
terminate member [Member id] %s. Cannot find '" +
-                StratosConstants.KUBERNETES_CLUSTER_ID + "' in [cluster 
context] %s ", memberId, ctxt));
-
-        KubernetesClusterContext kubClusterContext = 
dataHolder.getKubernetesClusterContext(kubernetesClusterId);
-
-        handleNullObject(kubClusterContext, String.format("Failed to terminate 
member [Member id] %s. Cannot find a matching Kubernetes Cluster in [cluster 
context] %s ", memberId, ctxt));
-
-        KubernetesApiClient kubApi = kubClusterContext.getKubApi();
-        // delete the Pod
-        try {
-            // member id = pod id
-            kubApi.deletePod(memberId);
-
-            MemberContext memberToBeRemoved = 
dataHolder.getMemberContextOfMemberId(memberId);
-
-            logTermination(memberToBeRemoved);
-
-            return memberToBeRemoved;
-
-        } catch (KubernetesClientException e) {
-            String msg = String.format("Failed to terminate member [Member id] 
%s", memberId);
-            LOG.error(msg, e);
-            throw new MemberTerminationFailedException(msg, e);
-        }
-    }
-
-    /**
-     * Fails the current operation when a required object is missing.
-     *
-     * @param obj      object that must not be null
-     * @param errorMsg message that is logged and used for the exception
-     * @throws IllegalArgumentException if {@code obj} is null
-     */
-    private void handleNullObject(Object obj, String errorMsg) {
-        if (obj != null) {
-            // nothing to report
-            return;
-        }
-        LOG.error(errorMsg);
-        throw new IllegalArgumentException(errorMsg);
-    }
-
-    @Override
-    public void createApplicationClusters(String appId, 
ApplicationClusterContextDTO[] appClustersContexts)  throws
-            ApplicationClusterRegistrationException {
-
-        // Create a Cluster Context obj. for each of the Clusters in the 
Application
-        if (appClustersContexts == null || appClustersContexts.length == 0) {
-            String errorMsg = "No application cluster information found, 
unable to create clusters";
-            LOG.error(errorMsg);
-            throw new ApplicationClusterRegistrationException(errorMsg);
-        }
-        List<Cluster> clusters = new ArrayList<Cluster>();
-
-
-        for (ApplicationClusterContextDTO appClusterCtxt : 
appClustersContexts) {
-            dataHolder.addClusterContext(new 
ClusterContext(appClusterCtxt.getClusterId(),
-                    appClusterCtxt.getCartridgeType(), 
appClusterCtxt.getTextPayload(),
-                    appClusterCtxt.getHostName(), 
appClusterCtxt.isLbCluster(), appClusterCtxt.getProperties()));
-            // create Cluster objects
-            Cluster newCluster = new 
Cluster(appClusterCtxt.getCartridgeType(), appClusterCtxt.getClusterId(),
-                    appClusterCtxt.getDep

<TRUNCATED>

Reply via email to