http://git-wip-us.apache.org/repos/asf/stratos/blob/d45448f3/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java index 21878ad..d7d9400 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java @@ -20,7 +20,6 @@ package org.apache.stratos.cloud.controller.impl; import com.google.common.collect.ImmutableSet; import com.google.common.net.InetAddresses; - import org.apache.commons.collections.ListUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -54,6 +53,8 @@ import org.apache.stratos.kubernetes.client.model.Label; import org.apache.stratos.kubernetes.client.model.Pod; import org.apache.stratos.kubernetes.client.model.ReplicationController; import org.apache.stratos.kubernetes.client.model.Service; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.ClusterStatus; @@ -68,7 +69,6 @@ import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.*; import java.util.Map.Entry; -import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -76,65 +76,64 @@ import java.util.concurrent.Future; /** * Cloud Controller Service is responsible for starting up new server instances, * terminating already started instances, providing pending instance count etc. - * */ public class CloudControllerServiceImpl implements CloudControllerService { - private static final Log LOG = LogFactory - .getLog(CloudControllerServiceImpl.class); - private FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder - .getInstance(); - - public CloudControllerServiceImpl() { - // acquire serialized data from registry - acquireData(); - } - - private void acquireData() { - - Object obj = RegistryManager.getInstance().retrieve(); - if (obj != null) { - try { - Object dataObj = Deserializer - .deserializeFromByteArray((byte[]) obj); - if (dataObj instanceof FasterLookUpDataHolder) { - FasterLookUpDataHolder serializedObj = (FasterLookUpDataHolder) dataObj; - FasterLookUpDataHolder currentData = FasterLookUpDataHolder - .getInstance(); - - // assign necessary data - currentData.setClusterIdToContext(serializedObj.getClusterIdToContext()); - currentData.setMemberIdToContext(serializedObj.getMemberIdToContext()); - currentData.setClusterIdToMemberContext(serializedObj.getClusterIdToMemberContext()); - currentData.setCartridges(serializedObj.getCartridges()); - currentData.setKubClusterIdToKubClusterContext(serializedObj.getKubClusterIdToKubClusterContext()); - currentData.setServiceGroups(serializedObj.getServiceGroups()); - - if(LOG.isDebugEnabled()) { - - LOG.debug("Cloud Controller Data is retrieved from registry."); - } - } else { - if(LOG.isDebugEnabled()) { - - LOG.debug("Cloud Controller Data cannot be found in registry."); - } - } - } catch (Exception e) { - - String msg = "Unable to acquire data from Registry. Hence, any historical data will not get reflected."; - LOG.warn(msg, e); - } - - } - } - - public void deployCartridgeDefinition(CartridgeConfig cartridgeConfig) throws InvalidCartridgeDefinitionException, - InvalidIaasProviderException { - + private static final Log LOG = LogFactory + .getLog(CloudControllerServiceImpl.class); + private FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder + .getInstance(); + + public CloudControllerServiceImpl() { + // acquire serialized data from registry + acquireData(); + } + + private void acquireData() { + + Object obj = RegistryManager.getInstance().retrieve(); + if (obj != null) { + try { + Object dataObj = Deserializer + .deserializeFromByteArray((byte[]) obj); + if (dataObj instanceof FasterLookUpDataHolder) { + FasterLookUpDataHolder serializedObj = (FasterLookUpDataHolder) dataObj; + FasterLookUpDataHolder currentData = FasterLookUpDataHolder + .getInstance(); + + // assign necessary data + currentData.setClusterIdToContext(serializedObj.getClusterIdToContext()); + currentData.setMemberIdToContext(serializedObj.getMemberIdToContext()); + currentData.setClusterIdToMemberContext(serializedObj.getClusterIdToMemberContext()); + currentData.setCartridges(serializedObj.getCartridges()); + currentData.setKubClusterIdToKubClusterContext(serializedObj.getKubClusterIdToKubClusterContext()); + currentData.setServiceGroups(serializedObj.getServiceGroups()); + + if (LOG.isDebugEnabled()) { + + LOG.debug("Cloud Controller Data is retrieved from registry."); + } + } else { + if (LOG.isDebugEnabled()) { + + LOG.debug("Cloud Controller Data cannot be found in registry."); + } + } + } catch (Exception e) { + + String msg = "Unable to acquire data from Registry. Hence, any historical data will not get reflected."; + LOG.warn(msg, e); + } + + } + } + + public void deployCartridgeDefinition(CartridgeConfig cartridgeConfig) throws InvalidCartridgeDefinitionException, + InvalidIaasProviderException { + handleNullObject(cartridgeConfig, "Invalid Cartridge Definition: Definition is null."); - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug("Cartridge definition: " + cartridgeConfig.toString()); } @@ -144,16 +143,16 @@ public class CloudControllerServiceImpl implements CloudControllerService { cartridge = CloudControllerUtil.toCartridge(cartridgeConfig); } catch (Exception e) { String msg = - "Invalid Cartridge Definition: Cartridge Type: " + - cartridgeConfig.getType()+ - ". Cause: Cannot instantiate a Cartridge Instance with the given Config. "+e.getMessage(); + "Invalid Cartridge Definition: Cartridge Type: " + + cartridgeConfig.getType() + + ". Cause: Cannot instantiate a Cartridge Instance with the given Config. " + e.getMessage(); LOG.error(msg, e); throw new InvalidCartridgeDefinitionException(msg, e); } List<IaasProvider> iaases = cartridge.getIaases(); - - if (!StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) { + + if (!StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) { if (iaases == null || iaases.isEmpty()) { String msg = "Invalid Cartridge Definition: Cartridge Type: " + cartridgeConfig.getType() @@ -177,19 +176,19 @@ public class CloudControllerServiceImpl implements CloudControllerService { // TODO transaction begins String cartridgeType = cartridge.getType(); - if(dataHolder.getCartridge(cartridgeType) != null) { - Cartridge cartridgeToBeRemoved = dataHolder.getCartridge(cartridgeType); - // undeploy + if (dataHolder.getCartridge(cartridgeType) != null) { + Cartridge cartridgeToBeRemoved = dataHolder.getCartridge(cartridgeType); + // undeploy try { - undeployCartridgeDefinition(cartridgeToBeRemoved.getType()); - } catch (InvalidCartridgeTypeException e) { - //ignore - } + undeployCartridgeDefinition(cartridgeToBeRemoved.getType()); + } catch (InvalidCartridgeTypeException e) { + //ignore + } populateNewCartridge(cartridge, cartridgeToBeRemoved); } - + dataHolder.addCartridge(cartridge); - + // persist persist(); @@ -198,211 +197,211 @@ public class CloudControllerServiceImpl implements CloudControllerService { TopologyBuilder.handleServiceCreated(cartridgeList); // transaction ends - + LOG.info("Successfully deployed the Cartridge definition: " + cartridgeType); } private void populateNewCartridge(Cartridge cartridge, - Cartridge cartridgeToBeRemoved) { - - List<IaasProvider> newIaasProviders = cartridge.getIaases(); - Map<String, IaasProvider> oldPartitionToIaasMap = cartridgeToBeRemoved.getPartitionToIaasProvider(); - - for (Entry<String, IaasProvider> entry : oldPartitionToIaasMap.entrySet()) { - if (entry == null) { - continue; - } - String partitionId = entry.getKey(); - IaasProvider oldIaasProvider = entry.getValue(); - if (newIaasProviders.contains(oldIaasProvider)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Copying a partition from the Cartridge that is undeployed, to the new Cartridge. " - + "[partition id] : "+partitionId+" [cartridge type] "+cartridge.getType() ); - } - cartridge.addIaasProvider(partitionId, newIaasProviders.get(newIaasProviders.indexOf(oldIaasProvider))); - } - } - - } + Cartridge cartridgeToBeRemoved) { - public void undeployCartridgeDefinition(String cartridgeType) throws InvalidCartridgeTypeException { + List<IaasProvider> newIaasProviders = cartridge.getIaases(); + Map<String, IaasProvider> oldPartitionToIaasMap = cartridgeToBeRemoved.getPartitionToIaasProvider(); + + for (Entry<String, IaasProvider> entry : oldPartitionToIaasMap.entrySet()) { + if (entry == null) { + continue; + } + String partitionId = entry.getKey(); + IaasProvider oldIaasProvider = entry.getValue(); + if (newIaasProviders.contains(oldIaasProvider)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Copying a partition from the Cartridge that is undeployed, to the new Cartridge. " + + "[partition id] : " + partitionId + " [cartridge type] " + cartridge.getType()); + } + cartridge.addIaasProvider(partitionId, newIaasProviders.get(newIaasProviders.indexOf(oldIaasProvider))); + } + } + + } + + public void undeployCartridgeDefinition(String cartridgeType) throws InvalidCartridgeTypeException { Cartridge cartridge = null; - if((cartridge = dataHolder.getCartridge(cartridgeType)) != null) { + if ((cartridge = dataHolder.getCartridge(cartridgeType)) != null) { if (dataHolder.getCartridges().remove(cartridge)) { - // invalidate partition validation cache - dataHolder.removeFromCartridgeTypeToPartitionIds(cartridgeType); - - if (LOG.isDebugEnabled()) { - LOG.debug("Partition cache invalidated for cartridge "+cartridgeType); - } - + // invalidate partition validation cache + dataHolder.removeFromCartridgeTypeToPartitionIds(cartridgeType); + + if (LOG.isDebugEnabled()) { + LOG.debug("Partition cache invalidated for cartridge " + cartridgeType); + } + persist(); - + // sends the service removed event List<Cartridge> cartridgeList = new ArrayList<Cartridge>(); cartridgeList.add(cartridge); TopologyBuilder.handleServiceRemoved(cartridgeList); - - if(LOG.isInfoEnabled()) { + + if (LOG.isInfoEnabled()) { LOG.info("Successfully undeployed the Cartridge definition: " + cartridgeType); } return; } } - String msg = "Cartridge [type] "+cartridgeType+" is not a deployed Cartridge type."; + String msg = "Cartridge [type] " + cartridgeType + " is not a deployed Cartridge type."; LOG.error(msg); throw new InvalidCartridgeTypeException(msg); } - + public void deployServiceGroup(ServiceGroup servicegroup) throws InvalidServiceGroupException { - - if (servicegroup == null) { + + if (servicegroup == null) { String msg = "Invalid ServiceGroup Definition: Definition is null."; LOG.error(msg); throw new IllegalArgumentException(msg); } - - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("CloudControllerServiceImpl:deployServiceGroup:" + servicegroup.getName()); } - - String [] subGroups = servicegroup.getCartridges(); - - if(LOG.isDebugEnabled()) { + String[] subGroups = servicegroup.getCartridges(); + + + if (LOG.isDebugEnabled()) { LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups" + subGroups); if (subGroups != null) { - LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups:size" + subGroups.length); + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups:size" + subGroups.length); } else { - LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups: is null"); + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups: is null"); } } - - - Dependencies dependencies = servicegroup.getDependencies(); - - if(LOG.isDebugEnabled()) { + + + Dependencies dependencies = servicegroup.getDependencies(); + + if (LOG.isDebugEnabled()) { LOG.debug("CloudControllerServiceImpl:deployServiceGroup:dependencies" + dependencies); } - - if (dependencies != null) { - String [] startupOrders = dependencies.getStartupOrders(); - - if(LOG.isDebugEnabled()) { - LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrders" + startupOrders); - - if (startupOrders != null) { - LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder:size" + startupOrders.length); - } else { - LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder: is null"); - } - } - } - - dataHolder.addServiceGroup(servicegroup); - - this.persist(); - + + if (dependencies != null) { + String[] startupOrders = dependencies.getStartupOrders(); + + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrders" + startupOrders); + + if (startupOrders != null) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder:size" + startupOrders.length); + } else { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder: is null"); + } + } + } + + dataHolder.addServiceGroup(servicegroup); + + this.persist(); + } - + public void undeployServiceGroup(String name) throws InvalidServiceGroupException { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("CloudControllerServiceImpl:undeployServiceGroup: " + name); } - + ServiceGroup serviceGroup = null; - + serviceGroup = dataHolder.getServiceGroup(name); - - if (serviceGroup != null) { + + if (serviceGroup != null) { if (dataHolder.getServiceGroups().remove(serviceGroup)) { persist(); - if(LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled()) { LOG.info("Successfully undeployed the Service Group definition: " + serviceGroup); } return; } - } - + } + String msg = "ServiceGroup " + name + " is not a deployed Service Group definition"; LOG.error(msg); throw new InvalidServiceGroupException(msg); - + } - + @Override - public ServiceGroup getServiceGroup (String name) throws InvalidServiceGroupException { - - if(LOG.isDebugEnabled()) { + public ServiceGroup getServiceGroup(String name) throws InvalidServiceGroupException { + + if (LOG.isDebugEnabled()) { LOG.debug("getServiceGroupDefinition:" + name); } - - ServiceGroup serviceGroup = this.dataHolder.getServiceGroup(name); - - if (serviceGroup == null) { - if(LOG.isDebugEnabled()) { + + ServiceGroup serviceGroup = this.dataHolder.getServiceGroup(name); + + if (serviceGroup == null) { + if (LOG.isDebugEnabled()) { LOG.debug("getServiceGroupDefinition: no entry found for service group " + name); } - String msg = "ServiceGroup " + name + " is not a deployed Service Group definition"; - throw new InvalidServiceGroupException(msg); - } - - return serviceGroup; + String msg = "ServiceGroup " + name + " is not a deployed Service Group definition"; + throw new InvalidServiceGroupException(msg); + } + + return serviceGroup; } - - public String [] getServiceGroupSubGroups (String name) throws InvalidServiceGroupException { - ServiceGroup serviceGroup = this.getServiceGroup(name); - if (serviceGroup == null) { - throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); - } - - return serviceGroup.getSubGroups(); + + public String[] getServiceGroupSubGroups(String name) throws InvalidServiceGroupException { + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + + return serviceGroup.getSubGroups(); } - + /** - * + * */ - public String [] getServiceGroupCartridges (String name) throws InvalidServiceGroupException { - ServiceGroup serviceGroup = this.getServiceGroup(name); - if (serviceGroup == null) { - throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); - } - String [] cs = serviceGroup.getCartridges(); - return cs; - + public String[] getServiceGroupCartridges(String name) throws InvalidServiceGroupException { + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + String[] cs = serviceGroup.getCartridges(); + return cs; + } - - public Dependencies getServiceGroupDependencies (String name) throws InvalidServiceGroupException { - ServiceGroup serviceGroup = this.getServiceGroup(name); - if (serviceGroup == null) { - throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); - } - return serviceGroup.getDependencies(); + + public Dependencies getServiceGroupDependencies(String name) throws InvalidServiceGroupException { + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + return serviceGroup.getDependencies(); } @Override public MemberContext startInstance(MemberContext memberContext) throws - UnregisteredCartridgeException, InvalidIaasProviderException { + UnregisteredCartridgeException, InvalidIaasProviderException { - if(LOG.isDebugEnabled()) { - LOG.debug("CloudControllerServiceImpl:startInstance"); - } + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:startInstance"); + } - handleNullObject(memberContext, "Instance start-up failed. Member is null."); + handleNullObject(memberContext, "Instance start-up failed. Member is null."); String clusterId = memberContext.getClusterId(); Partition partition = memberContext.getPartition(); - if(LOG.isDebugEnabled()) { - LOG.debug("Received an instance spawn request : " + memberContext); + if (LOG.isDebugEnabled()) { + LOG.debug("Received an instance spawn request : " + memberContext); } Template template = null; handleNullObject(partition, "Instance start-up failed. Specified Partition is null. " + - memberContext); + memberContext); String partitionId = partition.getId(); ClusterContext ctxt = dataHolder.getClusterContext(clusterId); @@ -415,8 +414,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { if (cartridge == null) { String msg = - "Instance start-up failed. No matching Cartridge found [type] "+cartridgeType +". "+ - memberContext.toString(); + "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " + + memberContext.toString(); LOG.error(msg); throw new UnregisteredCartridgeException(msg); } @@ -426,20 +425,20 @@ public class CloudControllerServiceImpl implements CloudControllerService { IaasProvider iaasProvider = cartridge.getIaasProviderOfPartition(partitionId); if (iaasProvider == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("IaasToPartitionMap "+cartridge.hashCode() - + " for cartridge "+cartridgeType+ " and for partition: "+partitionId); - } - String msg = "Instance start-up failed. " - + "There's no IaaS provided for the partition: " - + partitionId - + " and for the Cartridge type: " - + cartridgeType - + ". Only following " - + "partitions can be found in this Cartridge: " - + cartridge.getPartitionToIaasProvider().keySet() - .toString() + ". " + memberContext.toString() - + ". "; + if (LOG.isDebugEnabled()) { + LOG.debug("IaasToPartitionMap " + cartridge.hashCode() + + " for cartridge " + cartridgeType + " and for partition: " + partitionId); + } + String msg = "Instance start-up failed. " + + "There's no IaaS provided for the partition: " + + partitionId + + " and for the Cartridge type: " + + cartridgeType + + ". Only following " + + "partitions can be found in this Cartridge: " + + cartridge.getPartitionToIaasProvider().keySet() + .toString() + ". " + memberContext.toString() + + ". "; LOG.fatal(msg); throw new InvalidIaasProviderException(msg); } @@ -454,8 +453,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { addToPayload(payload, "LB_CLUSTER_ID", memberContext.getLbClusterId()); addToPayload(payload, "NETWORK_PARTITION_ID", memberContext.getNetworkPartitionId()); addToPayload(payload, "PARTITION_ID", partitionId); - if(memberContext.getProperties() != null) { - org.apache.stratos.common.Properties properties = memberContext.getProperties(); + if (memberContext.getProperties() != null) { + org.apache.stratos.common.Properties properties = memberContext.getProperties(); if (properties != null) { for (Property prop : properties.getProperties()) { addToPayload(payload, prop.getName(), prop.getValue()); @@ -464,27 +463,27 @@ public class CloudControllerServiceImpl implements CloudControllerService { } Iaas iaas = iaasProvider.getIaas(); - + if (LOG.isDebugEnabled()) { LOG.debug("Payload: " + payload.toString()); } - + if (iaas == null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Iaas is null of Iaas Provider: "+type+". Trying to build IaaS..."); + if (LOG.isDebugEnabled()) { + LOG.debug("Iaas is null of Iaas Provider: " + type + ". Trying to build IaaS..."); } try { iaas = CloudControllerUtil.getIaas(iaasProvider); } catch (InvalidIaasProviderException e) { - String msg ="Instance start up failed. "+memberContext.toString()+ - "Unable to build Iaas of this IaasProvider [Provider] : " + type+". Cause: "+e.getMessage(); + String msg = "Instance start up failed. " + memberContext.toString() + + "Unable to build Iaas of this IaasProvider [Provider] : " + type + ". Cause: " + e.getMessage(); LOG.error(msg, e); throw new InvalidIaasProviderException(msg, e); } - + } - if(ctxt.isVolumeRequired()) { + if (ctxt.isVolumeRequired()) { if (ctxt.getVolumes() != null) { for (Volume volume : ctxt.getVolumes()) { @@ -496,19 +495,19 @@ public class CloudControllerServiceImpl implements CloudControllerService { } } - if(ctxt.isVolumeRequired()){ + if (ctxt.isVolumeRequired()) { addToPayload(payload, "PERSISTENCE_MAPPING", getPersistencePayload(ctxt, iaas).toString()); } iaasProvider.setPayload(payload.toString().getBytes()); iaas.setDynamicPayload(); template = iaasProvider.getTemplate(); - + if (template == null) { String msg = - "Failed to start an instance. " + - memberContext.toString() + - ". Reason : Jclouds Template is null for iaas provider [type]: "+iaasProvider.getType(); + "Failed to start an instance. " + + memberContext.toString() + + ". Reason : Jclouds Template is null for iaas provider [type]: " + iaasProvider.getType(); LOG.error(msg); throw new InvalidIaasProviderException(msg); } @@ -516,115 +515,114 @@ public class CloudControllerServiceImpl implements CloudControllerService { //Start instance start up in a new thread ThreadExecutor exec = ThreadExecutor.getInstance(); if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller is starting the instance start up thread."); - } + LOG.debug("Cloud Controller is starting the instance start up thread."); + } exec.execute(new JcloudsInstanceCreator(memberContext, iaasProvider, cartridgeType)); - LOG.info("Instance is successfully starting up. "+memberContext.toString()); + LOG.info("Instance is successfully starting up. " + memberContext.toString()); return memberContext; } catch (Exception e) { - String msg = "Failed to start an instance. " + memberContext.toString()+" Cause: "+e.getMessage(); + String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage(); LOG.error(msg, e); throw new IllegalStateException(msg, e); } } - private void createVolumeAndSetInClusterContext(Volume volume, - IaasProvider iaasProvider) { - // iaas cannot be null at this state #startInstance method - Iaas iaas = iaasProvider.getIaas(); - int sizeGB = volume.getSize(); - String snapshotId = volume.getSnapshotId(); - if(StringUtils.isNotEmpty(volume.getVolumeId())){ + private void createVolumeAndSetInClusterContext(Volume volume, + IaasProvider iaasProvider) { + // iaas cannot be null at this state #startInstance method + Iaas iaas = iaasProvider.getIaas(); + int sizeGB = volume.getSize(); + String snapshotId = volume.getSnapshotId(); + if (StringUtils.isNotEmpty(volume.getVolumeId())) { // volumeID is specified, so not creating additional volumes - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug("Volume creation is skipping since a volume ID is specified. [Volume ID]" + volume.getVolumeId()); } volume.setId(volume.getVolumeId()); - }else{ + } else { String volumeId = iaas.createVolume(sizeGB, snapshotId); volume.setId(volumeId); } - - volume.setIaasType(iaasProvider.getType()); - } + + volume.setIaasType(iaasProvider.getType()); + } private StringBuilder getPersistencePayload(ClusterContext ctx, Iaas iaas) { - StringBuilder persistencePayload = new StringBuilder(); - if(isPersistenceMappingAvailable(ctx)){ - for(Volume volume : ctx.getVolumes()){ - if(LOG.isDebugEnabled()){ - LOG.debug("Adding persistence mapping " + volume.toString()); - } - if(persistencePayload.length() != 0) { - persistencePayload.append("|"); + StringBuilder persistencePayload = new StringBuilder(); + if (isPersistenceMappingAvailable(ctx)) { + for (Volume volume : ctx.getVolumes()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding persistence mapping " + volume.toString()); + } + if (persistencePayload.length() != 0) { + persistencePayload.append("|"); } - - persistencePayload.append(iaas.getIaasDevice(volume.getDevice())); - persistencePayload.append("|"); + + persistencePayload.append(iaas.getIaasDevice(volume.getDevice())); + persistencePayload.append("|"); persistencePayload.append(volume.getId()); persistencePayload.append("|"); persistencePayload.append(volume.getMappingPath()); - } - } - if(LOG.isDebugEnabled()){ + } + } + if (LOG.isDebugEnabled()) { LOG.debug("Persistence payload is" + persistencePayload.toString()); } - return persistencePayload; - } + return persistencePayload; + } - private boolean isPersistenceMappingAvailable(ClusterContext ctx) { - return ctx.getVolumes() != null && ctx.isVolumeRequired(); - } + private boolean isPersistenceMappingAvailable(ClusterContext ctx) { + return ctx.getVolumes() != null && ctx.isVolumeRequired(); + } - private void addToPayload(StringBuilder payload, String name, String value) { - payload.append(","); - payload.append(name+"=" + value); + private void addToPayload(StringBuilder payload, String name, String value) { + payload.append(","); + payload.append(name + "=" + value); } /** - * Persist data in registry. - */ - private void persist() { - try { - RegistryManager.getInstance().persist( - dataHolder); - } catch (RegistryException e) { - - String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed."; - LOG.fatal(msg); - throw new CloudControllerException(msg, e); - } - } + * Persist data in registry. + */ + private void persist() { + try { + RegistryManager.getInstance().persist( + dataHolder); + } catch (RegistryException e) { + + String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed."; + LOG.fatal(msg); + throw new CloudControllerException(msg, e); + } + } private String generateMemberId(String clusterId) { UUID memberId = UUID.randomUUID(); - return clusterId + memberId.toString(); + return clusterId + memberId.toString(); } @Override - public void terminateInstance(String memberId) throws InvalidMemberException, InvalidCartridgeTypeException - { + public void terminateInstance(String memberId) throws InvalidMemberException, InvalidCartridgeTypeException { handleNullObject(memberId, "Termination failed. Null member id."); - + MemberContext ctxt = dataHolder.getMemberContextOfMemberId(memberId); - - if(ctxt == null) { - String msg = "Termination failed. Invalid Member Id: "+memberId; + + if (ctxt == null) { + String msg = "Termination failed. Invalid Member Id: " + memberId; LOG.error(msg); throw new InvalidMemberException(msg); } - + ThreadExecutor exec = ThreadExecutor.getInstance(); exec.execute(new InstanceTerminator(ctxt)); - } - + } + private class InstanceTerminator implements Runnable { private MemberContext ctxt; @@ -647,14 +645,14 @@ public class CloudControllerServiceImpl implements CloudControllerService { Cartridge cartridge = dataHolder.getCartridge(cartridgeType); LOG.info("Starting to terminate an instance with member id : " + memberId + - " in partition id: " + partitionId + " of cluster id: " + clusterId + - " and of cartridge type: " + cartridgeType); + " in partition id: " + partitionId + " of cluster id: " + clusterId + + " and of cartridge type: " + cartridgeType); if (cartridge == null) { String msg = - "Termination of Member Id: " + memberId + " failed. " + - "Cannot find a matching Cartridge for type: " + - cartridgeType; + "Termination of Member Id: " + memberId + " failed. " + + "Cannot find a matching Cartridge for type: " + + cartridgeType; LOG.error(msg); throw new InvalidCartridgeTypeException(msg); } @@ -663,8 +661,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { if (nodeId == null) { String msg = - "Termination failed. Cannot find a node id for Member Id: " + - memberId; + "Termination failed. Cannot find a node id for Member Id: " + + memberId; // log information logTermination(ctxt); @@ -682,7 +680,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } catch (Exception e) { String msg = - "Instance termination failed. "+ctxt.toString(); + "Instance termination failed. " + ctxt.toString(); LOG.error(msg, e); throw new CloudControllerException(msg, e); } @@ -696,8 +694,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { private IaasProvider iaasProvider; private String cartridgeType; - public JcloudsInstanceCreator(MemberContext memberContext, IaasProvider iaasProvider, - String cartridgeType) { + public JcloudsInstanceCreator(MemberContext memberContext, IaasProvider iaasProvider, + String cartridgeType) { this.memberContext = memberContext; this.iaasProvider = iaasProvider; this.cartridgeType = cartridgeType; @@ -712,379 +710,380 @@ public class CloudControllerServiceImpl implements CloudControllerService { ClusterContext ctxt = dataHolder.getClusterContext(clusterId); Iaas iaas = iaasProvider.getIaas(); String publicIp = null; - + NodeMetadata node = null; // generate the group id from domain name and sub domain name. // Should have lower-case ASCII letters, numbers, or dashes. // Should have a length between 3-15 String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length()); String group = str.replaceAll("[^a-z0-9-]", ""); - + try { - ComputeService computeService = iaasProvider - .getComputeService(); - Template template = iaasProvider.getTemplate(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller is delegating request to start an instance for " - + memberContext + " to Jclouds layer."); - } - // create and start a node - Set<? extends NodeMetadata> nodes = computeService - .createNodesInGroup(group, 1, template); - node = nodes.iterator().next(); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller received a response for the request to start " - + memberContext + " from Jclouds layer."); - } - - if (node == null) { - String msg = "Null response received for instance start-up request to Jclouds.\n" + ComputeService computeService = iaasProvider + .getComputeService(); + Template template = iaasProvider.getTemplate(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to start an instance for " + + memberContext + " to Jclouds layer."); + } + // create and start a node + Set<? extends NodeMetadata> nodes = computeService + .createNodesInGroup(group, 1, template); + node = nodes.iterator().next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller received a response for the request to start " + + memberContext + " from Jclouds layer."); + } + + if (node == null) { + String msg = "Null response received for instance start-up request to Jclouds.\n" + memberContext.toString(); LOG.error(msg); throw new IllegalStateException(msg); - } - - // node id - String nodeId = node.getId(); - if (nodeId == null) { - String msg = "Node id of the starting instance is null.\n" - + memberContext.toString(); - LOG.fatal(msg); - throw new IllegalStateException(msg); - } - - memberContext.setNodeId(nodeId); - if (LOG.isDebugEnabled()) { - LOG.debug("Node id was set. " + memberContext.toString()); - } - - // attach volumes - if (ctxt.isVolumeRequired()) { - // remove region prefix - String instanceId = nodeId.indexOf('/') != -1 ? nodeId - .substring(nodeId.indexOf('/') + 1, nodeId.length()) - : nodeId; - memberContext.setInstanceId(instanceId); - if (ctxt.getVolumes() != null) { - for (Volume volume : ctxt.getVolumes()) { - try { - iaas.attachVolume(instanceId, volume.getId(), - volume.getDevice()); - } catch (Exception e) { - // continue without throwing an exception, since - // there is an instance already running - LOG.error("Attaching Volume to Instance [ " - + instanceId + " ] failed!", e); - } - } - } - } - + } + + // node id + String nodeId = node.getId(); + if (nodeId == null) { + String msg = "Node id of the starting instance is null.\n" + + memberContext.toString(); + LOG.fatal(msg); + throw new IllegalStateException(msg); + } + + memberContext.setNodeId(nodeId); + if (LOG.isDebugEnabled()) { + LOG.debug("Node id was set. " + memberContext.toString()); + } + + // attach volumes + if (ctxt.isVolumeRequired()) { + // remove region prefix + String instanceId = nodeId.indexOf('/') != -1 ? nodeId + .substring(nodeId.indexOf('/') + 1, nodeId.length()) + : nodeId; + memberContext.setInstanceId(instanceId); + if (ctxt.getVolumes() != null) { + for (Volume volume : ctxt.getVolumes()) { + try { + iaas.attachVolume(instanceId, volume.getId(), + volume.getDevice()); + } catch (Exception e) { + // continue without throwing an exception, since + // there is an instance already running + LOG.error("Attaching Volume to Instance [ " + + instanceId + " ] failed!", e); + } + } + } + } + } catch (Exception e) { - String msg = "Failed to start an instance. " + memberContext.toString()+" Cause: "+e.getMessage(); - LOG.error(msg, e); - throw new IllegalStateException(msg, e); + String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage(); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); } - try{ - if (LOG.isDebugEnabled()) { - LOG.debug("IP allocation process started for "+memberContext); - } + try { + if (LOG.isDebugEnabled()) { + LOG.debug("IP allocation process started for " + memberContext); + } String autoAssignIpProp = - iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY); - + iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY); + String pre_defined_ip = iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY); - - // reset ip - String ip = ""; - - // default behavior is autoIpAssign=false - if (autoAssignIpProp == null || + + // reset ip + String ip = ""; + + // default behavior is autoIpAssign=false + if (autoAssignIpProp == null || (autoAssignIpProp != null && autoAssignIpProp.equals("false"))) { - - // check if floating ip is well defined in cartridge definition - if (pre_defined_ip != null) { - if (isValidIpAddress(pre_defined_ip)) { - if(LOG.isDebugEnabled()) { - LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip); - } - ip = iaas.associatePredefinedAddress(node, pre_defined_ip); - - if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) { - // throw exception and stop instance creation - String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip + - " / allocated ip:" + ip + - " - terminating node:" + memberContext.toString(); - LOG.error(msg); - // terminate instance - terminate(iaasProvider, - node.getId(), memberContext); - throw new CloudControllerException(msg); - } - } else { - String msg = "Invalid floating ip address configured: " + pre_defined_ip + - " - terminating node:" + memberContext.toString(); - LOG.error(msg); - // terminate instance - terminate(iaasProvider, - node.getId(), memberContext); - throw new CloudControllerException(msg); - } - + + // check if floating ip is well defined in cartridge definition + if (pre_defined_ip != null) { + if (isValidIpAddress(pre_defined_ip)) { + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip); + } + ip = iaas.associatePredefinedAddress(node, pre_defined_ip); + + if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) { + // throw exception and stop instance creation + String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip + + " / allocated ip:" + ip + + " - terminating node:" + memberContext.toString(); + LOG.error(msg); + // terminate instance + terminate(iaasProvider, + node.getId(), memberContext); + throw new CloudControllerException(msg); + } } else { - if(LOG.isDebugEnabled()) { - LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, " - + "selecting available one from pool"); - } - // allocate an IP address - manual IP assigning mode - ip = iaas.associateAddress(node); - - if (ip != null) { - memberContext.setAllocatedIpAddress(ip); - LOG.info("Allocated an ip address: " - + memberContext.toString()); - } - } - - if (ip == null) { - String msg = "No IP address found. IP allocation failed for "+memberContext; - LOG.error(msg); + String msg = "Invalid floating ip address configured: " + pre_defined_ip + + " - terminating node:" + memberContext.toString(); + LOG.error(msg); + // terminate instance + terminate(iaasProvider, + node.getId(), memberContext); throw new CloudControllerException(msg); - } - - // build the node with the new ip - node = NodeMetadataBuilder.fromNodeMetadata(node) - .publicAddresses(ImmutableSet.of(ip)).build(); - } - - - // public ip - if (node.getPublicAddresses() != null && - node.getPublicAddresses().iterator().hasNext()) { - ip = node.getPublicAddresses().iterator().next(); - publicIp = ip; - memberContext.setPublicIpAddress(ip); - LOG.info("Retrieving Public IP Address : " + memberContext.toString()); + } + + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, " + + "selecting available one from pool"); + } + // allocate an IP address - manual IP assigning mode + ip = iaas.associateAddress(node); + + if (ip != null) { + memberContext.setAllocatedIpAddress(ip); + LOG.info("Allocated an ip address: " + + memberContext.toString()); + } } - // private IP - if (node.getPrivateAddresses() != null && - node.getPrivateAddresses().iterator().hasNext()) { - ip = node.getPrivateAddresses().iterator().next(); - memberContext.setPrivateIpAddress(ip); - LOG.info("Retrieving Private IP Address. " + memberContext.toString()); + if (ip == null) { + String msg = "No IP address found. IP allocation failed for " + memberContext; + LOG.error(msg); + throw new CloudControllerException(msg); } - dataHolder.addMemberContext(memberContext); + // build the node with the new ip + node = NodeMetadataBuilder.fromNodeMetadata(node) + .publicAddresses(ImmutableSet.of(ip)).build(); + } - // persist in registry - persist(); + // public ip + if (node.getPublicAddresses() != null && + node.getPublicAddresses().iterator().hasNext()) { + ip = node.getPublicAddresses().iterator().next(); + publicIp = ip; + memberContext.setPublicIpAddress(ip); + LOG.info("Retrieving Public IP Address : " + memberContext.toString()); + } - // trigger topology - TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, - partition.getId(), ip, publicIp, memberContext); - - String memberID = memberContext.getMemberId(); + // private IP + if (node.getPrivateAddresses() != null && + node.getPrivateAddresses().iterator().hasNext()) { + ip = node.getPrivateAddresses().iterator().next(); + memberContext.setPrivateIpAddress(ip); + LOG.info("Retrieving Private IP Address. " + memberContext.toString()); + } - // update the topology with the newly spawned member - // publish data - CartridgeInstanceDataPublisher.publish(memberID, - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - cartridgeType, - MemberStatus.Created.toString(), - node); - if (LOG.isDebugEnabled()) { - LOG.debug("Node details: " + node.toString()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("IP allocation process ended for "+memberContext); - } + dataHolder.addMemberContext(memberContext); + + // persist in registry + persist(); + + + // trigger topology + TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, + partition.getId(), ip, publicIp, memberContext); + + String memberID = memberContext.getMemberId(); + + // update the topology with the newly spawned member + // publish data + CartridgeInstanceDataPublisher.publish(memberID, + memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), + cartridgeType, + MemberStatus.Created.toString(), + node); + if (LOG.isDebugEnabled()) { + LOG.debug("Node details: " + node.toString()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("IP allocation process ended for " + memberContext); + } } catch (Exception e) { String msg = "Error occurred while allocating an ip address. " + memberContext.toString(); LOG.error(msg, e); throw new CloudControllerException(msg, e); - } + } } } - - private boolean isValidIpAddress (String ip) { - boolean isValid = InetAddresses.isInetAddress(ip); - return isValid; + + private boolean isValidIpAddress(String ip) { + boolean isValid = InetAddresses.isInetAddress(ip); + return isValid; } - @Override - public void terminateAllInstances(String clusterId) throws InvalidClusterException { + @Override + public void terminateAllInstances(String clusterId) throws InvalidClusterException { + + LOG.info("Starting to terminate all instances of cluster : " + + clusterId); - LOG.info("Starting to terminate all instances of cluster : " - + clusterId); - - handleNullObject(clusterId, "Instance termination failed. Cluster id is null."); - - List<MemberContext> ctxts = dataHolder.getMemberContextsOfClusterId(clusterId); - - if(ctxts == null) { - String msg = "Instance termination failed. No members found for cluster id: "+clusterId; - LOG.warn(msg); + handleNullObject(clusterId, "Instance termination failed. Cluster id is null."); + + List<MemberContext> ctxts = dataHolder.getMemberContextsOfClusterId(clusterId); + + if (ctxts == null) { + String msg = "Instance termination failed. No members found for cluster id: " + clusterId; + LOG.warn(msg); return; - } - - ThreadExecutor exec = ThreadExecutor.getInstance(); - for (MemberContext memberContext : ctxts) { + } + + ThreadExecutor exec = ThreadExecutor.getInstance(); + for (MemberContext memberContext : ctxts) { exec.execute(new InstanceTerminator(memberContext)); } - } + } - /** - * A helper method to terminate an instance. + /** + * A helper method to terminate an instance. + * * @param iaasProvider * @param ctxt * @param nodeId * @return will return the IaaSProvider */ - private IaasProvider terminate(IaasProvider iaasProvider, - String nodeId, MemberContext ctxt) { - Iaas iaas = iaasProvider.getIaas(); - if (iaas == null) { - - try { - iaas = CloudControllerUtil.getIaas(iaasProvider); - } catch (InvalidIaasProviderException e) { - String msg = - "Instance termination failed. " +ctxt.toString() + - ". Cause: Unable to build Iaas of this " + iaasProvider.toString(); - LOG.error(msg, e); - throw new CloudControllerException(msg, e); - } - - } - - //detach volumes if any - detachVolume(iaasProvider, ctxt); - - // destroy the node - iaasProvider.getComputeService().destroyNode(nodeId); - - // release allocated IP address - if (ctxt.getAllocatedIpAddress() != null) { + private IaasProvider terminate(IaasProvider iaasProvider, + String nodeId, MemberContext ctxt) { + Iaas iaas = iaasProvider.getIaas(); + if (iaas == null) { + + try { + iaas = CloudControllerUtil.getIaas(iaasProvider); + } catch (InvalidIaasProviderException e) { + String msg = + "Instance termination failed. " + ctxt.toString() + + ". Cause: Unable to build Iaas of this " + iaasProvider.toString(); + LOG.error(msg, e); + throw new CloudControllerException(msg, e); + } + + } + + //detach volumes if any + detachVolume(iaasProvider, ctxt); + + // destroy the node + iaasProvider.getComputeService().destroyNode(nodeId); + + // release allocated IP address + if (ctxt.getAllocatedIpAddress() != null) { iaas.releaseAddress(ctxt.getAllocatedIpAddress()); - } - - LOG.info("Member is terminated: "+ctxt.toString()); - return iaasProvider; - } - - private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) { - String clusterId = ctxt.getClusterId(); - ClusterContext clusterCtxt = dataHolder.getClusterContext(clusterId); - if (clusterCtxt.getVolumes() != null) { - for (Volume volume : clusterCtxt.getVolumes()) { - try { - String volumeId = volume.getId(); - if (volumeId == null) { - return; - } - Iaas iaas = iaasProvider.getIaas(); - iaas.detachVolume(ctxt.getInstanceId(), volumeId); - } catch (ResourceNotFoundException ignore) { - if(LOG.isDebugEnabled()) { - LOG.debug(ignore); - } - } - } - } - } - - private void logTermination(MemberContext memberContext) { - - if (memberContext == null) { - return; - } - - String partitionId = memberContext.getPartition() == null ? null : memberContext.getPartition().getId(); - + } + + LOG.info("Member is terminated: " + ctxt.toString()); + return iaasProvider; + } + + private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) { + String clusterId = ctxt.getClusterId(); + ClusterContext clusterCtxt = dataHolder.getClusterContext(clusterId); + if (clusterCtxt.getVolumes() != null) { + for (Volume volume : clusterCtxt.getVolumes()) { + try { + String volumeId = volume.getId(); + if (volumeId == null) { + return; + } + Iaas iaas = iaasProvider.getIaas(); + iaas.detachVolume(ctxt.getInstanceId(), volumeId); + } catch (ResourceNotFoundException ignore) { + if (LOG.isDebugEnabled()) { + LOG.debug(ignore); + } + } + } + } + } + + private void logTermination(MemberContext memberContext) { + + if (memberContext == null) { + return; + } + + String partitionId = memberContext.getPartition() == null ? null : memberContext.getPartition().getId(); + //updating the topology - TopologyBuilder.handleMemberTerminated(memberContext.getCartridgeType(), - memberContext.getClusterId(), memberContext.getNetworkPartitionId(), - partitionId, memberContext.getMemberId()); + TopologyBuilder.handleMemberTerminated(memberContext.getCartridgeType(), + memberContext.getClusterId(), memberContext.getNetworkPartitionId(), + partitionId, memberContext.getMemberId()); //publishing data CartridgeInstanceDataPublisher.publish(memberContext.getMemberId(), - partitionId, - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Terminated.toString(), - null); + partitionId, + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), + memberContext.getCartridgeType(), + MemberStatus.Terminated.toString(), + null); // update data holders dataHolder.removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId()); - - // persist - persist(); - - } - - @Override - public boolean registerService(Registrant registrant) - throws UnregisteredCartridgeException { - - String cartridgeType = registrant.getCartridgeType(); - handleNullObject(cartridgeType, "Service registration failed. Cartridge Type is null."); - - String clusterId = registrant.getClusterId(); - handleNullObject(clusterId, "Service registration failed. Cluster id is null."); - + + // persist + persist(); + + } + + @Override + public boolean registerService(Registrant registrant) + throws UnregisteredCartridgeException { + + String cartridgeType = registrant.getCartridgeType(); + handleNullObject(cartridgeType, "Service registration failed. Cartridge Type is null."); + + String clusterId = registrant.getClusterId(); + handleNullObject(clusterId, "Service registration failed. Cluster id is null."); + String payload = registrant.getPayload(); handleNullObject(payload, "Service registration failed. Payload is null."); - + String hostName = registrant.getHostName(); handleNullObject(hostName, "Service registration failed. Hostname is null."); - + Cartridge cartridge = null; if ((cartridge = dataHolder.getCartridge(cartridgeType)) == null) { - String msg = "Registration of cluster: "+clusterId+ + String msg = "Registration of cluster: " + clusterId + " failed. - Unregistered Cartridge type: " + cartridgeType; LOG.error(msg); throw new UnregisteredCartridgeException(msg); } - + Properties props = CloudControllerUtil.toJavaUtilProperties(registrant.getProperties()); String property = props.getProperty(Constants.IS_LOAD_BALANCER); boolean isLb = property != null ? Boolean.parseBoolean(property) : false; ClusterContext ctxt = null;//TODO buildClusterContext(cartridge, clusterId, - //payload, hostName, props, isLb, registrant.getPersistence()); + //payload, hostName, props, isLb, registrant.getPersistence()); - dataHolder.addClusterContext(ctxt); - TopologyBuilder.handleClusterCreated(registrant, isLb); - - persist(); - - LOG.info("Successfully registered: "+registrant); - - return true; - } + dataHolder.addClusterContext(ctxt); + TopologyBuilder.handleClusterCreated(registrant, isLb); - private ClusterContext buildClusterContext(Cartridge cartridge, - String clusterId, String payload, String hostName, - org.apache.stratos.common.Properties props, boolean isLb, Persistence persistence) { + persist(); - //TODO - /*// initialize ClusterContext - ClusterContext ctxt = new ClusterContext(clusterId, cartridge.getType(), payload, + LOG.info("Successfully registered: " + registrant); + + return true; + } + + private ClusterContext buildClusterContext(Cartridge cartridge, + String clusterId, String payload, String hostName, + org.apache.stratos.common.Properties props, boolean isLb, Persistence persistence) { + + //TODO + /*// initialize ClusterContext + ClusterContext ctxt = new ClusterContext(clusterId, cartridge.getType(), payload, hostName, isLb, props); String property; @@ -1105,219 +1104,218 @@ public class CloudControllerServiceImpl implements CloudControllerService { ctxt.setTimeoutInMillis(timeout); return ctxt;*/ return null; - } + } + + @Override + public String[] getRegisteredCartridges() { + // get the list of cartridges registered + List<Cartridge> cartridges = dataHolder + .getCartridges(); + + if (cartridges == null) { + LOG.info("No registered Cartridge found."); + return new String[0]; + } + + String[] cartridgeTypes = new String[cartridges.size()]; + int i = 0; + + if (LOG.isDebugEnabled()) { + LOG.debug("Registered Cartridges : \n"); + } + for (Cartridge cartridge : cartridges) { + if (LOG.isDebugEnabled()) { + LOG.debug(cartridge); + } + cartridgeTypes[i] = cartridge.getType(); + i++; + } + + return cartridgeTypes; + } @Override - public String[] getRegisteredCartridges() { - // get the list of cartridges registered - List<Cartridge> cartridges = dataHolder - .getCartridges(); - - if (cartridges == null) { - LOG.info("No registered Cartridge found."); - return new String[0]; - } - - String[] cartridgeTypes = new String[cartridges.size()]; - int i = 0; - - if (LOG.isDebugEnabled()) { - LOG.debug("Registered Cartridges : \n"); - } - for (Cartridge cartridge : cartridges) { - if (LOG.isDebugEnabled()) { - LOG.debug(cartridge); - } - cartridgeTypes[i] = cartridge.getType(); - i++; - } - - return cartridgeTypes; - } - - @Override - public CartridgeInfo getCartridgeInfo(String cartridgeType) - throws UnregisteredCartridgeException { - Cartridge cartridge = dataHolder - .getCartridge(cartridgeType); - - if (cartridge != null) { - - return CloudControllerUtil.toCartridgeInfo(cartridge); - - } - - String msg = "Cannot find a Cartridge having a type of " - + cartridgeType + ". Hence unable to find information."; - LOG.error(msg); - throw new UnregisteredCartridgeException(msg); - } + public CartridgeInfo getCartridgeInfo(String cartridgeType) + throws UnregisteredCartridgeException { + Cartridge cartridge = dataHolder + .getCartridge(cartridgeType); + + if (cartridge != null) { + + return CloudControllerUtil.toCartridgeInfo(cartridge); + + } + + String msg = "Cannot find a Cartridge having a type of " + + cartridgeType + ". Hence unable to find information."; + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } @Override - public void unregisterService(String clusterId) throws UnregisteredClusterException { + public void unregisterService(String clusterId) throws UnregisteredClusterException { final String clusterId_ = clusterId; - + ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); - handleNullObject(ctxt, "Service unregistration failed. Invalid cluster id: "+clusterId); - + handleNullObject(ctxt, "Service unregistration failed. Invalid cluster id: " + clusterId); + String cartridgeType = ctxt.getCartridgeType(); Cartridge cartridge = dataHolder.getCartridge(cartridgeType); if (cartridge == null) { String msg = - "Service unregistration failed. No matching Cartridge found [type] "+cartridgeType +". "; + "Service unregistration failed. No matching Cartridge found [type] " + cartridgeType + ". "; LOG.error(msg); throw new UnregisteredClusterException(msg); } - + // if it's a kubernetes cluster if (StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) { - unregisterDockerService(clusterId_); - + unregisterDockerService(clusterId_); + } else { - + // TopologyBuilder.handleClusterMaintenanceMode(dataHolder.getClusterContext(clusterId_)); - - Runnable terminateInTimeout = new Runnable() { - @Override - public void run() { - ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); - if(ctxt == null) { - String msg = "Service unregistration failed. Cluster not found: " + clusterId_; - LOG.error(msg); - return; - } - Collection<Member> members = TopologyManager.getTopology(). - getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); - //finding the responding members from the existing members in the topology. - int sizeOfRespondingMembers = 0; - for(Member member : members) { - if(member.getStatus().getCode() >= MemberStatus.Activated.getCode()) { - sizeOfRespondingMembers ++; - } - } - - long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * sizeOfRespondingMembers; - while(System.currentTimeMillis()< endTime) { - CloudControllerUtil.sleep(1000); - - } - - // if there're still alive members - if(members.size() > 0) { - //forcefully terminate them - for (Member member : members) { - - try { - terminateInstance(member.getMemberId()); - } catch (Exception e) { - // we are not gonna stop the execution due to errors. - LOG.warn("Instance termination failed of member [id] " + member.getMemberId(), e); - } - } - } - } - }; - Runnable unregister = new Runnable() { - public void run() { - ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); - if(ctxt == null) { - String msg = "Service unregistration failed. Cluster not found: " + clusterId_; - LOG.error(msg); - return; - } - Collection<Member> members = TopologyManager.getTopology(). - getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); - // TODO why end time is needed? - // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size(); - - while(members.size() > 0) { - //waiting until all the members got removed from the Topology/ timed out - CloudControllerUtil.sleep(1000); - } - - LOG.info("Unregistration of service cluster: " + clusterId_); - deleteVolumes(ctxt); - onClusterRemoval(clusterId_); - } - - private void deleteVolumes(ClusterContext ctxt) { - if(ctxt.isVolumeRequired()) { - Cartridge cartridge = dataHolder.getCartridge(ctxt.getCartridgeType()); - if(cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) { - for (Volume volume : ctxt.getVolumes()) { - if(volume.getId() != null) { - String iaasType = volume.getIaasType(); - //Iaas iaas = dataHolder.getIaasProvider(iaasType).getIaas(); - Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas(); - if(iaas != null) { - try { - // delete the volumes if remove on unsubscription is true. - if(volume.isRemoveOntermination()) - { - iaas.deleteVolume(volume.getId()); - volume.setId(null); - } - } catch(Exception ignore) { - if(LOG.isErrorEnabled()) { - LOG.error("Error while deleting volume [id] "+ volume.getId(), ignore); - } - } - } - } - } - - } - } - } - }; - new Thread(terminateInTimeout).start(); - new Thread(unregister).start(); - } - } - + + Runnable terminateInTimeout = new Runnable() { + @Override + public void run() { + ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); + if (ctxt == null) { + String msg = "Service unregistration failed. Cluster not found: " + clusterId_; + LOG.error(msg); + return; + } + Collection<Member> members = TopologyManager.getTopology(). + getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); + //finding the responding members from the existing members in the topology. + int sizeOfRespondingMembers = 0; + for (Member member : members) { + if (member.getStatus().getCode() >= MemberStatus.Activated.getCode()) { + sizeOfRespondingMembers++; + } + } + + long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * sizeOfRespondingMembers; + while (System.currentTimeMillis() < endTime) { + CloudControllerUtil.sleep(1000); + + } + + // if there're still alive members + if (members.size() > 0) { + //forcefully terminate them + for (Member member : members) { + + try { + terminateInstance(member.getMemberId()); + } catch (Exception e) { + // we are not gonna stop the execution due to errors. + LOG.warn("Instance termination failed of member [id] " + member.getMemberId(), e); + } + } + } + } + }; + Runnable unregister = new Runnable() { + public void run() { + ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); + if (ctxt == null) { + String msg = "Service unregistration failed. Cluster not found: " + clusterId_; + LOG.error(msg); + return; + } + Collection<Member> members = TopologyManager.getTopology(). + getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); + // TODO why end time is needed? + // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size(); + + while (members.size() > 0) { + //waiting until all the members got removed from the Topology/ timed out + CloudControllerUtil.sleep(1000); + } + + LOG.info("Unregistration of service cluster: " + clusterId_); + deleteVolumes(ctxt); + onClusterRemoval(clusterId_); + } + + private void deleteVolumes(ClusterContext ctxt) { + if (ctxt.isVolumeRequired()) { + Cartridge cartridge = dataHolder.getCartridge(ctxt.getCartridgeType()); + if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) { + for (Volume volume : ctxt.getVolumes()) { + if (volume.getId() != null) { + String iaasType = volume.getIaasType(); + //Iaas iaas = dataHolder.getIaasProvider(iaasType).getIaas(); + Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas(); + if (iaas != null) { + try { + // delete the volumes if remove on unsubscription is true. + if (volume.isRemoveOntermination()) { + iaas.deleteVolume(volume.getId()); + volume.setId(null); + } + } catch (Exception ignore) { + if (LOG.isErrorEnabled()) { + LOG.error("Error while deleting volume [id] " + volume.getId(), ignore); + } + } + } + } + } + + } + } + } + }; + new Thread(terminateInTimeout).start(); + new Thread(unregister).start(); + } + } + @Override - public void unregisterDockerService(String clusterId) - throws UnregisteredClusterException { - - // terminate all kubernetes units - try { - terminateAllContainers(clusterId); - } catch (InvalidClusterException e) { - String msg = "Docker instance termination fails for cluster: "+clusterId; - LOG.error(msg, e); - throw new UnregisteredClusterException(msg, e); - } - // send cluster removal notifications and update the state - onClusterRemoval(clusterId); - } + public void unregisterDockerService(String clusterId) + throws UnregisteredClusterException { + + // terminate all kubernetes units + try { + terminateAllContainers(clusterId); + } catch (InvalidClusterException e) { + String msg = "Docker instance termination fails for cluster: " + clusterId; + LOG.error(msg, e); + throw new UnregisteredClusterException(msg, e); + } + // send cluster removal notifications and update the state + onClusterRemoval(clusterId); + } @Override - public boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions) + public boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions) throws InvalidPartitionException, InvalidCartridgeTypeException { - Map<String, List<String>> validatedCache = dataHolder.getCartridgeTypeToPartitionIds(); - List<String> validatedPartitions = new ArrayList<String>(); - - if (validatedCache.containsKey(cartridgeType)) { - // cache hit for this cartridge - // get list of partitions - validatedPartitions = validatedCache.get(cartridgeType); - if (LOG.isDebugEnabled()) { - LOG.debug("Partition validation cache hit for cartridge type: "+cartridgeType); - } - - } - + Map<String, List<String>> validatedCache = dataHolder.getCartridgeTypeToPartitionIds(); + List<String> validatedPartitions = new ArrayList<String>(); + + if (validatedCache.containsKey(cartridgeType)) { + // cache hit for this cartridge + // get list of partitions + validatedPartitions = validatedCache.get(cartridgeType); + if (LOG.isDebugEnabled()) { + LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType); + } + + } + Map<String, IaasProvider> partitionToIaasProviders = - new ConcurrentHashMap<String, IaasProvider>(); - + new ConcurrentHashMap<String, IaasProvider>(); + if (LOG.isDebugEnabled()) { - LOG.debug("Deployment policy validation started for cartridge type: "+cartridgeType); - } + LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType); + } Cartridge cartridge = dataHolder.getCartridge(cartridgeType); @@ -1326,138 +1324,138 @@ public class CloudControllerServiceImpl implements CloudControllerService { LOG.error(msg); throw new InvalidCartridgeTypeException(msg); } - + Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>(); - for (Partition partition : partitions) { - - if (validatedPartitions.contains(partition.getId())) { - // partition cache hit - continue; - } - - Callable<IaasProvider> worker = new PartitionValidatorCallable( - partition, cartridge); - Future<IaasProvider> job = FasterLookUpDataHolder.getInstance() - .getExecutor().submit(worker); - jobList.put(partition.getId(), job); - } - + for (Partition partition : partitions) { + + if (validatedPartitions.contains(partition.getId())) { + // partition cache hit + continue; + } + + Callable<IaasProvider> worker = new PartitionValidatorCallable( + partition, cartridge); + Future<IaasProvider> job = FasterLookUpDataHolder.getInstance() + .getExecutor().submit(worker); + jobList.put(partition.getId(), job); + } + // Retrieve the results of the concurrently performed sanity checks. for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) { if (entry == null) { continue; } String partitionId = entry.getKey(); - Future<IaasProvider> job = entry.getValue(); + Future<IaasProvider> job = entry.getValue(); try { - // add to a temporary Map - partitionToIaasProviders.put(partitionId, job.get()); - - // add to cache - this.dataHolder.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId); - - if (LOG.isDebugEnabled()) { - LOG.debug("Partition "+partitionId+" added to the cache against cartridge type: "+cartridgeType); - } + // add to a temporary Map + partitionToIaasProviders.put(partitionId, job.get()); + + // add to cache + this.dataHolder.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId); + + if (LOG.isDebugEnabled()) { + LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType); + } } catch (Exception e) { LOG.error(e.getMessage(), e); throw new InvalidPartitionException(e.getMessage(), e); - } + } } // if and only if the deployment policy valid cartridge.addIaasProviders(partitionToIaasProviders); - + // persist data persist(); - - LOG.info("All partitions "+CloudControllerUtil.getPartitionIds(partitions)+ - " were validated successfully, against the Cartridge: "+cartridgeType); - + + LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) + + " were validated successfully, against the Cartridge: " + cartridgeType); + return true; } - + private void onClusterRemoval(final String clusterId) { - ClusterContext ctxt = dataHolder.getClusterContext(clusterId); - TopologyBuilder.handleClusterRemoved(ctxt); - dataHolder.removeClusterContext(clusterId); - dataHolder.removeMemberContextsOfCluster(clusterId); - persist(); - } + ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + TopologyBuilder.handleClusterRemoved(ctxt); + dataHolder.removeClusterContext(clusterId); + dataHolder.removeMemberContextsOfCluster(clusterId); + persist(); + } @Override public boolean validatePartition(Partition partition) throws InvalidPartitionException { handleNullObject(partition, "Partition validation failed. Partition is null."); String provider = partition.getProvider(); - handleNullObject(provider, "Partition ["+partition.getId()+"] validation failed. Partition provider is null."); + handleNullObject(provider, "Partition [" + partition.getId() + "] validation failed. Partition provider is null."); IaasProvider iaasProvider = dataHolder.getIaasProvider(provider); if (iaasProvider == null) { String msg = - "Invalid Partition - " + partition.toString()+". Cause: Iaas Provider " + - "is null for Partition Provider: "+provider; + "Invalid Partition - " + partition.toString() + ". Cause: Iaas Provider " + + "is null for Partition Provider: " + provider; LOG.error(msg); throw new InvalidPartitionException(msg); } - + Iaas iaas = iaasProvider.getIaas(); - + if (iaas == null) { - -
<TRUNCATED>
