Repository: stratos
Updated Branches:
  refs/heads/stratos-4.1.x 8bdd0b4a9 -> e8ebfdf49
Select next available sequence numbers for pods, services and append application id

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/e8ebfdf4
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/e8ebfdf4
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/e8ebfdf4

Branch: refs/heads/stratos-4.1.x
Commit: e8ebfdf491b279a45ff4ff79bdbcabd65ceab5d0
Parents: 8bdd0b4
Author: Imesh Gunaratne <[email protected]>
Authored: Mon Sep 14 02:05:15 2015 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Mon Sep 14 15:30:06 2015 +0530

----------------------------------------------------------------------
 .../kubernetes/KubernetesClusterContext.java | 23 ++++---
 .../iaases/kubernetes/KubernetesIaas.java | 71 +++++++++++++++-----
 2 files changed, 67 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8ebfdf4/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
index 7e6d557..dc855a4 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
@@ -43,13 +43,15 @@ public class KubernetesClusterContext implements Serializable {
     private String masterPort;
     private List<Integer> 
servicePortSequence; private Map<String, KubernetesService> kubernetesServices; + private Map<String, AtomicLong> serviceSeqNoMap; + private Map<String, AtomicLong> podSeqNoMap; private transient KubernetesApiClient kubApi; - private AtomicLong serviceSeqNo; - private AtomicLong podSeqNo; public KubernetesClusterContext(String id, String masterIp, String masterPort, int lowerPort, int upperPort) { this.servicePortSequence = new ArrayList<>(); this.kubernetesServices = new HashMap<>(); + this.serviceSeqNoMap = new HashMap<>(); + this.podSeqNoMap = new HashMap<>(); this.lowerPort = lowerPort; this.upperPort = upperPort; @@ -59,9 +61,6 @@ public class KubernetesClusterContext implements Serializable { this.masterIp = masterIp; this.masterPort = masterPort; this.setKubApi(new KubernetesApiClient(getEndpoint(masterIp, masterPort))); - this.serviceSeqNo = new AtomicLong(); - this.podSeqNo = new AtomicLong(); - } private String getEndpoint(String ip, String port) { @@ -164,12 +163,18 @@ public class KubernetesClusterContext implements Serializable { this.lowerPort = lowerPort; } - public AtomicLong getServiceSeqNo() { - return serviceSeqNo; + public AtomicLong getServiceSeqNo(String applicationId) { + if(!serviceSeqNoMap.containsKey(applicationId)) { + serviceSeqNoMap.put(applicationId, new AtomicLong()); + } + return serviceSeqNoMap.get(applicationId); } - public AtomicLong getPodSeqNo() { - return podSeqNo; + public AtomicLong getPodSeqNo(String applicationId) { + if(!podSeqNoMap.containsKey(applicationId)) { + podSeqNoMap.put(applicationId, new AtomicLong()); + } + return podSeqNoMap.get(applicationId); } @Override http://git-wip-us.apache.org/repos/asf/stratos/blob/e8ebfdf4/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java index 9796a8c..cf913b0 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java @@ -64,6 +64,8 @@ public class KubernetesIaas extends Iaas { private static final String KUBERNETES_SERVICE_SESSION_AFFINITY = "KUBERNETES_SERVICE_SESSION_AFFINITY"; private static final String KUBERNETES_CONTAINER_CPU_DEFAULT = "kubernetes.container.cpu.default"; private static final String KUBERNETES_CONTAINER_MEMORY_DEFAULT = "kubernetes.container.memory.default"; + public static final String POD_ID_PREFIX = "pod"; + public static final String SERVICE_NAME_PREFIX = "service"; private PartitionValidator partitionValidator; private List<NameValuePair> payload; @@ -400,9 +402,15 @@ public class KubernetesIaas extends Iaas { // Add dynamic payload to the member context memberContext.setDynamicPayload(payload.toArray(new NameValuePair[payload.size()])); + // Find next available sequence number + long podSeqNo = kubernetesClusterContext.getPodSeqNo(applicationId).incrementAndGet(); + String podId = preparePodId(applicationId, podSeqNo); + while(kubernetesApi.getPod(podId) != null) { + podSeqNo = kubernetesClusterContext.getPodSeqNo(applicationId).incrementAndGet(); + podId = preparePodId(applicationId, podSeqNo); + } + // Create pod - long podSeqNo = kubernetesClusterContext.getPodSeqNo().incrementAndGet(); - String podId = "pod" + "-" + podSeqNo; String podLabel = DigestUtils.md5Hex(clusterId); String dockerImage = iaasProvider.getImage(); List<EnvVar> environmentVariables = 
KubernetesIaasUtil.prepareEnvironmentVariables( @@ -437,6 +445,10 @@ public class KubernetesIaas extends Iaas { CloudControllerContext.getInstance().persist(); } + private String preparePodId(String applicationId, long podSeqNo) { + return applicationId + "-" + POD_ID_PREFIX + "-" + podSeqNo; + } + /** * Creates and returns proxy services for the cluster. * @@ -451,6 +463,7 @@ public class KubernetesIaas extends Iaas { KubernetesClusterContext kubernetesClusterContext) throws KubernetesClientException { + String applicationId = clusterContext.getApplicationId(); String clusterId = clusterContext.getClusterId(); String cartridgeType = clusterContext.getCartridgeType(); @@ -493,9 +506,16 @@ public class KubernetesIaas extends Iaas { continue; } - // Find next service sequence no - long serviceSeqNo = kubernetesClusterContext.getServiceSeqNo().incrementAndGet(); - String serviceName = KubernetesIaasUtil.fixSpecialCharacters("service" + "-" + (serviceSeqNo)); + // Find next available service sequence number + long serviceSeqNo = kubernetesClusterContext.getServiceSeqNo(applicationId).incrementAndGet(); + String serviceName = KubernetesIaasUtil.fixSpecialCharacters( + prepareServiceName(applicationId, serviceSeqNo)); + + while(kubernetesApi.getService(serviceName) != null) { + serviceSeqNo = kubernetesClusterContext.getServiceSeqNo(applicationId).incrementAndGet(); + serviceName = KubernetesIaasUtil.fixSpecialCharacters( + prepareServiceName(applicationId, serviceSeqNo)); + } if (log.isInfoEnabled()) { log.info(String.format("Creating kubernetes service: [cluster] %s [service] %s [service-label] %s " + @@ -510,18 +530,8 @@ public class KubernetesIaas extends Iaas { String containerPortName = KubernetesIaasUtil.preparePortNameFromPortMapping(clusterPortMapping); try { - // If kubernetes service is already created, skip creating a new one - if (kubernetesApi.getService(serviceName) == null) { - // Services need to use minions private IP addresses for creating iptable 
rules - kubernetesApi.createService(serviceName, serviceLabel, servicePort, serviceType, containerPortName, + kubernetesApi.createService(serviceName, serviceLabel, servicePort, serviceType, containerPortName, containerPort, sessionAffinity); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes service is already created: [cluster] %s [service] %s " + - "[protocol] %s [service-port] %d [container-port] %d", clusterId, - serviceName, clusterPortMapping.getProtocol(), servicePort, containerPort)); - } - } } finally { // Persist kubernetes service sequence no CloudControllerContext.getInstance().persist(); @@ -570,6 +580,10 @@ public class KubernetesIaas extends Iaas { } } + private String prepareServiceName(String applicationId, long serviceSeqNo) { + return applicationId + "-" + SERVICE_NAME_PREFIX + "-" + (serviceSeqNo); + } + private List<String> prepareMinionIPAddresses(KubernetesCluster kubernetesCluster) { List<String> minionPublicIPList = new ArrayList<String>(); KubernetesHost[] kubernetesHosts = kubernetesCluster.getKubernetesHosts(); @@ -609,7 +623,7 @@ public class KubernetesIaas extends Iaas { */ private void generateKubernetesServicePorts(String applicationId, String clusterId, KubernetesClusterContext kubernetesClusterContext, - Cartridge cartridge) { + Cartridge cartridge) throws KubernetesClientException { synchronized (KubernetesIaas.class) { if (cartridge != null) { @@ -640,7 +654,7 @@ public class KubernetesIaas extends Iaas { String serviceType = portMapping.getKubernetesPortType(); clusterPortMapping.setKubernetesServiceType(serviceType); - //If kubernetes service port is already set, skip setting a new one + // If kubernetes service port is already set, skip setting a new one if (clusterPortMapping.getKubernetesServicePort() == 0) { if (serviceType.equals(KubernetesConstants.NODE_PORT)) { int nextServicePort = kubernetesClusterContext.getNextServicePort(); @@ -648,6 +662,14 @@ public class KubernetesIaas extends Iaas { 
throw new RuntimeException(String.format("Could not generate service port: [cluster-id] %s " + "[port] %d", clusterId, portMapping.getPort())); } + + // Find next available service port + KubernetesApiClient kubernetesApi = kubernetesClusterContext.getKubApi(); + List<Service> services = kubernetesApi.getServices(); + while(!nodePortAvailable(services, nextServicePort)) { + nextServicePort = kubernetesClusterContext.getNextServicePort(); + } + clusterPortMapping.setKubernetesServicePort(nextServicePort); } else { clusterPortMapping.setKubernetesServicePort(portMapping.getPort()); @@ -687,6 +709,19 @@ public class KubernetesIaas extends Iaas { } } + private boolean nodePortAvailable(List<Service> services, int nodePort) + throws KubernetesClientException { + + for(Service service : services) { + for(ServicePort servicePort : service.getSpec().getPorts()) { + if(servicePort.getNodePort() == nodePort) { + return false; + } + } + } + return true; + } + /** * Find cluster port mapping that corresponds to cartridge port mapping.
