Repository: stratos Updated Branches: refs/heads/master 08de72982 -> 71fab2b44
http://git-wip-us.apache.org/repos/asf/stratos/blob/71fab2b4/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index 548b743..f9c55a0 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -103,34 +103,24 @@ public class CloudControllerServiceImpl implements CloudControllerService { // cartridge can never be null cartridge = CloudControllerUtil.toCartridge(cartridgeConfig); } catch (Exception e) { - String msg = - "Invalid Cartridge Definition: Cartridge Type: " + - cartridgeConfig.getType() + - ". Cause: Cannot instantiate a Cartridge Instance with the given Config. " + e.getMessage(); + String msg = "Invalid cartridge definition: Cartridge type: " + cartridgeConfig.getType() + + " Cause: Cannot instantiate a cartridge instance with the given configuration: " + e.getMessage(); LOG.error(msg, e); throw new InvalidCartridgeDefinitionException(msg, e); } - List<IaasProvider> iaases = cartridge.getIaases(); + List<IaasProvider> iaasProviders = cartridge.getIaases(); if (!StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) { - if (iaases == null || iaases.isEmpty()) { - String msg = "Invalid Cartridge Definition: Cartridge Type: " - + cartridgeConfig.getType() - + ". Cause: Iaases of this Cartridge is null or empty."; - LOG.error(msg); - throw new InvalidCartridgeDefinitionException(msg); - } - - if (iaases == null || iaases.isEmpty()) { - String msg = "Invalid Cartridge Definition: Cartridge Type: " + + if (iaasProviders == null || iaasProviders.isEmpty()) { + String msg = "Invalid cartridge definition: Cartridge type: " + cartridgeConfig.getType() + - ". Cause: Iaases of this Cartridge is null or empty."; + " Cause: Iaases of this cartridge is null or empty"; LOG.error(msg); throw new InvalidCartridgeDefinitionException(msg); } - for (IaasProvider iaasProvider : iaases) { + for (IaasProvider iaasProvider : iaasProviders) { CloudControllerUtil.getIaas(iaasProvider); } } @@ -148,7 +138,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { populateNewCartridge(cartridge, cartridgeToBeRemoved); } - cloudControllerContext.addCartridge(cartridge); + CloudControllerContext.getInstance().addCartridge(cartridge); // persist persist(); @@ -188,10 +178,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { public void undeployCartridgeDefinition(String cartridgeType) throws InvalidCartridgeTypeException { Cartridge cartridge = null; - if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) != null) { - if (cloudControllerContext.getCartridges().remove(cartridge)) { + if ((cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType)) != null) { + if (CloudControllerContext.getInstance().getCartridges().remove(cartridge)) { // invalidate partition validation cache - cloudControllerContext.removeFromCartridgeTypeToPartitionIds(cartridgeType); + CloudControllerContext.getInstance().removeFromCartridgeTypeToPartitionIds(cartridgeType); if (LOG.isDebugEnabled()) { LOG.debug("Partition cache invalidated for cartridge " + cartridgeType); @@ -261,7 +251,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } } - cloudControllerContext.addServiceGroup(servicegroup); + CloudControllerContext.getInstance().addServiceGroup(servicegroup); this.persist(); @@ -274,10 +264,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { ServiceGroup serviceGroup = null; - serviceGroup = cloudControllerContext.getServiceGroup(name); + serviceGroup = CloudControllerContext.getInstance().getServiceGroup(name); if (serviceGroup != null) { - if (cloudControllerContext.getServiceGroups().remove(serviceGroup)) { + if (CloudControllerContext.getInstance().getServiceGroups().remove(serviceGroup)) { persist(); if (LOG.isInfoEnabled()) { LOG.info("Successfully undeployed the Service Group definition: " + serviceGroup); @@ -299,7 +289,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { LOG.debug("getServiceGroupDefinition:" + name); } - ServiceGroup serviceGroup = this.cloudControllerContext.getServiceGroup(name); + ServiceGroup serviceGroup = CloudControllerContext.getInstance().getServiceGroup(name); if (serviceGroup == null) { if (LOG.isDebugEnabled()) { @@ -365,13 +355,13 @@ public class CloudControllerServiceImpl implements CloudControllerService { memberContext); String partitionId = partition.getId(); - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId); handleNullObject(ctxt, "Instance start-up failed. Invalid cluster id. " + memberContext); String cartridgeType = ctxt.getCartridgeType(); - Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); if (cartridge == null) { String msg = @@ -552,7 +542,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { */ private void persist() { try { - cloudControllerContext.persist(); + CloudControllerContext.getInstance().persist(); } catch (RegistryException e) { String msg = "Failed to persist the cloud controller context in registry."; LOG.fatal(msg); @@ -570,7 +560,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(memberId, "Termination failed. Null member id."); - MemberContext ctxt = cloudControllerContext.getMemberContextOfMemberId(memberId); + MemberContext ctxt = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId); if (ctxt == null) { String msg = "Termination failed. Invalid Member Id: " + memberId; @@ -717,24 +707,23 @@ public class CloudControllerServiceImpl implements CloudControllerService { @Override public void run() { - String memberId = ctxt.getMemberId(); String clusterId = ctxt.getClusterId(); String partitionId = ctxt.getPartition().getId(); String cartridgeType = ctxt.getCartridgeType(); String nodeId = ctxt.getNodeId(); + Lock lock = null; try { - // these will never be null, since we do not add null values for these. - Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); + CloudControllerContext.getInstance().acquireMemberContextWriteLock(); + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); LOG.info("Starting to terminate an instance with member id : " + memberId + " in partition id: " + partitionId + " of cluster id: " + clusterId + " and of cartridge type: " + cartridgeType); if (cartridge == null) { - String msg = - "Termination of Member Id: " + memberId + " failed. " + + String msg = "Termination of Member Id: " + memberId + " failed. " + "Cannot find a matching Cartridge for type: " + cartridgeType; LOG.error(msg); @@ -743,10 +732,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { // if no matching node id can be found. if (nodeId == null) { - - String msg = - "Termination failed. Cannot find a node id for Member Id: " + - memberId; + String msg = "Termination failed. Cannot find a node id for Member Id: " + memberId; // log information logTermination(ctxt); @@ -761,14 +747,15 @@ public class CloudControllerServiceImpl implements CloudControllerService { // log information logTermination(ctxt); - } catch (Exception e) { - String msg = - "Instance termination failed. " + ctxt.toString(); + String msg = "Instance termination failed. " + ctxt.toString(); LOG.error(msg, e); throw new CloudControllerException(msg, e); + } finally { + if(lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); + } } - } } @@ -787,117 +774,127 @@ public class CloudControllerServiceImpl implements CloudControllerService { @Override public void run() { + Lock lock = null; + try { + lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock(); + String clusterId = memberContext.getClusterId(); + Partition partition = memberContext.getPartition(); + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId); + Iaas iaas = iaasProvider.getIaas(); + String publicIp = null; - String clusterId = memberContext.getClusterId(); - Partition partition = memberContext.getPartition(); - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); - Iaas iaas = iaasProvider.getIaas(); - String publicIp = null; - - NodeMetadata node = null; - // generate the group id from domain name and sub domain name. - // Should have lower-case ASCII letters, numbers, or dashes. - // Should have a length between 3-15 - String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length()); - String group = str.replaceAll("[^a-z0-9-]", ""); + NodeMetadata node = null; + // generate the group id from domain name and sub domain name. + // Should have lower-case ASCII letters, numbers, or dashes. + // Should have a length between 3-15 + String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length()); + String group = str.replaceAll("[^a-z0-9-]", ""); - try { - ComputeService computeService = iaasProvider - .getComputeService(); - Template template = iaasProvider.getTemplate(); + try { + ComputeService computeService = iaasProvider.getComputeService(); + Template template = iaasProvider.getTemplate(); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller is delegating request to start an instance for " - + memberContext + " to Jclouds layer."); - } - // create and start a node - Set<? extends NodeMetadata> nodes = computeService - .createNodesInGroup(group, 1, template); - node = nodes.iterator().next(); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller received a response for the request to start " - + memberContext + " from Jclouds layer."); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to start an instance for " + + memberContext + " to Jclouds layer."); + } + // create and start a node + Set<? extends NodeMetadata> nodes = computeService + .createNodesInGroup(group, 1, template); + node = nodes.iterator().next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller received a response for the request to start " + + memberContext + " from Jclouds layer."); + } - if (node == null) { - String msg = "Null response received for instance start-up request to Jclouds.\n" - + memberContext.toString(); - LOG.error(msg); - throw new IllegalStateException(msg); - } + if (node == null) { + String msg = "Null response received for instance start-up request to Jclouds.\n" + + memberContext.toString(); + LOG.error(msg); + throw new IllegalStateException(msg); + } - // node id - String nodeId = node.getId(); - if (nodeId == null) { - String msg = "Node id of the starting instance is null.\n" - + memberContext.toString(); - LOG.fatal(msg); - throw new IllegalStateException(msg); - } + // node id + String nodeId = node.getId(); + if (nodeId == null) { + String msg = "Node id of the starting instance is null.\n" + + memberContext.toString(); + LOG.fatal(msg); + throw new IllegalStateException(msg); + } - memberContext.setNodeId(nodeId); - if (LOG.isDebugEnabled()) { - LOG.debug("Node id was set. " + memberContext.toString()); - } + memberContext.setNodeId(nodeId); + if (LOG.isDebugEnabled()) { + LOG.debug("Node id was set. " + memberContext.toString()); + } - // attach volumes - if (ctxt.isVolumeRequired()) { - // remove region prefix - String instanceId = nodeId.indexOf('/') != -1 ? nodeId - .substring(nodeId.indexOf('/') + 1, nodeId.length()) - : nodeId; - memberContext.setInstanceId(instanceId); - if (ctxt.getVolumes() != null) { - for (Volume volume : ctxt.getVolumes()) { - try { - iaas.attachVolume(instanceId, volume.getId(), - volume.getDevice()); - } catch (Exception e) { - // continue without throwing an exception, since - // there is an instance already running - LOG.error("Attaching Volume to Instance [ " - + instanceId + " ] failed!", e); + // attach volumes + if (ctxt.isVolumeRequired()) { + // remove region prefix + String instanceId = nodeId.indexOf('/') != -1 ? nodeId + .substring(nodeId.indexOf('/') + 1, nodeId.length()) + : nodeId; + memberContext.setInstanceId(instanceId); + if (ctxt.getVolumes() != null) { + for (Volume volume : ctxt.getVolumes()) { + try { + iaas.attachVolume(instanceId, volume.getId(), + volume.getDevice()); + } catch (Exception e) { + // continue without throwing an exception, since + // there is an instance already running + LOG.error("Attaching Volume to Instance [ " + + instanceId + " ] failed!", e); + } } } } - } - - } catch (Exception e) { - String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage(); - LOG.error(msg, e); - throw new IllegalStateException(msg, e); - } - try { - if (LOG.isDebugEnabled()) { - LOG.debug("IP allocation process started for " + memberContext); + } catch (Exception e) { + String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage(); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); } - String autoAssignIpProp = - iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY); - String pre_defined_ip = - iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("IP allocation process started for " + memberContext); + } + String autoAssignIpProp = + iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY); - // reset ip - String ip = ""; + String pre_defined_ip = + iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY); - // default behavior is autoIpAssign=false - if (autoAssignIpProp == null || - (autoAssignIpProp != null && autoAssignIpProp.equals("false"))) { + // reset ip + String ip = ""; - // check if floating ip is well defined in cartridge definition - if (pre_defined_ip != null) { - if (isValidIpAddress(pre_defined_ip)) { - if (LOG.isDebugEnabled()) { - LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip); - } - ip = iaas.associatePredefinedAddress(node, pre_defined_ip); + // default behavior is autoIpAssign=false + if (autoAssignIpProp == null || + (autoAssignIpProp != null && autoAssignIpProp.equals("false"))) { - if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) { - // throw exception and stop instance creation - String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip + - " / allocated ip:" + ip + + // check if floating ip is well defined in cartridge definition + if (pre_defined_ip != null) { + if (isValidIpAddress(pre_defined_ip)) { + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip); + } + ip = iaas.associatePredefinedAddress(node, pre_defined_ip); + + if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) { + // throw exception and stop instance creation + String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip + + " / allocated ip:" + ip + + " - terminating node:" + memberContext.toString(); + LOG.error(msg); + // terminate instance + terminate(iaasProvider, + node.getId(), memberContext); + throw new CloudControllerException(msg); + } + } else { + String msg = "Invalid floating ip address configured: " + pre_defined_ip + " - terminating node:" + memberContext.toString(); LOG.error(msg); // terminate instance @@ -905,111 +902,105 @@ public class CloudControllerServiceImpl implements CloudControllerService { node.getId(), memberContext); throw new CloudControllerException(msg); } + } else { - String msg = "Invalid floating ip address configured: " + pre_defined_ip + - " - terminating node:" + memberContext.toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, " + + "selecting available one from pool"); + } + // allocate an IP address - manual IP assigning mode + ip = iaas.associateAddress(node); + + if (ip != null) { + memberContext.setAllocatedIpAddress(ip); + if (LOG.isDebugEnabled()) { + LOG.debug("Allocated an ip address: " + + memberContext.toString()); + } else if (LOG.isInfoEnabled()) { + LOG.info("Allocated ip address [ " + memberContext.getAllocatedIpAddress() + + " ] to member with id: " + memberContext.getMemberId()); + } + } + } + + if (ip == null) { + String msg = "No IP address found. IP allocation failed for " + memberContext; LOG.error(msg); - // terminate instance - terminate(iaasProvider, - node.getId(), memberContext); throw new CloudControllerException(msg); } - } else { + // build the node with the new ip + node = NodeMetadataBuilder.fromNodeMetadata(node) + .publicAddresses(ImmutableSet.of(ip)).build(); + } + + + // public ip + if (node.getPublicAddresses() != null && + node.getPublicAddresses().iterator().hasNext()) { + ip = node.getPublicAddresses().iterator().next(); + publicIp = ip; + memberContext.setPublicIpAddress(ip); if (LOG.isDebugEnabled()) { - LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, " - + "selecting available one from pool"); + LOG.debug("Retrieving Public IP Address : " + memberContext.toString()); + } else if (LOG.isInfoEnabled()) { + LOG.info("Retrieving Public IP Address: " + memberContext.getPublicIpAddress() + + ", member id: " + memberContext.getMemberId()); } - // allocate an IP address - manual IP assigning mode - ip = iaas.associateAddress(node); + } - if (ip != null) { - memberContext.setAllocatedIpAddress(ip); - if (LOG.isDebugEnabled()) { - LOG.debug("Allocated an ip address: " - + memberContext.toString()); - } else if (LOG.isInfoEnabled()) { - LOG.info("Allocated ip address [ " + memberContext.getAllocatedIpAddress() + - " ] to member with id: " + memberContext.getMemberId()); - } + // private IP + if (node.getPrivateAddresses() != null && + node.getPrivateAddresses().iterator().hasNext()) { + ip = node.getPrivateAddresses().iterator().next(); + memberContext.setPrivateIpAddress(ip); + if (LOG.isDebugEnabled()) { + LOG.debug("Retrieving Private IP Address. " + memberContext.toString()); + } else if (LOG.isInfoEnabled()) { + LOG.info("Retrieving Private IP Address: " + memberContext.getPrivateIpAddress() + + ", member id: " + memberContext.getMemberId()); } } - if (ip == null) { - String msg = "No IP address found. IP allocation failed for " + memberContext; - LOG.error(msg); - throw new CloudControllerException(msg); - } + CloudControllerContext.getInstance().addMemberContext(memberContext); - // build the node with the new ip - node = NodeMetadataBuilder.fromNodeMetadata(node) - .publicAddresses(ImmutableSet.of(ip)).build(); - } + // persist in registry + persist(); + + + // trigger topology + TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, + partition.getId(), ip, publicIp, memberContext); + String memberID = memberContext.getMemberId(); - // public ip - if (node.getPublicAddresses() != null && - node.getPublicAddresses().iterator().hasNext()) { - ip = node.getPublicAddresses().iterator().next(); - publicIp = ip; - memberContext.setPublicIpAddress(ip); + // update the topology with the newly spawned member + // publish data + CartridgeInstanceDataPublisher.publish(memberID, + memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), + cartridgeType, + MemberStatus.Created.toString(), + node); if (LOG.isDebugEnabled()) { - LOG.debug("Retrieving Public IP Address : " + memberContext.toString()); - } else if (LOG.isInfoEnabled()) { - LOG.info("Retrieving Public IP Address: " + memberContext.getPublicIpAddress() + - ", member id: " + memberContext.getMemberId()); + LOG.debug("Node details: " + node.toString()); } - } - // private IP - if (node.getPrivateAddresses() != null && - node.getPrivateAddresses().iterator().hasNext()) { - ip = node.getPrivateAddresses().iterator().next(); - memberContext.setPrivateIpAddress(ip); if (LOG.isDebugEnabled()) { - LOG.debug("Retrieving Private IP Address. " + memberContext.toString()); - } else if (LOG.isInfoEnabled()) { - LOG.info("Retrieving Private IP Address: " + memberContext.getPrivateIpAddress() + - ", member id: " + memberContext.getMemberId()); + LOG.debug("IP allocation process ended for " + memberContext); } - } - - cloudControllerContext.addMemberContext(memberContext); - - // persist in registry - persist(); - - - // trigger topology - TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, - partition.getId(), ip, publicIp, memberContext); - String memberID = memberContext.getMemberId(); - - // update the topology with the newly spawned member - // publish data - CartridgeInstanceDataPublisher.publish(memberID, - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - cartridgeType, - MemberStatus.Created.toString(), - node); - if (LOG.isDebugEnabled()) { - LOG.debug("Node details: " + node.toString()); + } catch (Exception e) { + String msg = "Error occurred while allocating an ip address. " + memberContext.toString(); + LOG.error(msg, e); + throw new CloudControllerException(msg, e); } - - if (LOG.isDebugEnabled()) { - LOG.debug("IP allocation process ended for " + memberContext); + } finally { + if(lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); } - - } catch (Exception e) { - String msg = "Error occurred while allocating an ip address. " + memberContext.toString(); - LOG.error(msg, e); - throw new CloudControllerException(msg, e); } - - } } @@ -1026,7 +1017,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(clusterId, "Instance termination failed. Cluster id is null."); - List<MemberContext> ctxts = cloudControllerContext.getMemberContextsOfClusterId(clusterId); + List<MemberContext> ctxts = CloudControllerContext.getInstance().getMemberContextsOfClusterId(clusterId); if (ctxts == null) { String msg = "Instance termination failed. No members found for cluster id: " + clusterId; @@ -1088,7 +1079,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) { String clusterId = ctxt.getClusterId(); - ClusterContext clusterCtxt = cloudControllerContext.getClusterContext(clusterId); + ClusterContext clusterCtxt = CloudControllerContext.getInstance().getClusterContext(clusterId); if (clusterCtxt.getVolumes() != null) { for (Volume volume : clusterCtxt.getVolumes()) { try { @@ -1130,11 +1121,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { null); // update data holders - cloudControllerContext.removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId()); + CloudControllerContext.getInstance().removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId()); // persist persist(); - } @Override @@ -1154,7 +1144,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(hostName, "Service registration failed. Hostname is null."); Cartridge cartridge = null; - if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) == null) { + if ((cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType)) == null) { String msg = "Registration of cluster: " + clusterId + " failed. - Unregistered Cartridge type: " + cartridgeType; @@ -1171,7 +1161,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { payload, hostName, props, isLb, registrant.getPersistence()); - cloudControllerContext.addClusterContext(ctxt);*/ + CloudControllerContext.getInstance().addClusterContext(ctxt);*/ TopologyBuilder.handleClusterCreated(registrant, isLb); persist(); @@ -1213,8 +1203,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { @Override public String[] getRegisteredCartridges() { // get the list of cartridges registered - List<Cartridge> cartridges = cloudControllerContext - .getCartridges(); + Collection<Cartridge> cartridges = CloudControllerContext.getInstance().getCartridges(); if (cartridges == null) { LOG.info("No registered Cartridge found."); @@ -1241,7 +1230,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { @Override public CartridgeInfo getCartridgeInfo(String cartridgeType) throws UnregisteredCartridgeException { - Cartridge cartridge = cloudControllerContext + Cartridge cartridge = CloudControllerContext.getInstance() .getCartridge(cartridgeType); if (cartridge != null) { @@ -1260,13 +1249,13 @@ public class CloudControllerServiceImpl implements CloudControllerService { public void unregisterService(String clusterId) throws UnregisteredClusterException { final String clusterId_ = clusterId; - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_); + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_); handleNullObject(ctxt, "Service unregistration failed. Invalid cluster id: " + clusterId); String cartridgeType = ctxt.getCartridgeType(); - Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); if (cartridge == null) { String msg = @@ -1281,12 +1270,12 @@ public class CloudControllerServiceImpl implements CloudControllerService { } else { -// TopologyBuilder.handleClusterMaintenanceMode(cloudControllerContext.getClusterContext(clusterId_)); +// TopologyBuilder.handleClusterMaintenanceMode(CloudControllerContext.getInstance().getClusterContext(clusterId_)); Runnable terminateInTimeout = new Runnable() { @Override public void run() { - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_); + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_); if (ctxt == null) { String msg = "Service unregistration failed. Cluster not found: " + clusterId_; LOG.error(msg); @@ -1325,52 +1314,69 @@ public class CloudControllerServiceImpl implements CloudControllerService { }; Runnable unregister = new Runnable() { public void run() { - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_); - if (ctxt == null) { - String msg = "Service unregistration failed. Cluster not found: " + clusterId_; - LOG.error(msg); - return; - } - Collection<Member> members = TopologyManager.getTopology(). - getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); - // TODO why end time is needed? - // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size(); + Lock lock = null; + try { + lock = CloudControllerContext.getInstance().acquireClusterContextWriteLock(); + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_); + if (ctxt == null) { + String msg = "Service unregistration failed. Cluster not found: " + clusterId_; + LOG.error(msg); + return; + } + Collection<Member> members = TopologyManager.getTopology(). + getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); + // TODO why end time is needed? + // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size(); + + while (members.size() > 0) { + //waiting until all the members got removed from the Topology/ timed out + CloudControllerUtil.sleep(1000); + } - while (members.size() > 0) { - //waiting until all the members got removed from the Topology/ timed out - CloudControllerUtil.sleep(1000); + LOG.info("Unregistration of service cluster: " + clusterId_); + deleteVolumes(ctxt); + onClusterRemoval(clusterId_); + } finally { + if(lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); + } } - - LOG.info("Unregistration of service cluster: " + clusterId_); - deleteVolumes(ctxt); - onClusterRemoval(clusterId_); } private void deleteVolumes(ClusterContext ctxt) { if (ctxt.isVolumeRequired()) { - Cartridge cartridge = cloudControllerContext.getCartridge(ctxt.getCartridgeType()); - if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) { - for (Volume volume : ctxt.getVolumes()) { - if (volume.getId() != null) { - String iaasType = volume.getIaasType(); - //Iaas iaas = cloudControllerContext.getIaasProvider(iaasType).getIaas(); - Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas(); - if (iaas != null) { - try { - // delete the volumes if remove on unsubscription is true. - if (volume.isRemoveOntermination()) { - iaas.deleteVolume(volume.getId()); - volume.setId(null); - } - } catch (Exception ignore) { - if (LOG.isErrorEnabled()) { - LOG.error("Error while deleting volume [id] " + volume.getId(), ignore); + Lock lock = null; + try { + lock = CloudControllerContext.getInstance().acquireCartridgesWriteLock(); + + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(ctxt.getCartridgeType()); + if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) { + for (Volume volume : ctxt.getVolumes()) { + if (volume.getId() != null) { + String iaasType = volume.getIaasType(); + //Iaas iaas = CloudControllerContext.getInstance().getIaasProvider(iaasType).getIaas(); + Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas(); + if (iaas != null) { + try { + // delete the volumes if remove on unsubscription is true. + if (volume.isRemoveOntermination()) { + iaas.deleteVolume(volume.getId()); + volume.setId(null); + } + } catch (Exception ignore) { + if (LOG.isErrorEnabled()) { + LOG.error("Error while deleting volume [id] " + volume.getId(), ignore); + } } } } } + CloudControllerContext.getInstance().updateCartridge(cartridge); + } + } finally { + if(lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); } - } } } @@ -1383,104 +1389,120 @@ public class CloudControllerServiceImpl implements CloudControllerService { @Override public void unregisterDockerService(String clusterId) throws UnregisteredClusterException { - - // terminate all kubernetes units + Lock lock = null; try { - terminateAllContainers(clusterId); - } catch (InvalidClusterException e) { - String msg = "Docker instance termination fails for cluster: " + clusterId; - LOG.error(msg, e); - throw new UnregisteredClusterException(msg, e); + lock = CloudControllerContext.getInstance().acquireClusterContextWriteLock(); + // terminate all kubernetes units + try { + terminateAllContainers(clusterId); + } catch (InvalidClusterException e) { + String msg = "Docker instance termination fails for cluster: " + clusterId; + LOG.error(msg, e); + throw new UnregisteredClusterException(msg, e); + } + // send cluster removal notifications and update the state + onClusterRemoval(clusterId); + } finally { + if(lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); + } } - // send cluster removal notifications and update the state - onClusterRemoval(clusterId); } - + /*** + * FIXME: A validate method shouldn't persist any data + */ @Override public boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions) throws InvalidPartitionException, InvalidCartridgeTypeException { - List<String> validatedPartitions = CloudControllerContext.getInstance().getPartitionIds(cartridgeType); - if (validatedPartitions != null) { - // cache hit for this cartridge - // get list of partitions - if (LOG.isDebugEnabled()) { - LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType); - } - } - - Map<String, IaasProvider> partitionToIaasProviders = - new ConcurrentHashMap<String, IaasProvider>(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType); - } - - Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); + Lock lock = null; + try { + lock = CloudControllerContext.getInstance().acquireCartridgesWriteLock(); - if (cartridge == null) { - String msg = "Invalid Cartridge Type: " + cartridgeType; - LOG.error(msg); - throw new InvalidCartridgeTypeException(msg); - } + List<String> validatedPartitions = CloudControllerContext.getInstance().getPartitionIds(cartridgeType); + if (validatedPartitions != null) { + // cache hit for this cartridge + // get list of partitions + if (LOG.isDebugEnabled()) { + LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType); + } + } - Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>(); + Map<String, IaasProvider> partitionToIaasProviders = + new ConcurrentHashMap<String, IaasProvider>(); - for (Partition partition : partitions) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType); + } - if (validatedPartitions.contains(partition.getId())) { - // partition cache hit - continue; + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); + if (cartridge == null) { + String msg = "Invalid Cartridge Type: " + cartridgeType; + LOG.error(msg); + throw new InvalidCartridgeTypeException(msg); } - Callable<IaasProvider> worker = new PartitionValidatorCallable( - partition, cartridge); - Future<IaasProvider> job = CloudControllerContext.getInstance() - .getExecutorService().submit(worker); - jobList.put(partition.getId(), job); - } + Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>(); + for (Partition partition : partitions) { + if (validatedPartitions.contains(partition.getId())) { + // partition cache hit + continue; + } - // Retrieve the results of the concurrently performed sanity checks. - for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) { - if (entry == null) { - continue; + Callable<IaasProvider> worker = new PartitionValidatorCallable( + partition, cartridge); + Future<IaasProvider> job = CloudControllerContext.getInstance() + .getExecutorService().submit(worker); + jobList.put(partition.getId(), job); } - String partitionId = entry.getKey(); - Future<IaasProvider> job = entry.getValue(); - try { - // add to a temporary Map - partitionToIaasProviders.put(partitionId, job.get()); - // add to cache - this.cloudControllerContext.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId); + // Retrieve the results of the concurrently performed sanity checks. + for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) { + if (entry == null) { + continue; + } + String partitionId = entry.getKey(); + Future<IaasProvider> job = entry.getValue(); + try { + // add to a temporary Map + partitionToIaasProviders.put(partitionId, job.get()); - if (LOG.isDebugEnabled()) { - LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType); + // add to cache + CloudControllerContext.getInstance().addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId); + + if (LOG.isDebugEnabled()) { + LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new InvalidPartitionException(e.getMessage(), e); } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new InvalidPartitionException(e.getMessage(), e); } - } - // if and only if the deployment policy valid - cartridge.addIaasProviders(partitionToIaasProviders); + // if and only if the deployment policy valid + cartridge.addIaasProviders(partitionToIaasProviders); + CloudControllerContext.getInstance().updateCartridge(cartridge); - // persist data - persist(); + // persist data + persist(); - LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) + - " were validated successfully, against the Cartridge: " + cartridgeType); + LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) + + " were validated successfully, against the Cartridge: " + cartridgeType); - return true; + return true; + } finally { + if(lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); + } + } } private void onClusterRemoval(final String clusterId) { - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId); TopologyBuilder.handleClusterRemoved(ctxt); - cloudControllerContext.removeClusterContext(clusterId); - cloudControllerContext.removeMemberContextsOfCluster(clusterId); + CloudControllerContext.getInstance().removeClusterContext(clusterId); + CloudControllerContext.getInstance().removeMemberContextsOfCluster(clusterId); persist(); } @@ -1524,160 +1546,166 @@ public class CloudControllerServiceImpl implements CloudControllerService { } public ClusterContext getClusterContext(String clusterId) { - - return cloudControllerContext.getClusterContext(clusterId); + return CloudControllerContext.getInstance().getClusterContext(clusterId); } @Override public MemberContext[] startContainers(ContainerClusterContext containerClusterContext) throws UnregisteredCartridgeException { + Lock lock = null; + try { + lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock(); - if (LOG.isDebugEnabled()) { - LOG.debug("CloudControllerServiceImpl:startContainers"); - } - - handleNullObject(containerClusterContext, "Container start-up failed. ContainerClusterContext is null."); + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:startContainers"); + } - String clusterId = containerClusterContext.getClusterId(); - handleNullObject(clusterId, "Container start-up failed. Cluster id is null."); + handleNullObject(containerClusterContext, "Container start-up failed. ContainerClusterContext is null."); - if (LOG.isDebugEnabled()) { - LOG.debug("Received a container spawn request : " + containerClusterContext.toString()); - } + String clusterId = containerClusterContext.getClusterId(); + handleNullObject(clusterId, "Container start-up failed. Cluster id is null."); - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); - handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received a container spawn request : " + containerClusterContext.toString()); + } - String cartridgeType = ctxt.getCartridgeType(); + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId); + handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString()); - Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); + String cartridgeType = ctxt.getCartridgeType(); - if (cartridge == null) { - String msg = - "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " + - containerClusterContext.toString(); - LOG.error(msg); - throw new UnregisteredCartridgeException(msg); - } + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - try { - String minReplicas = validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt); - String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt); - String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext); - String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext); + if (cartridge == null) { + String msg = "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " + + containerClusterContext.toString(); + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } - KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp, kubernetesPortRange); + try { + String minReplicas = validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt); + String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt); + String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext); + String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext); - KubernetesApiClient kubApi = kubClusterContext.getKubApi(); + KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId, + kubernetesMasterIp, kubernetesPortRange); + KubernetesApiClient kubApi = kubClusterContext.getKubApi(); - // first let's create a replication controller. - ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController(); - ReplicationController controller = controllerFunction.apply(containerClusterContext); + // first let's create a replication controller. + ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController(); + ReplicationController controller = controllerFunction.apply(containerClusterContext); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller is delegating request to start a replication controller " + controller + - " for " + containerClusterContext + " to Kubernetes layer."); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to start a replication controller " + controller + + " for " + containerClusterContext + " to Kubernetes layer."); + } - kubApi.createReplicationController(controller); + kubApi.createReplicationController(controller); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller successfully started the controller " - + controller + " via Kubernetes layer."); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller successfully started the controller " + + controller + " via Kubernetes layer."); + } - // secondly let's create a kubernetes service proxy to load balance these containers - ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService(); - Service service = serviceFunction.apply(containerClusterContext); + // secondly let's create a kubernetes service proxy to load balance these containers + ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService(); + Service service = serviceFunction.apply(containerClusterContext); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller is delegating request to start a service " + service + - " for " + containerClusterContext + " to Kubernetes layer."); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to start a service " + service + + " for " + containerClusterContext + " to Kubernetes layer."); + } - kubApi.createService(service); + kubApi.createService(service); - // set host port and update - Property allocatedServiceHostPortProp = new Property(); - allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT); - allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort())); - ctxt.getProperties().addProperty(allocatedServiceHostPortProp); - cloudControllerContext.addClusterContext(ctxt); + // set host port and update + Property allocatedServiceHostPortProp = new Property(); + allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT); + allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort())); + ctxt.getProperties().addProperty(allocatedServiceHostPortProp); + CloudControllerContext.getInstance().addClusterContext(ctxt); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller successfully started the service " - + controller + " via Kubernetes layer."); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller successfully started the service " + + controller + " via Kubernetes layer."); + } - // create a label query - Label l = new Label(); - l.setName(clusterId); - // execute the label query - Pod[] newlyCreatedPods = new Pod[0]; - int expectedCount = Integer.parseInt(minReplicas); + // create a label query + Label l = new Label(); + l.setName(clusterId); + // execute the label query + Pod[] newlyCreatedPods = new Pod[0]; + int expectedCount = Integer.parseInt(minReplicas); - for (int i = 0; i < expectedCount; i++) { - newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l}); + for (int i = 0; i < expectedCount; i++) { + newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l}); - if (LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { - LOG.debug("Pods Count: " + newlyCreatedPods.length + " for cluster: " + clusterId); + LOG.debug("Pods Count: " + newlyCreatedPods.length + " for cluster: " + clusterId); + } + if (newlyCreatedPods.length == expectedCount) { + break; + } + Thread.sleep(10000); } - if (newlyCreatedPods.length == expectedCount) { - break; + + if (newlyCreatedPods.length == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Pods are not created for cluster : %s, hence deleting the service", clusterId)); + } + terminateAllContainers(clusterId); + return new MemberContext[0]; } - Thread.sleep(10000); - } - if (newlyCreatedPods.length == 0) { if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Pods are not created for cluster : %s, hence deleting the service", clusterId)); - } - terminateAllContainers(clusterId); - return new MemberContext[0]; - } - if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Pods created : %s for cluster : %s", newlyCreatedPods.length, clusterId)); + } - LOG.debug(String.format("Pods created : %s for cluster : %s", newlyCreatedPods.length, clusterId)); - } + List<MemberContext> memberContexts = new ArrayList<MemberContext>(); - List<MemberContext> memberContexts = new ArrayList<MemberContext>(); + PodToMemberContext podToMemberContextFunc = new PodToMemberContext(); + // generate Member Contexts + for (Pod pod : newlyCreatedPods) { + MemberContext context = podToMemberContextFunc.apply(pod); + context.setCartridgeType(cartridgeType); + context.setClusterId(clusterId); - PodToMemberContext podToMemberContextFunc = new PodToMemberContext(); - // generate Member Contexts - for (Pod pod : newlyCreatedPods) { - MemberContext context = podToMemberContextFunc.apply(pod); - context.setCartridgeType(cartridgeType); - context.setClusterId(clusterId); + context.setProperties(CloudControllerUtil.addProperty(context + .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, + String.valueOf(service.getPort()))); - context.setProperties(CloudControllerUtil.addProperty(context - .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, - String.valueOf(service.getPort()))); + CloudControllerContext.getInstance().addMemberContext(context); - cloudControllerContext.addMemberContext(context); + // wait till Pod status turns to running and send member spawned. + ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is starting the instance start up thread."); + } + CloudControllerContext.getInstance().addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); - // wait till Pod status turns to running and send member spawned. - ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance(); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller is starting the instance start up thread."); + memberContexts.add(context); } - cloudControllerContext.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); - - memberContexts.add(context); - } - // persist in registry - persist(); + // persist in registry + persist(); - LOG.info("Kubernetes entities are successfully starting up: " + memberContexts); + LOG.info("Kubernetes entities are successfully starting up: " + memberContexts); - return memberContexts.toArray(new MemberContext[0]); + return memberContexts.toArray(new MemberContext[0]); - } catch (Exception e) { - String msg = "Failed to start an instance. " + containerClusterContext.toString() + " Cause: " + e.getMessage(); - LOG.error(msg, e); - throw new IllegalStateException(msg, e); + } catch (Exception e) { + String msg = "Failed to start an instance. " + containerClusterContext.toString() + " Cause: " + e.getMessage(); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); + } + } finally { + if(lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); + } } } @@ -1700,18 +1728,18 @@ public class CloudControllerServiceImpl implements CloudControllerService { String kubernetesClusterId, String kubernetesMasterIp, String kubernetesPortRange) { - KubernetesClusterContext origCtxt = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId); + KubernetesClusterContext origCtxt = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId); KubernetesClusterContext newCtxt = new KubernetesClusterContext(kubernetesClusterId, kubernetesPortRange, kubernetesMasterIp); if (origCtxt == null) { - cloudControllerContext.addKubernetesClusterContext(newCtxt); + CloudControllerContext.getInstance().addKubernetesClusterContext(newCtxt); return newCtxt; } if (!origCtxt.equals(newCtxt)) { // if for some reason master IP etc. have changed newCtxt.setAvailableHostPorts(origCtxt.getAvailableHostPorts()); - cloudControllerContext.addKubernetesClusterContext(newCtxt); + CloudControllerContext.getInstance().addKubernetesClusterContext(newCtxt); return newCtxt; } else { return origCtxt; @@ -1721,228 +1749,242 @@ public class CloudControllerServiceImpl implements CloudControllerService { @Override public MemberContext[] terminateAllContainers(String clusterId) throws InvalidClusterException { + Lock lock = null; + try { + lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock(); - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); - handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid cluster id. " + clusterId); - - String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(), - StratosConstants.KUBERNETES_CLUSTER_ID); - handleNullObject(kubernetesClusterId, "Kubernetes units termination failed. Cannot find '" + - StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt); + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId); + handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid cluster id. " + clusterId); - KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId); - handleNullObject(kubClusterContext, "Kubernetes units termination failed. Cannot find a matching Kubernetes Cluster for cluster id: " - + kubernetesClusterId); + String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(), + StratosConstants.KUBERNETES_CLUSTER_ID); + handleNullObject(kubernetesClusterId, "Kubernetes units termination failed. Cannot find '" + + StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt); - KubernetesApiClient kubApi = kubClusterContext.getKubApi(); - // delete the service - try { - kubApi.deleteService(CloudControllerUtil.getCompatibleId(clusterId)); - } catch (KubernetesClientException e) { - // we're not going to throw this error, but proceed with other deletions - LOG.error("Failed to delete Kubernetes service with id: " + clusterId, e); - } + KubernetesClusterContext kubClusterContext = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId); + handleNullObject(kubClusterContext, "Kubernetes units termination failed. Cannot find a matching Kubernetes Cluster for cluster id: " + + kubernetesClusterId); - // set replicas=0 for the replication controller - try { - kubApi.updateReplicationController(clusterId, 0); - } catch (KubernetesClientException e) { - // we're not going to throw this error, but proceed with other deletions - LOG.error("Failed to update Kubernetes Controller with id: " + clusterId, e); - } + KubernetesApiClient kubApi = kubClusterContext.getKubApi(); + // delete the service + try { + kubApi.deleteService(CloudControllerUtil.getCompatibleId(clusterId)); + } catch (KubernetesClientException e) { + // we're not going to throw this error, but proceed with other deletions + LOG.error("Failed to delete Kubernetes service with id: " + clusterId, e); + } - // delete pods forcefully - try { - // create a label query - Label l = new Label(); - l.setName(clusterId); - // execute the label query - Pod[] pods = kubApi.getSelectedPods(new Label[]{l}); + // set replicas=0 for the replication controller + try { + kubApi.updateReplicationController(clusterId, 0); + } catch (KubernetesClientException e) { + // we're not going to throw this error, but proceed with other deletions + LOG.error("Failed to update Kubernetes Controller with id: " + clusterId, e); + } - for (Pod pod : pods) { - try { - // delete pods forcefully - kubApi.deletePod(pod.getId()); - } catch (KubernetesClientException ignore) { - // we can't do nothing here - LOG.warn(String.format("Failed to delete Pod [%s] forcefully!", pod.getId())); + // delete pods forcefully + try { + // create a label query + Label l = new Label(); + l.setName(clusterId); + // execute the label query + Pod[] pods = kubApi.getSelectedPods(new Label[]{l}); + + for (Pod pod : pods) { + try { + // delete pods forcefully + kubApi.deletePod(pod.getId()); + } catch (KubernetesClientException ignore) { + // we can't do nothing here + LOG.warn(String.format("Failed to delete Pod [%s] forcefully!", pod.getId())); + } } + } catch (KubernetesClientException e) { + // we're not going to throw this error, but proceed with other deletions + LOG.error("Failed to delete pods forcefully for cluster: " + clusterId, e); } - } catch (KubernetesClientException e) { - // we're not going to throw this error, but proceed with other deletions - LOG.error("Failed to delete pods forcefully for cluster: " + clusterId, e); - } - - // delete the replication controller. - try { - kubApi.deleteReplicationController(clusterId); - } catch (KubernetesClientException e) { - String msg = "Failed to delete Kubernetes Controller with id: " + clusterId; - LOG.error(msg, e); - throw new InvalidClusterException(msg, e); - } - String allocatedPort = CloudControllerUtil.getProperty(ctxt.getProperties(), - StratosConstants.ALLOCATED_SERVICE_HOST_PORT); + // delete the replication controller. + try { + kubApi.deleteReplicationController(clusterId); + } catch (KubernetesClientException e) { + String msg = "Failed to delete Kubernetes Controller with id: " + clusterId; + LOG.error(msg, e); + throw new InvalidClusterException(msg, e); + } - if (allocatedPort != null) { - kubClusterContext.deallocateHostPort(Integer - .parseInt(allocatedPort)); - } else { - LOG.warn("Host port dealloacation failed due to a missing property: " - + StratosConstants.ALLOCATED_SERVICE_HOST_PORT); - } + String allocatedPort = CloudControllerUtil.getProperty(ctxt.getProperties(), + StratosConstants.ALLOCATED_SERVICE_HOST_PORT); - List<MemberContext> membersToBeRemoved = cloudControllerContext.getMemberContextsOfClusterId(clusterId); + if (allocatedPort != null) { + kubClusterContext.deallocateHostPort(Integer + .parseInt(allocatedPort)); + } else { + LOG.warn("Host port dealloacation failed due to a missing property: " + + StratosConstants.ALLOCATED_SERVICE_HOST_PORT); + } - for (MemberContext memberContext : membersToBeRemoved) { - logTermination(memberContext); - } + List<MemberContext> membersToBeRemoved = CloudControllerContext.getInstance().getMemberContextsOfClusterId(clusterId); - // persist - persist(); + for (MemberContext memberContext : membersToBeRemoved) { + logTermination(memberContext); + } - return membersToBeRemoved.toArray(new MemberContext[0]); + // persist + persist(); + return membersToBeRemoved.toArray(new MemberContext[0]); + } finally { + if(lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); + } + } } @Override public MemberContext[] updateContainers(String clusterId, int replicas) throws UnregisteredCartridgeException { + Lock lock = null; + try { + lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock(); - if (LOG.isDebugEnabled()) { - LOG.debug("CloudControllerServiceImpl:updateContainers for cluster : " + clusterId); - } - - ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); - handleNullObject(ctxt, "Container update failed. Invalid cluster id. " + clusterId); - - String cartridgeType = ctxt.getCartridgeType(); - - Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); + if (LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:updateContainers for cluster : " + clusterId); + } - if (cartridge == null) { - String msg = - "Container update failed. No matching Cartridge found [type] " + cartridgeType - + ". [cluster id] " + clusterId; - LOG.error(msg); - throw new UnregisteredCartridgeException(msg); - } + ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId); + handleNullObject(ctxt, "Container update failed. Invalid cluster id. " + clusterId); - try { - String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt); + String cartridgeType = ctxt.getCartridgeType(); - KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId); + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - if (kubClusterContext == null) { + if (cartridge == null) { String msg = - "Instance start-up failed. No matching Kubernetes Context Found for [id] " + kubernetesClusterId + "Container update failed. No matching Cartridge found [type] " + cartridgeType + ". [cluster id] " + clusterId; LOG.error(msg); throw new UnregisteredCartridgeException(msg); } - KubernetesApiClient kubApi = kubClusterContext.getKubApi(); - // create a label query - Label l = new Label(); - l.setName(clusterId); + try { + String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt); - // get the current pods - useful when scale down - Pod[] previousStatePods = kubApi.getSelectedPods(new Label[]{l}); + KubernetesClusterContext kubClusterContext = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId); - // update the replication controller - cluster id = replication controller id - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller is delegating request to update a replication controller " + clusterId + - " to Kubernetes layer."); - } + if (kubClusterContext == null) { + String msg = + "Instance start-up failed. No matching Kubernetes Context Found for [id] " + kubernetesClusterId + + ". [cluster id] " + clusterId; + LOG.error(msg); + throw new UnregisteredCartridgeException(msg); + } - kubApi.updateReplicationController(clusterId, replicas); + KubernetesApiClient kubApi = kubClusterContext.getKubApi(); + // create a label query + Label l = new Label(); + l.setName(clusterId); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller successfully updated the controller " - + clusterId + " via Kubernetes layer."); - } + // get the current pods - useful when scale down + Pod[] previousStatePods = kubApi.getSelectedPods(new Label[]{l}); - // execute the label query - Pod[] allPods = new Pod[0]; + // update the replication controller - cluster id = replication controller id + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is delegating request to update a replication controller " + clusterId + + " to Kubernetes layer."); + } - // wait replicas*5s time in the worst case ; best case = 0s - for (int i = 0; i < (replicas * previousStatePods.length + 1); i++) { - allPods = kubApi.getSelectedPods(new Label[]{l}); + kubApi.updateReplicationController(clusterId, replicas); if (LOG.isDebugEnabled()) { - - LOG.debug("Pods Count: " + allPods.length + " for cluster: " + clusterId); + LOG.debug("Cloud Controller successfully updated the controller " + + clusterId + " via Kubernetes layer."); } - if (allPods.length == replicas) { - break; - } - Thread.sleep(10000); - } - if (LOG.isDebugEnabled()) { + // execute the label query + Pod[] allPods = new Pod[0]; - LOG.debug(String.format("Pods created : %s for cluster : %s", allPods.length, clusterId)); - } + // wait replicas*5s time in the worst case ; best case = 0s + for (int i = 0; i < (replicas * previousStatePods.length + 1); i++) { + allPods = kubApi.getSelectedPods(new Label[]{l}); - List<MemberContext> memberContexts = new ArrayList<MemberContext>(); + if (LOG.isDebugEnabled()) { - PodToMemberContext podToMemberContextFunc = new PodToMemberContext(); - // generate Member Contexts - for (Pod pod : allPods) { - MemberContext context; - // if member context does not exist -> a new member (scale up) - if ((context = cloudControllerContext.getMemberContextOfMemberId(pod.getId())) == null) { + LOG.debug("Pods Count: " + allPods.length + " for cluster: " + clusterId); + } + if (allPods.length == replicas) { + break; + } + Thread.sleep(10000); + } - context = podToMemberContextFunc.apply(pod); - context.setCartridgeType(cartridgeType); - context.setClusterId(clusterId); + if (LOG.isDebugEnabled()) { - context.setProperties(CloudControllerUtil.addProperty(context - .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, - CloudControllerUtil.getProperty(ctxt.getProperties(), - StratosConstants.ALLOCATED_SERVICE_HOST_PORT))); + LOG.debug(String.format("Pods created : %s for cluster : %s", allPods.length, clusterId)); + } - // wait till Pod status turns to running and send member spawned. - ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance(); - if (LOG.isDebugEnabled()) { - LOG.debug("Cloud Controller is starting the instance start up thread."); - } - cloudControllerContext.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); + List<MemberContext> memberContexts = new ArrayList<MemberContext>(); - memberContexts.add(context); + PodToMemberContext podToMemberContextFunc = new PodToMemberContext(); + // generate Member Contexts + for (Pod pod : allPods) { + MemberContext context; + // if member context does not exist -> a new member (scale up) + if ((context = CloudControllerContext.getInstance().getMemberContextOfMemberId(pod.getId())) == null) { - } - // publish data - // TODO -// CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node); + context = podToMemberContextFunc.apply(pod); + context.setCartridgeType(cartridgeType); + context.setClusterId(clusterId); - } + context.setProperties(CloudControllerUtil.addProperty(context + .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, + CloudControllerUtil.getProperty(ctxt.getProperties(), + StratosConstants.ALLOCATED_SERVICE_HOST_PORT))); + + // wait till Pod status turns to running and send member spawned. + ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cloud Controller is starting the instance start up thread."); + } + CloudControllerContext.getInstance().addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); - if (memberContexts.isEmpty()) { - // terminated members - @SuppressWarnings("unchecked") - List<Pod> difference = ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods)); - for (Pod pod : difference) { - if (pod != null) { - MemberContext context = cloudControllerContext.getMemberContextOfMemberId(pod.getId()); - logTermination(context); memberContexts.add(context); + } + // publish data + // TODO +// CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node); + } - } + if (memberContexts.isEmpty()) { + // terminated members + @SuppressWarnings("unchecked") + List<Pod> difference = ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods)); + for (Pod pod : difference) { + if (pod != null) { + MemberContext context = CloudControllerContext.getInstance().getMe <TRUNCATED>
