Use a global counter for generating sequence ids for Kub services and pods
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7d6279d9 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7d6279d9 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7d6279d9 Branch: refs/heads/stratos-4.1.x Commit: 7d6279d9aaaf762cbf44d1c4aefda1f62dd43043 Parents: 9a2279a Author: Akila Perera <[email protected]> Authored: Fri Sep 18 17:19:20 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Sun Sep 20 00:15:54 2015 +0530 ---------------------------------------------------------------------- .../kubernetes/KubernetesClusterContext.java | 70 +++++++---- .../iaases/kubernetes/KubernetesIaas.java | 119 ++++++++++--------- 2 files changed, 108 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/7d6279d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java index dc855a4..7c0ae22 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java @@ -43,15 +43,17 @@ public class KubernetesClusterContext implements Serializable { private String masterPort; private List<Integer> servicePortSequence; private Map<String, KubernetesService> kubernetesServices; - private Map<String, AtomicLong> serviceSeqNoMap; - private Map<String, AtomicLong> podSeqNoMap; + private AtomicLong serviceSeqNo; + private AtomicLong podSeqNo; private transient KubernetesApiClient kubApi; + public static final long MAX_POD_ID = 99999999999999L; + public static final long MAX_SERVICE_ID = 99999999999999L; public KubernetesClusterContext(String id, String masterIp, String masterPort, int lowerPort, int upperPort) { this.servicePortSequence = new ArrayList<>(); this.kubernetesServices = new HashMap<>(); - this.serviceSeqNoMap = new HashMap<>(); - this.podSeqNoMap = new HashMap<>(); + serviceSeqNo = new AtomicLong(0); + podSeqNo = new AtomicLong(0); this.lowerPort = lowerPort; this.upperPort = upperPort; @@ -85,6 +87,7 @@ public class KubernetesClusterContext implements Serializable { /*** * Get next available service port. + * * @return */ public int getNextServicePort() { @@ -96,6 +99,7 @@ public class KubernetesClusterContext implements Serializable { /** * Deallocate a service port by adding it again to the sequence. + * * @param port */ public void deallocatePort(int port) { @@ -107,6 +111,7 @@ public class KubernetesClusterContext implements Serializable { /** * Initialize service port sequence according to the given port range. + * * @param lowerPort * @param upperPort */ @@ -163,18 +168,20 @@ public class KubernetesClusterContext implements Serializable { this.lowerPort = lowerPort; } - public AtomicLong getServiceSeqNo(String applicationId) { - if(!serviceSeqNoMap.containsKey(applicationId)) { - serviceSeqNoMap.put(applicationId, new AtomicLong()); + public long getNextServiceSeqNo() { + // reset before we hit the max character length for Kub service id + if (serviceSeqNo.get() > MAX_SERVICE_ID) { + serviceSeqNo.set(0); } - return serviceSeqNoMap.get(applicationId); + return serviceSeqNo.incrementAndGet(); } - public AtomicLong getPodSeqNo(String applicationId) { - if(!podSeqNoMap.containsKey(applicationId)) { - podSeqNoMap.put(applicationId, new AtomicLong()); + public long getNextPodSeqNo() { + // reset before we hit the max character length for Kub pod id + if (podSeqNo.get() > MAX_POD_ID) { + podSeqNo.set(0); } - return podSeqNoMap.get(applicationId); + return podSeqNo.incrementAndGet(); } @Override @@ -192,37 +199,50 @@ public class KubernetesClusterContext implements Serializable { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } KubernetesClusterContext other = (KubernetesClusterContext) obj; if (servicePortSequence == null) { - if (other.servicePortSequence != null) + if (other.servicePortSequence != null) { return false; - } else if (!servicePortSequence.equals(other.servicePortSequence)) + } + } else if (!servicePortSequence.equals(other.servicePortSequence)) { return false; + } if (kubernetesClusterId == null) { - if (other.kubernetesClusterId != null) + if (other.kubernetesClusterId != null) { return false; - } else if (!kubernetesClusterId.equals(other.kubernetesClusterId)) + } + } else if (!kubernetesClusterId.equals(other.kubernetesClusterId)) { return false; - if (lowerPort != other.lowerPort) + } + if (lowerPort != other.lowerPort) { return false; + } if (masterIp == null) { - if (other.masterIp != null) + if (other.masterIp != null) { return false; - } else if (!masterIp.equals(other.masterIp)) + } + } else if (!masterIp.equals(other.masterIp)) { return false; + } if (masterPort == null) { - if (other.masterPort != null) + if (other.masterPort != null) { return false; - } else if (!masterPort.equals(other.masterPort)) + } + } else if (!masterPort.equals(other.masterPort)) { return false; - if (upperPort != other.upperPort) + } + if (upperPort != other.upperPort) { return false; + } return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d6279d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java index 44d8d1d..5867184 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java @@ -44,10 +44,7 @@ import org.apache.stratos.kubernetes.client.KubernetesConstants; import org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException; import org.apache.stratos.messaging.domain.topology.KubernetesService; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; +import java.util.*; import java.util.concurrent.locks.Lock; /** @@ -395,7 +392,8 @@ public class KubernetesIaas extends Iaas { memory = memoryProperty.getValue(); } - IaasProvider iaasProvider = CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridge.getType(), partition.getId()); + IaasProvider iaasProvider = + CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridge.getType(), partition.getId()); if (iaasProvider == null) { String message = "Could not find iaas provider: [partition] " + partition.getId(); log.error(message); @@ -406,15 +404,15 @@ public class KubernetesIaas extends Iaas { memberContext.setDynamicPayload(payload.toArray(new NameValuePair[payload.size()])); // Find next available sequence number - long podSeqNo = kubernetesClusterContext.getPodSeqNo(applicationId).incrementAndGet(); - String podId = preparePodId(applicationId, podSeqNo); - while(kubernetesApi.getPod(podId) != null) { - podSeqNo = kubernetesClusterContext.getPodSeqNo(applicationId).incrementAndGet(); - podId = preparePodId(applicationId, podSeqNo); + long podSeqNo = kubernetesClusterContext.getNextPodSeqNo(); + String podId = preparePodId(podSeqNo); + while (kubernetesApi.getPod(podId) != null) { + podSeqNo = kubernetesClusterContext.getNextPodSeqNo(); + podId = preparePodId(podSeqNo); } // Create pod - String podLabel = DigestUtils.md5Hex(clusterId); + String podName = DigestUtils.md5Hex(clusterId); String dockerImage = iaasProvider.getImage(); List<EnvVar> environmentVariables = KubernetesIaasUtil.prepareEnvironmentVariables( clusterContext, memberContext); @@ -426,16 +424,23 @@ public class KubernetesIaas extends Iaas { memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(), cpu, memory)); - kubernetesApi.createPod(podId, podLabel, dockerImage, cpu, memory, ports, environmentVariables); + Map<String, String> podLabels = new HashMap<>(); + podLabels.put("applicationId", memberContext.getApplicationId()); + podLabels.put("clusterId", memberContext.getClusterId()); + podLabels.put("clusterInstanceId", memberContext.getClusterInstanceId()); + podLabels.put("memberId", memberContext.getMemberId()); + podLabels.put("cartridgeType", memberContext.getCartridgeType()); + + kubernetesApi.createPod(podId, podName, podLabels, dockerImage, cpu, memory, ports, environmentVariables); log.info(String.format("Pod started successfully: [application] %s [cartridge] %s [member] %s " + "[pod] %s [pod-label] %s [cpu] %s [memory] %s", memberContext.getApplicationId(), memberContext.getCartridgeType(), - memberContext.getMemberId(), podId, podLabel, cpu, memory)); + memberContext.getMemberId(), podId, podName, cpu, memory)); // Add pod id to member context memberContext.setKubernetesPodId(podId); - memberContext.setKubernetesPodLabel(podLabel); + memberContext.setKubernetesPodName(podName); // Create instance metadata InstanceMetadata instanceMetadata = new InstanceMetadata(); @@ -448,8 +453,8 @@ public class KubernetesIaas extends Iaas { CloudControllerContext.getInstance().persist(); } - private String preparePodId(String applicationId, long podSeqNo) { - return applicationId + "-" + POD_ID_PREFIX + "-" + podSeqNo; + private String preparePodId(long podSeqNo) { + return POD_ID_PREFIX + "-" + podSeqNo; } /** @@ -463,13 +468,10 @@ public class KubernetesIaas extends Iaas { */ private void createKubernetesServices(KubernetesApiClient kubernetesApi, ClusterContext clusterContext, KubernetesCluster kubernetesCluster, - KubernetesClusterContext kubernetesClusterContext,MemberContext memberContext) + KubernetesClusterContext kubernetesClusterContext) throws KubernetesClientException { - - String applicationId = clusterContext.getApplicationId(); String clusterId = clusterContext.getClusterId(); String cartridgeType = clusterContext.getCartridgeType(); - Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); if (cartridge == null) { String message = "Could not create kubernetes services, cartridge not found: [cartridge] " + @@ -492,62 +494,67 @@ public class KubernetesIaas extends Iaas { Collection<ClusterPortMapping> clusterPortMappings = CloudControllerContext.getInstance() .getClusterPortMappings(clusterContext.getApplicationId(), clusterId); + Map<String, String> serviceLabels = new HashMap<>(); if (clusterPortMappings != null) { - String serviceLabel = DigestUtils.md5Hex(clusterId); + String serviceName = DigestUtils.md5Hex(clusterId); Collection<KubernetesService> kubernetesServices = kubernetesClusterContext.getKubernetesServices(); for (ClusterPortMapping clusterPortMapping : clusterPortMappings) { // Skip if already created int containerPort = clusterPortMapping.getPort(); if (kubernetesServiceExist(kubernetesServices, containerPort)) { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug(String.format("Kubernetes service already exists: [kubernetes-cluster] %s " + - "[cluster] %s [service-label] %s [container-port] %d ", - kubernetesCluster.getClusterId(), clusterId, serviceLabel, containerPort)); + "[cluster] %s [service-name] %s [container-port] %d ", + kubernetesCluster.getClusterId(), clusterId, serviceName, containerPort)); } continue; } // Find next available service sequence number - long serviceSeqNo = kubernetesClusterContext.getServiceSeqNo(applicationId).incrementAndGet(); - String serviceName = KubernetesIaasUtil.fixSpecialCharacters( - prepareServiceName(applicationId, serviceSeqNo)); - - while(kubernetesApi.getService(serviceName) != null) { - serviceSeqNo = kubernetesClusterContext.getServiceSeqNo(applicationId).incrementAndGet(); - serviceName = KubernetesIaasUtil.fixSpecialCharacters( - prepareServiceName(applicationId, serviceSeqNo)); + long serviceSeqNo = kubernetesClusterContext.getNextServiceSeqNo(); + String serviceId = + KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo)); + while (kubernetesApi.getService(serviceId) != null) { + serviceSeqNo = kubernetesClusterContext.getNextServiceSeqNo(); + serviceId = + KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo)); } if (log.isInfoEnabled()) { - log.info(String.format("Creating kubernetes service: [cluster] %s [service] %s [service-label] %s " + - "[protocol] %s [service-port] %d [container-port] %s", clusterId, - serviceName, serviceLabel, clusterPortMapping.getProtocol(), - clusterPortMapping.getKubernetesServicePort(), containerPort)); + log.info( + String.format("Creating kubernetes service: [cluster] %s [service-id] %s [service-name] " + + "%s " + "[protocol] %s [service-port] %d [container-port] %s", clusterId, + serviceId, serviceName, clusterPortMapping.getProtocol(), + clusterPortMapping.getKubernetesServicePort(), containerPort)); } // Create kubernetes service for port mapping int servicePort = clusterPortMapping.getKubernetesServicePort(); String serviceType = clusterPortMapping.getKubernetesServiceType(); String containerPortName = KubernetesIaasUtil.preparePortNameFromPortMapping(clusterPortMapping); - - try { - kubernetesApi.createService(serviceName, serviceLabel, servicePort, serviceType, containerPortName, - containerPort, sessionAffinity); - } finally { - // Persist kubernetes service sequence no - CloudControllerContext.getInstance().persist(); - } - + serviceLabels.put("applicationId", clusterPortMapping.getApplicationId()); + serviceLabels.put("clusterId", clusterPortMapping.getClusterId()); + serviceLabels.put("name", clusterPortMapping.getName()); + serviceLabels.put("protocol", clusterPortMapping.getProtocol()); + serviceLabels.put("serviceType", clusterPortMapping.getKubernetesServiceType()); + serviceLabels.put("portType", clusterPortMapping.getKubernetesPortType()); + serviceLabels.put("servicePort", String.valueOf(clusterPortMapping.getKubernetesServicePort())); + serviceLabels.put("port", String.valueOf(clusterPortMapping.getPort())); + serviceLabels.put("proxyPort", String.valueOf(clusterPortMapping.getProxyPort())); + + kubernetesApi.createService(serviceId, serviceName, serviceLabels, servicePort, serviceType, + containerPortName, containerPort, sessionAffinity); try { Thread.sleep(1000); - } catch (InterruptedException ignore) { + } + catch (InterruptedException ignore) { } - Service service = kubernetesApi.getService(serviceName); - if(service == null) { - throw new KubernetesClientException("Kubernetes service not found: [service] " + serviceName); + Service service = kubernetesApi.getService(serviceId); + if (service == null) { + throw new KubernetesClientException("Kubernetes service not found: [service-id] " + serviceId); } KubernetesService kubernetesService = new KubernetesService(); @@ -561,7 +568,6 @@ public class KubernetesIaas extends Iaas { String kubernetesServiceType = service.getSpec().getType(); kubernetesService.setServiceType(kubernetesServiceType); - kubernetesService.setKubernetesClusterId(memberContext.getPartition().getKubernetesClusterId()); if (kubernetesServiceType.equals(KubernetesConstants.NODE_PORT)) { kubernetesService.setPort(service.getSpec().getPorts().get(0).getNodePort()); @@ -576,16 +582,17 @@ public class KubernetesIaas extends Iaas { CloudControllerContext.getInstance().persist(); if (log.isInfoEnabled()) { - log.info(String.format("Kubernetes service successfully created: [cluster] %s [service] %s " + - "[protocol] %s [node-port] %d [container-port] %s", clusterId, - serviceName, clusterPortMapping.getProtocol(), servicePort, containerPort)); + log.info(String.format( + "Kubernetes service successfully created: [cluster] %s [service-id] %s [protocol] %s " + + "[node-port] %d [container-port] %s", clusterId, serviceId, + clusterPortMapping.getProtocol(), servicePort, containerPort)); } } } } - private String prepareServiceName(String applicationId, long serviceSeqNo) { - return applicationId + "-" + SERVICE_NAME_PREFIX + "-" + (serviceSeqNo); + private String prepareServiceName(long serviceSeqNo) { + return SERVICE_NAME_PREFIX + "-" + (serviceSeqNo); } private List<String> prepareMinionIPAddresses(KubernetesCluster kubernetesCluster) { @@ -989,4 +996,4 @@ public class KubernetesIaas extends Iaas { } } -} +} \ No newline at end of file
