http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerServiceImpl.java new file mode 100644 index 0000000..9f54217 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerServiceImpl.java @@ -0,0 +1,2148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.services; + +import com.google.common.collect.ImmutableSet; +import com.google.common.net.InetAddresses; +import org.apache.commons.collections.ListUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.concurrent.PartitionValidatorCallable; +import org.apache.stratos.cloud.controller.concurrent.ScheduledThreadExecutor; +import org.apache.stratos.cloud.controller.concurrent.ThreadExecutor; +import org.apache.stratos.cloud.controller.domain.*; +import org.apache.stratos.cloud.controller.domain.Cartridge; +import org.apache.stratos.cloud.controller.domain.Dependencies; +import org.apache.stratos.cloud.controller.domain.Partition; +import org.apache.stratos.cloud.controller.exception.*; +import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToKubernetesService; +import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToReplicationController; +import org.apache.stratos.cloud.controller.functions.PodToMemberContext; +import org.apache.stratos.cloud.controller.iaas.Iaas; +import org.apache.stratos.cloud.controller.registry.Deserializer; +import org.apache.stratos.cloud.controller.messaging.publisher.CartridgeInstanceDataPublisher; +import org.apache.stratos.cloud.controller.registry.RegistryManager; +import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyEventPublisher; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.apache.stratos.cloud.controller.util.CloudControllerUtil; +import org.apache.stratos.cloud.controller.util.PodActivationWatcher; +import org.apache.stratos.cloud.controller.iaas.validators.PartitionValidator; +import org.apache.stratos.common.*; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.kubernetes.client.KubernetesApiClient; +import org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException; +import org.apache.stratos.kubernetes.client.model.Label; +import org.apache.stratos.kubernetes.client.model.Pod; +import org.apache.stratos.kubernetes.client.model.ReplicationController; +import org.apache.stratos.kubernetes.client.model.Service; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; +import org.jclouds.compute.ComputeService; +import org.jclouds.compute.domain.NodeMetadata; +import org.jclouds.compute.domain.NodeMetadataBuilder; +import org.jclouds.compute.domain.Template; +import org.jclouds.rest.ResourceNotFoundException; +import org.wso2.carbon.registry.core.exceptions.RegistryException; + +import java.util.*; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; + +/** + * Cloud Controller Service is responsible for starting up new server instances, + * terminating already started instances, providing pending instance count etc. + */ +public class CloudControllerServiceImpl implements CloudControllerService { + + private static final Log LOG = LogFactory.getLog(CloudControllerServiceImpl.class); + public static final String IS_LOAD_BALANCER = "load.balancer"; + + private FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder + .getInstance(); + + public CloudControllerServiceImpl() { + // acquire serialized data from registry + acquireData(); + } + + private void acquireData() { + + Object obj = RegistryManager.getInstance().retrieve(); + if (obj != null) { + try { + Object dataObj = Deserializer + .deserializeFromByteArray((byte[]) obj); + if (dataObj instanceof FasterLookUpDataHolder) { + FasterLookUpDataHolder serializedObj = (FasterLookUpDataHolder) dataObj; + FasterLookUpDataHolder currentData = FasterLookUpDataHolder + .getInstance(); + + // assign necessary data + currentData.setClusterIdToContext(serializedObj.getClusterIdToContext()); + currentData.setMemberIdToContext(serializedObj.getMemberIdToContext()); + currentData.setClusterIdToMemberContext(serializedObj.getClusterIdToMemberContext()); + currentData.setCartridges(serializedObj.getCartridges()); + currentData.setKubClusterIdToKubClusterContext(serializedObj.getKubClusterIdToKubClusterContext()); + currentData.setServiceGroups(serializedObj.getServiceGroups()); + + if (LOG.isDebugEnabled()) { + + LOG.debug("Cloud Controller Data is retrieved from registry."); + } + } else { + if (LOG.isDebugEnabled()) { + + LOG.debug("Cloud Controller Data cannot be found in registry."); + } + } + } catch (Exception e) { + + String msg = "Unable to acquire data from Registry. Hence, any historical data will not get reflected."; + LOG.warn(msg, e); + } + + } + } + + public void deployCartridgeDefinition(CartridgeConfig cartridgeConfig) throws InvalidCartridgeDefinitionException, + InvalidIaasProviderException { + + handleNullObject(cartridgeConfig, "Invalid Cartridge Definition: Definition is null."); + + if (LOG.isDebugEnabled()) { + LOG.debug("Cartridge definition: " + cartridgeConfig.toString()); + } + + Cartridge cartridge = null; + try { + // cartridge can never be null + cartridge = CloudControllerUtil.toCartridge(cartridgeConfig); + } catch (Exception e) { + String msg = + "Invalid Cartridge Definition: Cartridge Type: " + + cartridgeConfig.getType() + + ". Cause: Cannot instantiate a Cartridge Instance with the given Config. " + e.getMessage(); + LOG.error(msg, e); + throw new InvalidCartridgeDefinitionException(msg, e); + } + + List<IaasProvider> iaases = cartridge.getIaases(); + + if (!StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) { + if (iaases == null || iaases.isEmpty()) { + String msg = "Invalid Cartridge Definition: Cartridge Type: " + + cartridgeConfig.getType() + + ". Cause: Iaases of this Cartridge is null or empty."; + LOG.error(msg); + throw new InvalidCartridgeDefinitionException(msg); + } + + if (iaases == null || iaases.isEmpty()) { + String msg = "Invalid Cartridge Definition: Cartridge Type: " + + cartridgeConfig.getType() + + ". Cause: Iaases of this Cartridge is null or empty."; + LOG.error(msg); + throw new InvalidCartridgeDefinitionException(msg); + } + + for (IaasProvider iaasProvider : iaases) { + CloudControllerUtil.getIaas(iaasProvider); + } + } + + // TODO transaction begins + String cartridgeType = cartridge.getType(); + if (dataHolder.getCartridge(cartridgeType) != null) { + Cartridge cartridgeToBeRemoved = dataHolder.getCartridge(cartridgeType); + // undeploy + try { + undeployCartridgeDefinition(cartridgeToBeRemoved.getType()); + } catch (InvalidCartridgeTypeException e) { + //ignore + } + populateNewCartridge(cartridge, cartridgeToBeRemoved); + } + + dataHolder.addCartridge(cartridge); + + // persist + persist(); + + List<Cartridge> cartridgeList = new ArrayList<Cartridge>(); + cartridgeList.add(cartridge); + + TopologyBuilder.handleServiceCreated(cartridgeList); + // transaction ends + + LOG.info("Successfully deployed the Cartridge definition: " + cartridgeType); + } + + private void populateNewCartridge(Cartridge cartridge, + Cartridge cartridgeToBeRemoved) { + + List<IaasProvider> newIaasProviders = cartridge.getIaases(); + Map<String, IaasProvider> oldPartitionToIaasMap = cartridgeToBeRemoved.getPartitionToIaasProvider(); + + for (Entry<String, IaasProvider> entry : oldPartitionToIaasMap.entrySet()) { + if (entry == null) { + continue; + } + String partitionId = entry.getKey(); + IaasProvider oldIaasProvider = entry.getValue(); + if (newIaasProviders.contains(oldIaasProvider)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Copying a partition from the Cartridge that is undeployed, to the new Cartridge. " + + "[partition id] : " + partitionId + " [cartridge type] " + cartridge.getType()); + } + cartridge.addIaasProvider(partitionId, newIaasProviders.get(newIaasProviders.indexOf(oldIaasProvider))); + } + } + + } + + public void undeployCartridgeDefinition(String cartridgeType) throws InvalidCartridgeTypeException { + + Cartridge cartridge = null; + if ((cartridge = dataHolder.getCartridge(cartridgeType)) != null) { + if (dataHolder.getCartridges().remove(cartridge)) { + // invalidate partition validation cache + dataHolder.removeFromCartridgeTypeToPartitionIds(cartridgeType); + + if (LOG.isDebugEnabled()) { + LOG.debug("Partition cache invalidated for cartridge " + cartridgeType); + } + + persist(); + + // sends the service removed event + List<Cartridge> cartridgeList = new ArrayList<Cartridge>(); + cartridgeList.add(cartridge); + TopologyBuilder.handleServiceRemoved(cartridgeList); + + if (LOG.isInfoEnabled()) { + LOG.info("Successfully undeployed the Cartridge definition: " + cartridgeType); + } + return; + } + } + String msg = "Cartridge [type] " + cartridgeType + " is not a deployed Cartridge type."; + LOG.error(msg); + throw new InvalidCartridgeTypeException(msg); + } + + public void deployServiceGroup(ServiceGroup servicegroup) throws InvalidServiceGroupException { + + if (servicegroup == null) { + String msg = "Invalid ServiceGroup Definition: Definition is null."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + + } + + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:" + servicegroup.getName()); + } + + String[] subGroups = servicegroup.getCartridges(); + + + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups" + subGroups); + if (subGroups != null) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups:size" + subGroups.length); + } else { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups: is null"); + } + } + + + Dependencies dependencies = servicegroup.getDependencies(); + + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:dependencies" + dependencies); + } + + if (dependencies != null) { + String[] startupOrders = dependencies.getStartupOrders(); + + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrders" + startupOrders); + + if (startupOrders != null) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder:size" + startupOrders.length); + } else { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder: is null"); + } + } + } + + dataHolder.addServiceGroup(servicegroup); + + this.persist(); + + } + + public void undeployServiceGroup(String name) throws InvalidServiceGroupException { + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:undeployServiceGroup: " + name); + } + + ServiceGroup serviceGroup = null; + + serviceGroup = dataHolder.getServiceGroup(name); + + if (serviceGroup != null) { + if (dataHolder.getServiceGroups().remove(serviceGroup)) { + persist(); + if (LOG.isInfoEnabled()) { + LOG.info("Successfully undeployed the Service Group definition: " + serviceGroup); + } + return; + } + } + + String msg = "ServiceGroup " + name + " is not a deployed Service Group definition"; + LOG.error(msg); + throw new InvalidServiceGroupException(msg); + + } + + @Override + public ServiceGroup getServiceGroup(String name) throws InvalidServiceGroupException { + + if (LOG.isDebugEnabled()) { + LOG.debug("getServiceGroupDefinition:" + name); + } + + ServiceGroup serviceGroup = this.dataHolder.getServiceGroup(name); + + if (serviceGroup == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("getServiceGroupDefinition: no entry found for service group " + name); + } + String msg = "ServiceGroup " + name + " is not a deployed Service Group definition"; + throw new InvalidServiceGroupException(msg); + } + + return serviceGroup; + } + + public String[] getServiceGroupSubGroups(String name) throws InvalidServiceGroupException { + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + + return serviceGroup.getSubGroups(); + } + + /** + * + */ + public String[] getServiceGroupCartridges(String name) throws InvalidServiceGroupException { + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + String[] cs = serviceGroup.getCartridges(); + return cs; + + } + + public Dependencies getServiceGroupDependencies(String name) throws InvalidServiceGroupException { + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + return serviceGroup.getDependencies(); + } + + @Override + public MemberContext startInstance(MemberContext memberContext) throws + UnregisteredCartridgeException, InvalidIaasProviderException { + + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:startInstance"); + } + + handleNullObject(memberContext, "Instance start-up failed. Member is null."); + + String clusterId = memberContext.getClusterId(); + Partition partition = memberContext.getPartition(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Received an instance spawn request : " + memberContext); + } + + Template template = null; + + handleNullObject(partition, "Instance start-up failed. Specified Partition is null. " + + memberContext); + + String partitionId = partition.getId(); + ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + + handleNullObject(ctxt, "Instance start-up failed. Invalid cluster id. " + memberContext); + + String cartridgeType = ctxt.getCartridgeType(); + + Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + + if (cartridge == null) { + String msg = + "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " + + memberContext.toString(); + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } + + memberContext.setCartridgeType(cartridgeType); + + + IaasProvider iaasProvider = cartridge.getIaasProviderOfPartition(partitionId); + if (iaasProvider == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("IaasToPartitionMap " + cartridge.hashCode() + + " for cartridge " + cartridgeType + " and for partition: " + partitionId); + } + String msg = "Instance start-up failed. " + + "There's no IaaS provided for the partition: " + + partitionId + + " and for the Cartridge type: " + + cartridgeType + + ". Only following " + + "partitions can be found in this Cartridge: " + + cartridge.getPartitionToIaasProvider().keySet() + .toString() + ". " + memberContext.toString() + + ". "; + LOG.fatal(msg); + throw new InvalidIaasProviderException(msg); + } + String type = iaasProvider.getType(); + try { + // generating the Unique member ID... + String memberID = generateMemberId(clusterId); + memberContext.setMemberId(memberID); + // have to add memberID to the payload + StringBuilder payload = new StringBuilder(ctxt.getPayload()); + addToPayload(payload, "MEMBER_ID", memberID); + addToPayload(payload, "LB_CLUSTER_ID", memberContext.getLbClusterId()); + addToPayload(payload, "NETWORK_PARTITION_ID", memberContext.getNetworkPartitionId()); + addToPayload(payload, "PARTITION_ID", partitionId); + if (memberContext.getProperties() != null) { + org.apache.stratos.common.Properties properties = memberContext.getProperties(); + if (properties != null) { + for (Property prop : properties.getProperties()) { + addToPayload(payload, prop.getName(), prop.getValue()); + } + } + } + + Iaas iaas = iaasProvider.getIaas(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Payload: " + payload.toString()); + } + + if (iaas == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Iaas is null of Iaas Provider: " + type + ". Trying to build IaaS..."); + } + try { + iaas = CloudControllerUtil.getIaas(iaasProvider); + } catch (InvalidIaasProviderException e) { + String msg = "Instance start up failed. " + memberContext.toString() + + "Unable to build Iaas of this IaasProvider [Provider] : " + type + ". Cause: " + e.getMessage(); + LOG.error(msg, e); + throw new InvalidIaasProviderException(msg, e); + } + + } + + if (ctxt.isVolumeRequired()) { + if (ctxt.getVolumes() != null) { + for (Volume volume : ctxt.getVolumes()) { + + if (volume.getId() == null) { + // create a new volume + createVolumeAndSetInClusterContext(volume, iaasProvider); + } + } + } + } + + if (ctxt.isVolumeRequired()) { + addToPayload(payload, "PERSISTENCE_MAPPING", getPersistencePayload(ctxt, iaas).toString()); + } + iaasProvider.setPayload(payload.toString().getBytes()); + iaas.setDynamicPayload(); + + template = iaasProvider.getTemplate(); + + if (template == null) { + String msg = + "Failed to start an instance. " + + memberContext.toString() + + ". Reason : Jclouds Template is null for iaas provider [type]: " + iaasProvider.getType(); + LOG.error(msg); + throw new InvalidIaasProviderException(msg); + } + + //Start instance start up in a new thread + ThreadExecutor exec = ThreadExecutor.getInstance(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is starting the instance start up thread."); + } + exec.execute(new JcloudsInstanceCreator(memberContext, iaasProvider, cartridgeType)); + + LOG.info("Instance is successfully starting up. " + memberContext.toString()); + + return memberContext; + + } catch (Exception e) { + String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage(); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); + } + + } + + private void createVolumeAndSetInClusterContext(Volume volume, + IaasProvider iaasProvider) { + // iaas cannot be null at this state #startInstance method + Iaas iaas = iaasProvider.getIaas(); + int sizeGB = volume.getSize(); + String snapshotId = volume.getSnapshotId(); + if (StringUtils.isNotEmpty(volume.getVolumeId())) { + // volumeID is specified, so not creating additional volumes + if (LOG.isDebugEnabled()) { + LOG.debug("Volume creation is skipping since a volume ID is specified. [Volume ID]" + volume.getVolumeId()); + } + volume.setId(volume.getVolumeId()); + } else { + String volumeId = iaas.createVolume(sizeGB, snapshotId); + volume.setId(volumeId); + } + + volume.setIaasType(iaasProvider.getType()); + } + + + private StringBuilder getPersistencePayload(ClusterContext ctx, Iaas iaas) { + StringBuilder persistencePayload = new StringBuilder(); + if (isPersistenceMappingAvailable(ctx)) { + for (Volume volume : ctx.getVolumes()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding persistence mapping " + volume.toString()); + } + if (persistencePayload.length() != 0) { + persistencePayload.append("|"); + } + + persistencePayload.append(iaas.getIaasDevice(volume.getDevice())); + persistencePayload.append("|"); + persistencePayload.append(volume.getId()); + persistencePayload.append("|"); + persistencePayload.append(volume.getMappingPath()); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Persistence payload is" + persistencePayload.toString()); + } + return persistencePayload; + } + + private boolean isPersistenceMappingAvailable(ClusterContext ctx) { + return ctx.getVolumes() != null && ctx.isVolumeRequired(); + } + + private void addToPayload(StringBuilder payload, String name, String value) { + payload.append(","); + payload.append(name + "=" + value); + } + + /** + * Persist data in registry. + */ + private void persist() { + try { + RegistryManager.getInstance().persist( + dataHolder); + } catch (RegistryException e) { + + String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed."; + LOG.fatal(msg); + throw new CloudControllerException(msg, e); + } + } + + private String generateMemberId(String clusterId) { + UUID memberId = UUID.randomUUID(); + return clusterId + memberId.toString(); + } + + @Override + public void terminateInstance(String memberId) throws InvalidMemberException, InvalidCartridgeTypeException { + + handleNullObject(memberId, "Termination failed. Null member id."); + + MemberContext ctxt = dataHolder.getMemberContextOfMemberId(memberId); + + if (ctxt == null) { + String msg = "Termination failed. Invalid Member Id: " + memberId; + LOG.error(msg); + throw new InvalidMemberException(msg); + } + + if (ctxt.getNodeId() == null && ctxt.getInstanceId() == null) { + // sending member terminated since this instance isn't reachable. + if (LOG.isInfoEnabled()){ + LOG.info(String.format( + "Member cannot be terminated because it is not reachable. [member] %s [nodeId] %s [instanceId] %s. Removing member from topology.", + ctxt.getMemberId(), + ctxt.getNodeId(), + ctxt.getInstanceId())); + } + + logTermination(ctxt); + } + + // check if status == active, if true, then this is a termination on member faulty + Topology topology; + try { + TopologyManager.acquireReadLock(); + topology = TopologyManager.getTopology(); + } finally { + TopologyManager.releaseReadLock(); + } + + org.apache.stratos.messaging.domain.topology.Service service = topology.getService(ctxt.getCartridgeType()); + + if (service != null) { + Cluster cluster = service.getCluster(ctxt.getClusterId()); + + if (cluster != null) { + Member member = cluster.getMember(memberId); + + if (member != null) { + // change member status if termination on a faulty member + if(fixMemberStatus(member, topology)){ + // set the time this member was added to ReadyToShutdown status + ctxt.setObsoleteInitTime(System.currentTimeMillis()); + } + + // check if ready to shutdown member is expired and send + // member terminated if it is. + if (isMemberExpired(member, ctxt.getObsoleteInitTime(), ctxt.getObsoleteExpiryTime())) { + if (LOG.isInfoEnabled()) { + LOG.info(String.format( + "Member pending termination in ReadyToShutdown state exceeded expiry time. This member has to be manually deleted: %s", + ctxt.getMemberId())); + } + + logTermination(ctxt); + return; + } + } + } + } + + ThreadExecutor exec = ThreadExecutor.getInstance(); + exec.execute(new InstanceTerminator(ctxt)); + + } + + /** + * Check if a member has been in the ReadyToShutdown status for a specified expiry time + * + * @param member + * @param initTime + * @param expiryTime + * @return + */ + private boolean isMemberExpired(Member member, long initTime, long expiryTime) { + if (member.getStatus() == MemberStatus.ReadyToShutDown) { + if (initTime == 0){ + // obsolete init time hasn't been set, i.e. not a member detected faulty. + // this is a graceful shutdown + return false; + } + + // member detected faulty, calculate ready to shutdown waiting period + long timeInReadyToShutdownStatus = System.currentTimeMillis() - initTime; + return timeInReadyToShutdownStatus >= expiryTime; + } + + return false; + } + + + /** + * Corrects the member status upon termination call if the member is in an Active state + * + * @param member The {@link org.apache.stratos.messaging.domain.topology.Member} object that is being + * checked for status + * @param topology The {@link org.apache.stratos.messaging.domain.topology.Topology} object to update + * the topology if needed. + * + */ + private boolean fixMemberStatus(Member member, Topology topology) { + if (member.getStatus() == MemberStatus.Activated) { + MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( + member.getServiceName(), + member.getClusterId(), + member.getNetworkPartitionId(), + member.getPartitionId(), + member.getMemberId(), + member.getInstanceId()); + + try { + TopologyManager.acquireWriteLock(); + member.setStatus(MemberStatus.ReadyToShutDown); + LOG.info("Member Ready to shut down event adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); + //publishing data + CartridgeInstanceDataPublisher.publish(member.getMemberId(), + member.getPartitionId(), + member.getNetworkPartitionId(), + member.getClusterId(), + member.getServiceName(), + MemberStatus.ReadyToShutDown.toString(), + null); + + return true; + } + + return false; + } + + + private class InstanceTerminator implements Runnable { + + private MemberContext ctxt; + + public InstanceTerminator(MemberContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + + String memberId = ctxt.getMemberId(); + String clusterId = ctxt.getClusterId(); + String partitionId = ctxt.getPartition().getId(); + String cartridgeType = ctxt.getCartridgeType(); + String nodeId = ctxt.getNodeId(); + + try { + // these will never be null, since we do not add null values for these. + Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + + LOG.info("Starting to terminate an instance with member id : " + memberId + + " in partition id: " + partitionId + " of cluster id: " + clusterId + + " and of cartridge type: " + cartridgeType); + + if (cartridge == null) { + String msg = + "Termination of Member Id: " + memberId + " failed. " + + "Cannot find a matching Cartridge for type: " + + cartridgeType; + LOG.error(msg); + throw new InvalidCartridgeTypeException(msg); + } + + // if no matching node id can be found. + if (nodeId == null) { + + String msg = + "Termination failed. Cannot find a node id for Member Id: " + + memberId; + + // log information + logTermination(ctxt); + LOG.error(msg); + throw new InvalidMemberException(msg); + } + + IaasProvider iaasProvider = cartridge.getIaasProviderOfPartition(partitionId); + + // terminate it! + terminate(iaasProvider, nodeId, ctxt); + + // log information + logTermination(ctxt); + + } catch (Exception e) { + String msg = + "Instance termination failed. " + ctxt.toString(); + LOG.error(msg, e); + throw new CloudControllerException(msg, e); + } + + } + } + + private class JcloudsInstanceCreator implements Runnable { + + private MemberContext memberContext; + private IaasProvider iaasProvider; + private String cartridgeType; + + public JcloudsInstanceCreator(MemberContext memberContext, IaasProvider iaasProvider, + String cartridgeType) { + this.memberContext = memberContext; + this.iaasProvider = iaasProvider; + this.cartridgeType = cartridgeType; + } + + @Override + public void run() { + + + String clusterId = memberContext.getClusterId(); + Partition partition = memberContext.getPartition(); + ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + Iaas iaas = iaasProvider.getIaas(); + String publicIp = null; + + NodeMetadata node = null; + // generate the group id from domain name and sub domain name. + // Should have lower-case ASCII letters, numbers, or dashes. + // Should have a length between 3-15 + String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length()); + String group = str.replaceAll("[^a-z0-9-]", ""); + + try { + ComputeService computeService = iaasProvider + .getComputeService(); + Template template = iaasProvider.getTemplate(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to start an instance for " + + memberContext + " to Jclouds layer."); + } + // create and start a node + Set<? extends NodeMetadata> nodes = computeService + .createNodesInGroup(group, 1, template); + node = nodes.iterator().next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller received a response for the request to start " + + memberContext + " from Jclouds layer."); + } + + if (node == null) { + String msg = "Null response received for instance start-up request to Jclouds.\n" + + memberContext.toString(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + + // node id + String nodeId = node.getId(); + if (nodeId == null) { + String msg = "Node id of the starting instance is null.\n" + + memberContext.toString(); + LOG.fatal(msg); + throw new IllegalStateException(msg); + } + + memberContext.setNodeId(nodeId); + if (LOG.isDebugEnabled()) { + LOG.debug("Node id was set. " + memberContext.toString()); + } + + // attach volumes + if (ctxt.isVolumeRequired()) { + // remove region prefix + String instanceId = nodeId.indexOf('/') != -1 ? nodeId + .substring(nodeId.indexOf('/') + 1, nodeId.length()) + : nodeId; + memberContext.setInstanceId(instanceId); + if (ctxt.getVolumes() != null) { + for (Volume volume : ctxt.getVolumes()) { + try { + iaas.attachVolume(instanceId, volume.getId(), + volume.getDevice()); + } catch (Exception e) { + // continue without throwing an exception, since + // there is an instance already running + LOG.error("Attaching Volume to Instance [ " + + instanceId + " ] failed!", e); + } + } + } + } + + } catch (Exception e) { + String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage(); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); + } + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("IP allocation process started for " + memberContext); + } + String autoAssignIpProp = + iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY); + + String pre_defined_ip = + iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY); + + // reset ip + String ip = ""; + + // default behavior is autoIpAssign=false + if (autoAssignIpProp == null || + (autoAssignIpProp != null && autoAssignIpProp.equals("false"))) { + + // check if floating ip is well defined in cartridge definition + if (pre_defined_ip != null) { + if (isValidIpAddress(pre_defined_ip)) { + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip); + } + ip = iaas.associatePredefinedAddress(node, pre_defined_ip); + + if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) { + // throw exception and stop instance creation + String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip + + " / allocated ip:" + ip + + " - terminating node:" + memberContext.toString(); + LOG.error(msg); + // terminate instance + terminate(iaasProvider, + node.getId(), memberContext); + throw new CloudControllerException(msg); + } + } else { + String msg = "Invalid floating ip address configured: " + pre_defined_ip + + " - terminating node:" + memberContext.toString(); + LOG.error(msg); + // terminate instance + terminate(iaasProvider, + node.getId(), memberContext); + throw new CloudControllerException(msg); + } + + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, " + + "selecting available one from pool"); + } + // allocate an IP address - manual IP assigning mode + ip = iaas.associateAddress(node); + + if (ip != null) { + memberContext.setAllocatedIpAddress(ip); + if (LOG.isDebugEnabled()) { + LOG.debug("Allocated an ip address: " + + memberContext.toString()); + } else if (LOG.isInfoEnabled()) { + LOG.info("Allocated ip address [ " + memberContext.getAllocatedIpAddress() + + " ] to member with id: " + memberContext.getMemberId()); + } + } + } + + if (ip == null) { + String msg = "No IP address found. IP allocation failed for " + memberContext; + LOG.error(msg); + throw new CloudControllerException(msg); + } + + // build the node with the new ip + node = NodeMetadataBuilder.fromNodeMetadata(node) + .publicAddresses(ImmutableSet.of(ip)).build(); + } + + + // public ip + if (node.getPublicAddresses() != null && + node.getPublicAddresses().iterator().hasNext()) { + ip = node.getPublicAddresses().iterator().next(); + publicIp = ip; + memberContext.setPublicIpAddress(ip); + if (LOG.isDebugEnabled()) { + LOG.debug("Retrieving Public IP Address : " + memberContext.toString()); + } else if (LOG.isInfoEnabled()) { + LOG.info("Retrieving Public IP Address: " + memberContext.getPublicIpAddress() + + ", member id: " + memberContext.getMemberId()); + } + } + + // private IP + if (node.getPrivateAddresses() != null && + node.getPrivateAddresses().iterator().hasNext()) { + ip = node.getPrivateAddresses().iterator().next(); + memberContext.setPrivateIpAddress(ip); + if (LOG.isDebugEnabled()) { + LOG.debug("Retrieving Private IP Address. " + memberContext.toString()); + } else if (LOG.isInfoEnabled()) { + LOG.info("Retrieving Private IP Address: " + memberContext.getPrivateIpAddress() + + ", member id: " + memberContext.getMemberId()); + } + } + + dataHolder.addMemberContext(memberContext); + + // persist in registry + persist(); + + + // trigger topology + TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, + partition.getId(), ip, publicIp, memberContext); + + String memberID = memberContext.getMemberId(); + + // update the topology with the newly spawned member + // publish data + CartridgeInstanceDataPublisher.publish(memberID, + memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), + cartridgeType, + MemberStatus.Created.toString(), + node); + if (LOG.isDebugEnabled()) { + LOG.debug("Node details: " + node.toString()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("IP allocation process ended for " + memberContext); + } + + } catch (Exception e) { + String msg = "Error occurred while allocating an ip address. " + memberContext.toString(); + LOG.error(msg, e); + throw new CloudControllerException(msg, e); + } + + + } + } + + private boolean isValidIpAddress(String ip) { + boolean isValid = InetAddresses.isInetAddress(ip); + return isValid; + } + + @Override + public void terminateAllInstances(String clusterId) throws InvalidClusterException { + + LOG.info("Starting to terminate all instances of cluster : " + + clusterId); + + handleNullObject(clusterId, "Instance termination failed. Cluster id is null."); + + List<MemberContext> ctxts = dataHolder.getMemberContextsOfClusterId(clusterId); + + if (ctxts == null) { + String msg = "Instance termination failed. No members found for cluster id: " + clusterId; + LOG.warn(msg); + return; + } + + ThreadExecutor exec = ThreadExecutor.getInstance(); + for (MemberContext memberContext : ctxts) { + exec.execute(new InstanceTerminator(memberContext)); + } + + } + + + /** + * A helper method to terminate an instance. + * + * @param iaasProvider + * @param ctxt + * @param nodeId + * @return will return the IaaSProvider + */ + private IaasProvider terminate(IaasProvider iaasProvider, + String nodeId, MemberContext ctxt) { + Iaas iaas = iaasProvider.getIaas(); + if (iaas == null) { + + try { + iaas = CloudControllerUtil.getIaas(iaasProvider); + } catch (InvalidIaasProviderException e) { + String msg = + "Instance termination failed. " + ctxt.toString() + + ". Cause: Unable to build Iaas of this " + iaasProvider.toString(); + LOG.error(msg, e); + throw new CloudControllerException(msg, e); + } + + } + + //detach volumes if any + detachVolume(iaasProvider, ctxt); + + // destroy the node + iaasProvider.getComputeService().destroyNode(nodeId); + + // release allocated IP address + if (ctxt.getAllocatedIpAddress() != null) { + iaas.releaseAddress(ctxt.getAllocatedIpAddress()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Member is terminated: " + ctxt.toString()); + } else if (LOG.isInfoEnabled()) { + LOG.info("Member with id " + ctxt.getMemberId() + " is terminated"); + } + return iaasProvider; + } + + private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) { + String clusterId = ctxt.getClusterId(); + ClusterContext clusterCtxt = dataHolder.getClusterContext(clusterId); + if (clusterCtxt.getVolumes() != null) { + for (Volume volume : clusterCtxt.getVolumes()) { + try { + String volumeId = volume.getId(); + if (volumeId == null) { + return; + } + Iaas iaas = iaasProvider.getIaas(); + iaas.detachVolume(ctxt.getInstanceId(), volumeId); + } catch (ResourceNotFoundException ignore) { + if (LOG.isDebugEnabled()) { + LOG.debug(ignore); + } + } + } + } + } + + private void logTermination(MemberContext memberContext) { + + if (memberContext == null) { + return; + } + + String partitionId = memberContext.getPartition() == null ? null : memberContext.getPartition().getId(); + + //updating the topology + TopologyBuilder.handleMemberTerminated(memberContext.getCartridgeType(), + memberContext.getClusterId(), memberContext.getNetworkPartitionId(), + partitionId, memberContext.getMemberId()); + + //publishing data + CartridgeInstanceDataPublisher.publish(memberContext.getMemberId(), + partitionId, + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), + memberContext.getCartridgeType(), + MemberStatus.Terminated.toString(), + null); + + // update data holders + dataHolder.removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId()); + + // persist + persist(); + + } + + @Override + public boolean registerService(Registrant registrant) + throws UnregisteredCartridgeException { + + String cartridgeType = registrant.getCartridgeType(); + handleNullObject(cartridgeType, "Service registration failed. Cartridge Type is null."); + + String clusterId = registrant.getClusterId(); + handleNullObject(clusterId, "Service registration failed. Cluster id is null."); + + String payload = registrant.getPayload(); + handleNullObject(payload, "Service registration failed. Payload is null."); + + String hostName = registrant.getHostName(); + handleNullObject(hostName, "Service registration failed. Hostname is null."); + + Cartridge cartridge = null; + if ((cartridge = dataHolder.getCartridge(cartridgeType)) == null) { + + String msg = "Registration of cluster: " + clusterId + + " failed. - Unregistered Cartridge type: " + cartridgeType; + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } + + Properties props = CloudControllerUtil.toJavaUtilProperties(registrant.getProperties()); + String property = props.getProperty(IS_LOAD_BALANCER); + boolean isLb = property != null ? Boolean.parseBoolean(property) : false; + + //TODO fix the properties issue + /*ClusterContext ctxt = buildClusterContext(cartridge, clusterId, + payload, hostName, props, isLb, registrant.getPersistence()); + + + dataHolder.addClusterContext(ctxt);*/ + TopologyBuilder.handleClusterCreated(registrant, isLb); + + persist(); + + LOG.info("Successfully registered: " + registrant); + + return true; + } + + private ClusterContext buildClusterContext(Cartridge cartridge, + String clusterId, String payload, String hostName, + org.apache.stratos.common.Properties props, boolean isLb, Persistence persistence) { + //TODO fix properties issue + // initialize ClusterContext + ClusterContext ctxt = new ClusterContext(clusterId, cartridge.getType(), payload, + hostName, isLb, props); + + /*String property; + property = props.get(Constants.GRACEFUL_SHUTDOWN_TIMEOUT); + long timeout = property != null ? Long.parseLong(property) : 30000; + + boolean persistanceRequired = false; + if(persistence != null){ + persistanceRequired = persistence.isPersistanceRequired(); + } + + if(persistanceRequired){ + ctxt.setVolumes(persistence.getVolumes()); + ctxt.setVolumeRequired(true); + }else{ + ctxt.setVolumeRequired(false); + } + ctxt.setTimeoutInMillis(timeout); + return ctxt; + ;*/ + return null; + } + + @Override + public String[] getRegisteredCartridges() { + // get the list of cartridges registered + List<Cartridge> cartridges = dataHolder + .getCartridges(); + + if (cartridges == null) { + LOG.info("No registered Cartridge found."); + return new String[0]; + } + + String[] cartridgeTypes = new String[cartridges.size()]; + int i = 0; + + if (LOG.isDebugEnabled()) { + LOG.debug("Registered Cartridges : \n"); + } + for (Cartridge cartridge : cartridges) { + if (LOG.isDebugEnabled()) { + LOG.debug(cartridge); + } + cartridgeTypes[i] = cartridge.getType(); + i++; + } + + return cartridgeTypes; + } + + @Override + public CartridgeInfo getCartridgeInfo(String cartridgeType) + throws UnregisteredCartridgeException { + Cartridge cartridge = dataHolder + .getCartridge(cartridgeType); + + if (cartridge != null) { + + return CloudControllerUtil.toCartridgeInfo(cartridge); + + } + + String msg = "Cannot find a Cartridge having a type of " + + cartridgeType + ". Hence unable to find information."; + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } + + @Override + public void unregisterService(String clusterId) throws UnregisteredClusterException { + final String clusterId_ = clusterId; + + ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); + + handleNullObject(ctxt, "Service unregistration failed. Invalid cluster id: " + clusterId); + + String cartridgeType = ctxt.getCartridgeType(); + + Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + + if (cartridge == null) { + String msg = + "Service unregistration failed. No matching Cartridge found [type] " + cartridgeType + ". "; + LOG.error(msg); + throw new UnregisteredClusterException(msg); + } + + // if it's a kubernetes cluster + if (StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) { + unregisterDockerService(clusterId_); + + } else { + +// TopologyBuilder.handleClusterMaintenanceMode(dataHolder.getClusterContext(clusterId_)); + + Runnable terminateInTimeout = new Runnable() { + @Override + public void run() { + ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); + if (ctxt == null) { + String msg = "Service unregistration failed. Cluster not found: " + clusterId_; + LOG.error(msg); + return; + } + Collection<Member> members = TopologyManager.getTopology(). + getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); + //finding the responding members from the existing members in the topology. + int sizeOfRespondingMembers = 0; + for (Member member : members) { + if (member.getStatus().getCode() >= MemberStatus.Activated.getCode()) { + sizeOfRespondingMembers++; + } + } + + long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * sizeOfRespondingMembers; + while (System.currentTimeMillis() < endTime) { + CloudControllerUtil.sleep(1000); + + } + + // if there're still alive members + if (members.size() > 0) { + //forcefully terminate them + for (Member member : members) { + + try { + terminateInstance(member.getMemberId()); + } catch (Exception e) { + // we are not gonna stop the execution due to errors. + LOG.warn("Instance termination failed of member [id] " + member.getMemberId(), e); + } + } + } + } + }; + Runnable unregister = new Runnable() { + public void run() { + ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); + if (ctxt == null) { + String msg = "Service unregistration failed. Cluster not found: " + clusterId_; + LOG.error(msg); + return; + } + Collection<Member> members = TopologyManager.getTopology(). + getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); + // TODO why end time is needed? + // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size(); + + while (members.size() > 0) { + //waiting until all the members got removed from the Topology/ timed out + CloudControllerUtil.sleep(1000); + } + + LOG.info("Unregistration of service cluster: " + clusterId_); + deleteVolumes(ctxt); + onClusterRemoval(clusterId_); + } + + private void deleteVolumes(ClusterContext ctxt) { + if (ctxt.isVolumeRequired()) { + Cartridge cartridge = dataHolder.getCartridge(ctxt.getCartridgeType()); + if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) { + for (Volume volume : ctxt.getVolumes()) { + if (volume.getId() != null) { + String iaasType = volume.getIaasType(); + //Iaas iaas = dataHolder.getIaasProvider(iaasType).getIaas(); + Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas(); + if (iaas != null) { + try { + // delete the volumes if remove on unsubscription is true. + if (volume.isRemoveOntermination()) { + iaas.deleteVolume(volume.getId()); + volume.setId(null); + } + } catch (Exception ignore) { + if (LOG.isErrorEnabled()) { + LOG.error("Error while deleting volume [id] " + volume.getId(), ignore); + } + } + } + } + } + + } + } + } + }; + new Thread(terminateInTimeout).start(); + new Thread(unregister).start(); + } + } + + @Override + public void unregisterDockerService(String clusterId) + throws UnregisteredClusterException { + + // terminate all kubernetes units + try { + terminateAllContainers(clusterId); + } catch (InvalidClusterException e) { + String msg = "Docker instance termination fails for cluster: " + clusterId; + LOG.error(msg, e); + throw new UnregisteredClusterException(msg, e); + } + // send cluster removal notifications and update the state + onClusterRemoval(clusterId); + } + + + @Override + public boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions) + throws InvalidPartitionException, InvalidCartridgeTypeException { + + Map<String, List<String>> validatedCache = dataHolder.getCartridgeTypeToPartitionIds(); + List<String> validatedPartitions = new ArrayList<String>(); + + if (validatedCache.containsKey(cartridgeType)) { + // cache hit for this cartridge + // get list of partitions + validatedPartitions = validatedCache.get(cartridgeType); + if (LOG.isDebugEnabled()) { + LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType); + } + + } + + Map<String, IaasProvider> partitionToIaasProviders = + new ConcurrentHashMap<String, IaasProvider>(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType); + } + + Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + + if (cartridge == null) { + String msg = "Invalid Cartridge Type: " + cartridgeType; + LOG.error(msg); + throw new InvalidCartridgeTypeException(msg); + } + + Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>(); + + for (Partition partition : partitions) { + + if (validatedPartitions.contains(partition.getId())) { + // partition cache hit + continue; + } + + Callable<IaasProvider> worker = new PartitionValidatorCallable( + partition, cartridge); + Future<IaasProvider> job = FasterLookUpDataHolder.getInstance() + .getExecutor().submit(worker); + jobList.put(partition.getId(), job); + } + + // Retrieve the results of the concurrently performed sanity checks. + for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) { + if (entry == null) { + continue; + } + String partitionId = entry.getKey(); + Future<IaasProvider> job = entry.getValue(); + try { + // add to a temporary Map + partitionToIaasProviders.put(partitionId, job.get()); + + // add to cache + this.dataHolder.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId); + + if (LOG.isDebugEnabled()) { + LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new InvalidPartitionException(e.getMessage(), e); + } + } + + // if and only if the deployment policy valid + cartridge.addIaasProviders(partitionToIaasProviders); + + // persist data + persist(); + + LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) + + " were validated successfully, against the Cartridge: " + cartridgeType); + + return true; + } + + private void onClusterRemoval(final String clusterId) { + ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + TopologyBuilder.handleClusterRemoved(ctxt); + dataHolder.removeClusterContext(clusterId); + dataHolder.removeMemberContextsOfCluster(clusterId); + persist(); + } + + @Override + public boolean validatePartition(Partition partition) throws InvalidPartitionException { + handleNullObject(partition, "Partition validation failed. Partition is null."); + String provider = partition.getProvider(); + handleNullObject(provider, "Partition [" + partition.getId() + "] validation failed. Partition provider is null."); + IaasProvider iaasProvider = dataHolder.getIaasProvider(provider); + + if (iaasProvider == null) { + String msg = + "Invalid Partition - " + partition.toString() + ". Cause: Iaas Provider " + + "is null for Partition Provider: " + provider; + LOG.error(msg); + throw new InvalidPartitionException(msg); + } + + Iaas iaas = iaasProvider.getIaas(); + + if (iaas == null) { + + try { + iaas = CloudControllerUtil.getIaas(iaasProvider); + } catch (InvalidIaasProviderException e) { + String msg = + "Invalid Partition - " + partition.toString() + + ". Cause: Unable to build Iaas of this IaasProvider [Provider] : " + provider + ". " + e.getMessage(); + LOG.error(msg, e); + throw new InvalidPartitionException(msg, e); + } + + } + + PartitionValidator validator = iaas.getPartitionValidator(); + validator.setIaasProvider(iaasProvider); + validator.validate(partition.getId(), + CloudControllerUtil.toJavaUtilProperties(partition.getProperties())); + + return true; + } + + public ClusterContext getClusterContext(String clusterId) { + + return dataHolder.getClusterContext(clusterId); + } + + @Override + public MemberContext[] startContainers(ContainerClusterContext containerClusterContext) + throws UnregisteredCartridgeException { + + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:startContainers"); + } + + handleNullObject(containerClusterContext, "Container start-up failed. ContainerClusterContext is null."); + + String clusterId = containerClusterContext.getClusterId(); + handleNullObject(clusterId, "Container start-up failed. Cluster id is null."); + + if (LOG.isDebugEnabled()) { + LOG.debug("Received a container spawn request : " + containerClusterContext.toString()); + } + + ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString()); + + String cartridgeType = ctxt.getCartridgeType(); + + Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + + if (cartridge == null) { + String msg = + "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " + + containerClusterContext.toString(); + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } + + try { + String minReplicas = validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt); + String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt); + String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext); + String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext); + + KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp, kubernetesPortRange); + + KubernetesApiClient kubApi = kubClusterContext.getKubApi(); + + // first let's create a replication controller. + ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController(); + ReplicationController controller = controllerFunction.apply(containerClusterContext); + + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to start a replication controller " + controller + + " for " + containerClusterContext + " to Kubernetes layer."); + } + + kubApi.createReplicationController(controller); + + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller successfully started the controller " + + controller + " via Kubernetes layer."); + } + + // secondly let's create a kubernetes service proxy to load balance these containers + ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService(); + Service service = serviceFunction.apply(containerClusterContext); + + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to start a service " + service + + " for " + containerClusterContext + " to Kubernetes layer."); + } + + kubApi.createService(service); + + // set host port and update + Property allocatedServiceHostPortProp = new Property(); + allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT); + allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort())); + ctxt.getProperties().addProperty(allocatedServiceHostPortProp); + dataHolder.addClusterContext(ctxt); + + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller successfully started the service " + + controller + " via Kubernetes layer."); + } + + // create a label query + Label l = new Label(); + l.setName(clusterId); + // execute the label query + Pod[] newlyCreatedPods = new Pod[0]; + int expectedCount = Integer.parseInt(minReplicas); + + for (int i = 0; i < expectedCount; i++) { + newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l}); + + if (LOG.isDebugEnabled()) { + + LOG.debug("Pods Count: " + newlyCreatedPods.length + " for cluster: " + clusterId); + } + if (newlyCreatedPods.length == expectedCount) { + break; + } + Thread.sleep(10000); + } + + if (newlyCreatedPods.length == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Pods are not created for cluster : %s, hence deleting the service", clusterId)); + } + terminateAllContainers(clusterId); + return new MemberContext[0]; + } + + if (LOG.isDebugEnabled()) { + + LOG.debug(String.format("Pods created : %s for cluster : %s", newlyCreatedPods.length, clusterId)); + } + + List<MemberContext> memberContexts = new ArrayList<MemberContext>(); + + PodToMemberContext podToMemberContextFunc = new PodToMemberContext(); + // generate Member Contexts + for (Pod pod : newlyCreatedPods) { + MemberContext context = podToMemberContextFunc.apply(pod); + context.setCartridgeType(cartridgeType); + context.setClusterId(clusterId); + + context.setProperties(CloudControllerUtil.addProperty(context + .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, + String.valueOf(service.getPort()))); + + dataHolder.addMemberContext(context); + + // wait till Pod status turns to running and send member spawned. + ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is starting the instance start up thread."); + } + dataHolder.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); + + memberContexts.add(context); + } + + // persist in registry + persist(); + + LOG.info("Kubernetes entities are successfully starting up: " + memberContexts); + + return memberContexts.toArray(new MemberContext[0]); + + } catch (Exception e) { + String msg = "Failed to start an instance. " + containerClusterContext.toString() + " Cause: " + e.getMessage(); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); + } + } + + private String validateProperty(String property, ClusterContext ctxt) { + + String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), property); + handleNullObject(propVal, "Property validation failed. Cannot find '" + property + "' in " + ctxt); + return propVal; + } + + private String validateProperty(String property, ContainerClusterContext ctxt) { + + String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), property); + handleNullObject(propVal, "Property validation failed. '" + property + "' in " + ctxt); + return propVal; + + } + + private KubernetesClusterContext getKubernetesClusterContext( + String kubernetesClusterId, String kubernetesMasterIp, + String kubernetesPortRange) { + + KubernetesClusterContext origCtxt = dataHolder.getKubernetesClusterContext(kubernetesClusterId); + KubernetesClusterContext newCtxt = new KubernetesClusterContext(kubernetesClusterId, kubernetesPortRange, kubernetesMasterIp); + + if (origCtxt == null) { + dataHolder.addKubernetesClusterContext(newCtxt); + return newCtxt; + } + + if (!origCtxt.equals(newCtxt)) { + // if for some reason master IP etc. have changed + newCtxt.setAvailableHostPorts(origCtxt.getAvailableHostPorts()); + dataHolder.addKubernetesClusterContext(newCtxt); + return newCtxt; + } else { + return origCtxt; + } + } + + @Override + public MemberContext[] terminateAllContainers(String clusterId) + throws InvalidClusterException { + + ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid cluster id. " + clusterId); + + String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(), + StratosConstants.KUBERNETES_CLUSTER_ID); + handleNullObject(kubernetesClusterId, "Kubernetes units termination failed. Cannot find '" + + StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt); + + KubernetesClusterContext kubClusterContext = dataHolder.getKubernetesClusterContext(kubernetesClusterId); + handleNullObject(kubClusterContext, "Kubernetes units termination failed. Cannot find a matching Kubernetes Cluster for cluster id: " + + kubernetesClusterId); + + KubernetesApiClient kubApi = kubClusterContext.getKubApi(); + // delete the service + try { + kubApi.deleteService(CloudControllerUtil.getCompatibleId(clusterId)); + } catch (KubernetesClientException e) { + // we're not going to throw this error, but proceed with other deletions + LOG.error("Failed to delete Kubernetes service with id: " + clusterId, e); + } + + // set replicas=0 for the replication controller + try { + kubApi.updateReplicationController(clusterId, 0); + } catch (KubernetesClientException e) { + // we're not going to throw this error, but proceed with other deletions + LOG.error("Failed to update Kubernetes Controller with id: " + clusterId, e); + } + + // delete pods forcefully + try { + // create a label query + Label l = new Label(); + l.setName(clusterId); + // execute the label query + Pod[] pods = kubApi.getSelectedPods(new Label[]{l}); + + for (Pod pod : pods) { + try { + // delete pods forcefully + kubApi.deletePod(pod.getId()); + } catch (KubernetesClientException ignore) { + // we can't do nothing here + LOG.warn(String.format("Failed to delete Pod [%s] forcefully!", pod.getId())); + } + } + } catch (KubernetesClientException e) { + // we're not going to throw this error, but proceed with other deletions + LOG.error("Failed to delete pods forcefully for cluster: " + clusterId, e); + } + + // delete the replication controller. + try { + kubApi.deleteReplicationController(clusterId); + } catch (KubernetesClientException e) { + String msg = "Failed to delete Kubernetes Controller with id: " + clusterId; + LOG.error(msg, e); + throw new InvalidClusterException(msg, e); + } + + String allocatedPort = CloudControllerUtil.getProperty(ctxt.getProperties(), + StratosConstants.ALLOCATED_SERVICE_HOST_PORT); + + if (allocatedPort != null) { + kubClusterContext.deallocateHostPort(Integer + .parseInt(allocatedPort)); + } else { + LOG.warn("Host port dealloacation failed due to a missing property: " + + StratosConstants.ALLOCATED_SERVICE_HOST_PORT); + } + + List<MemberContext> membersToBeRemoved = dataHolder.getMemberContextsOfClusterId(clusterId); + + for (MemberContext memberContext : membersToBeRemoved) { + logTermination(memberContext); + } + + // persist + persist(); + + return membersToBeRemoved.toArray(new MemberContext[0]); + } + + @Override + public MemberContext[] updateContainers(String clusterId, int replicas) + throws UnregisteredCartridgeException { + + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:updateContainers for cluster : " + clusterId); + } + + ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + handleNullObject(ctxt, "Container update failed. Invalid cluster id. " + clusterId); + + String cartridgeType = ctxt.getCartridgeType(); + + Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + + if (cartridge == null) { + String msg = + "Container update failed. No matching Cartridge found [type] " + cartridgeType + + ". [cluster id] " + clusterId; + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } + + try { + String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt); + + KubernetesClusterContext kubClusterContext = dataHolder.getKubernetesClusterContext(kubernetesClusterId); + + if (kubClusterContext == null) { + String msg = + "Instance start-up failed. No matching Kubernetes Context Found for [id] " + kubernetesClusterId + + ". [cluster id] " + clusterId; + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } + + KubernetesApiClient kubApi = kubClusterContext.getKubApi(); + // create a label query + Label l = new Label(); + l.setName(clusterId); + + // get the current pods - useful when scale down + Pod[] previousStatePods = kubApi.getSelectedPods(new Label[]{l}); + + // update the replication controller - cluster id = replication controller id + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to update a replication controller " + clusterId + + " to Kubernetes layer."); + } + + kubApi.updateReplicationController(clusterId, replicas); + + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller successfully updated the controller " + + clusterId + " via Kubernetes layer."); + } + + // execute the label query + Pod[] allPods = new Pod[0]; + + // wait replicas*5s time in the worst case ; best case = 0s + for (int i = 0; i < (replicas * previousStatePods.length + 1); i++) { + allPods = kubApi.getSelectedPods(new Label[]{l}); + + if (LOG.isDebugEnabled()) { + + LOG.debug("Pods Count: " + allPods.length + " for cluster: " + clusterId); + } + if (allPods.length == replicas) { + break; + } + Thread.sleep(10000); + } + + if (LOG.isDebugEnabled()) { + + LOG.debug(String.format("Pods created : %s for cluster : %s", allPods.length, clusterId)); + } + + List<MemberContext> memberContexts = new ArrayList<MemberContext>(); + + PodToMemberContext podToMemberContextFunc = new PodToMemberContext(); + // generate Member Contexts + for (Pod pod : allPods) { + MemberContext context; + // if member context does not exist -> a new member (scale up) + if ((context = dataHolder.getMemberContextOfMemberId(pod.getId())) == null) { + + context = podToMemberContextFunc.apply(pod); + context.setCartridgeType(cartridgeType); + context.setClusterId(clusterId); + + context.setProperties(CloudControllerUtil.addProperty(context + .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, + CloudControllerUtil.getProperty(ctxt.getProperties(), + StratosConstants.ALLOCATED_SERVICE_HOST_PORT))); + + // wait till Pod status turns to running and send member spawned. + ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is starting the instance start up thread."); + } + dataHolder.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); + + memberContexts.add(context); + + } + // publish data + // TODO +// CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node); + + } + + if (memberContexts.isEmpty()) { + // terminated members + @SuppressWarnings("unchecked") + List<Pod> difference = ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods)); + for (Pod pod : difference) { + if (pod != null) { + MemberContext context = dataHolder.getMemberContextOfMemberId(pod.getId()); + logTermination(context); + memberContexts.add(context); + } + } + } + + + // persist in registry + persist(); + + LOG.info("Kubernetes entities are successfully starting up. " + memberContexts); + + return memberContexts.toArray(new MemberContext[0]); + + } catch (Exception e) { + String msg = "Failed to update containers belong to cluster " + clusterId + ". Cause: " + e.getMessage(); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); + } + } + + @Override + public void updateClusterStatus(String serviceName, String clusterId, String instanceId, ClusterStatus status) { + //TODO + } + + @Override + public MemberContext terminateContainer(String memberId) throws MemberTerminationFailedException { + + handleNullObject(memberId, "Failed to terminate member. Invalid Member id. [Member id] " + memberId); + + MemberContext memberContext = dataHolder.getMemberContextOfMemberId(memberId); + + handleNullObject(memberContext, "Failed to terminate member. Member id not found. [Member id] " + memberId); + + String clusterId = memberContext.getClusterId(); + + handleNullObject(clusterId, "Failed to terminate member. Cluster id is null. [Member id] " + memberId); + + ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + + handleNullObject(ctxt, + String.format("Failed to terminate member [Member id] %s. Invalid cluster id %s ", memberId, clusterId)); + + String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(), + StratosConstants.KUBERNETES_CLUSTER_ID); + + handleNullObject(kubernetesClusterId, String.format("Failed to terminate member [Member id] %s. Cannot find '" + + StratosConstants.KUBERNETES_CLUSTER_ID + "' in [cluster context] %s ", memberId, ctxt)); + + KubernetesClusterContext kubClusterContext = dataHolder.getKubernetesClusterContext(kubernetesClusterId); + + handleNullObject(kubClusterContext, String.format("Failed to terminate member [Member id] %s. Cannot find a matching Kubernetes Cluster in [cluster context] %s ", memberId, ctxt)); + + KubernetesApiClient kubApi = kubClusterContext.getKubApi(); + // delete the Pod + try { + // member id = pod id + kubApi.deletePod(memberId); + + MemberContext memberToBeRemoved = dataHolder.getMemberContextOfMemberId(memberId); + + logTermination(memberToBeRemoved); + + return memberToBeRemoved; + + } catch (KubernetesClientException e) { + String msg = String.format("Failed to terminate member [Member id] %s", memberId); + LOG.error(msg, e); + throw new MemberTerminationFailedException(msg, e); + } + } + + private void handleNullObject(Object obj, String errorMsg) { + if (obj == null) { + LOG.error(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + } + + @Override + public void createApplicationClusters(String appId, ApplicationClusterContextDTO[] appClustersContexts) throws + ApplicationClusterRegistrationException { + + // Create a Cluster Context obj. for each of the Clusters in the Application + if (appClustersContexts == null || appClustersContexts.length == 0) { + String errorMsg = "No application cluster information found, unable to create clusters"; + LOG.error(errorMsg); + throw new ApplicationClusterRegistrationException(errorMsg); + } + List<Cluster> clusters = new ArrayList<Cluster>(); + + + for (ApplicationClusterContextDTO appClusterCtxt : appClustersContexts) { + dataHolder.addClusterContext(new ClusterContext(appClusterCtxt.getClusterId(), + appClusterCtxt.getCartridgeType(), appClusterCtxt.getTextPayload(), + appClusterCtxt.getHostName(), appClusterCtxt.isLbCluster(), appClusterCtxt.getProperties())); + // create Cluster objects + Cluster newCluster = new Cluster(appClusterCtxt.getCartridgeType(), appClusterCtxt.getClusterId(), + appClusterCtxt.getDeploymentPolicyName(), appClusterCtxt.getA
<TRUNCATED>
