Use a global counter for generating sequence ids for Kub services and pods

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7d6279d9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7d6279d9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7d6279d9

Branch: refs/heads/stratos-4.1.x
Commit: 7d6279d9aaaf762cbf44d1c4aefda1f62dd43043
Parents: 9a2279a
Author: Akila Perera <[email protected]>
Authored: Fri Sep 18 17:19:20 2015 +0530
Committer: Akila Perera <[email protected]>
Committed: Sun Sep 20 00:15:54 2015 +0530

----------------------------------------------------------------------
 .../kubernetes/KubernetesClusterContext.java    |  70 +++++++----
 .../iaases/kubernetes/KubernetesIaas.java       | 119 ++++++++++---------
 2 files changed, 108 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/7d6279d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
index dc855a4..7c0ae22 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
@@ -43,15 +43,17 @@ public class KubernetesClusterContext implements 
Serializable {
     private String masterPort;
     private List<Integer> servicePortSequence;
     private Map<String, KubernetesService> kubernetesServices;
-    private Map<String, AtomicLong> serviceSeqNoMap;
-    private Map<String, AtomicLong> podSeqNoMap;
+    private AtomicLong serviceSeqNo;
+    private AtomicLong podSeqNo;
     private transient KubernetesApiClient kubApi;
+    public static final long MAX_POD_ID = 99999999999999L;
+    public static final long MAX_SERVICE_ID = 99999999999999L;
 
     public KubernetesClusterContext(String id, String masterIp, String 
masterPort, int lowerPort, int upperPort) {
         this.servicePortSequence = new ArrayList<>();
         this.kubernetesServices = new HashMap<>();
-        this.serviceSeqNoMap = new HashMap<>();
-        this.podSeqNoMap = new HashMap<>();
+        serviceSeqNo = new AtomicLong(0);
+        podSeqNo = new AtomicLong(0);
 
         this.lowerPort = lowerPort;
         this.upperPort = upperPort;
@@ -85,6 +87,7 @@ public class KubernetesClusterContext implements Serializable 
{
 
     /***
      * Get next available service port.
+     *
      * @return
      */
     public int getNextServicePort() {
@@ -96,6 +99,7 @@ public class KubernetesClusterContext implements Serializable 
{
 
     /**
      * Deallocate a service port by adding it again to the sequence.
+     *
      * @param port
      */
     public void deallocatePort(int port) {
@@ -107,6 +111,7 @@ public class KubernetesClusterContext implements 
Serializable {
 
     /**
      * Initialize service port sequence according to the given port range.
+     *
      * @param lowerPort
      * @param upperPort
      */
@@ -163,18 +168,20 @@ public class KubernetesClusterContext implements 
Serializable {
         this.lowerPort = lowerPort;
     }
 
-    public AtomicLong getServiceSeqNo(String applicationId) {
-        if(!serviceSeqNoMap.containsKey(applicationId)) {
-            serviceSeqNoMap.put(applicationId, new AtomicLong());
+    public long getNextServiceSeqNo() {
+        // reset before we hit the max character length for Kub service id
+        if (serviceSeqNo.get() > MAX_SERVICE_ID) {
+            serviceSeqNo.set(0);
         }
-        return serviceSeqNoMap.get(applicationId);
+        return serviceSeqNo.incrementAndGet();
     }
 
-    public AtomicLong getPodSeqNo(String applicationId) {
-        if(!podSeqNoMap.containsKey(applicationId)) {
-            podSeqNoMap.put(applicationId, new AtomicLong());
+    public long getNextPodSeqNo() {
+        // reset before we hit the max character length for Kub pod id
+        if (podSeqNo.get() > MAX_POD_ID) {
+            podSeqNo.set(0);
         }
-        return podSeqNoMap.get(applicationId);
+        return podSeqNo.incrementAndGet();
     }
 
     @Override
@@ -192,37 +199,50 @@ public class KubernetesClusterContext implements 
Serializable {
 
     @Override
     public boolean equals(Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
-        if (obj == null)
+        }
+        if (obj == null) {
             return false;
-        if (getClass() != obj.getClass())
+        }
+        if (getClass() != obj.getClass()) {
             return false;
+        }
         KubernetesClusterContext other = (KubernetesClusterContext) obj;
         if (servicePortSequence == null) {
-            if (other.servicePortSequence != null)
+            if (other.servicePortSequence != null) {
                 return false;
-        } else if (!servicePortSequence.equals(other.servicePortSequence))
+            }
+        } else if (!servicePortSequence.equals(other.servicePortSequence)) {
             return false;
+        }
         if (kubernetesClusterId == null) {
-            if (other.kubernetesClusterId != null)
+            if (other.kubernetesClusterId != null) {
                 return false;
-        } else if (!kubernetesClusterId.equals(other.kubernetesClusterId))
+            }
+        } else if (!kubernetesClusterId.equals(other.kubernetesClusterId)) {
             return false;
-        if (lowerPort != other.lowerPort)
+        }
+        if (lowerPort != other.lowerPort) {
             return false;
+        }
         if (masterIp == null) {
-            if (other.masterIp != null)
+            if (other.masterIp != null) {
                 return false;
-        } else if (!masterIp.equals(other.masterIp))
+            }
+        } else if (!masterIp.equals(other.masterIp)) {
             return false;
+        }
         if (masterPort == null) {
-            if (other.masterPort != null)
+            if (other.masterPort != null) {
                 return false;
-        } else if (!masterPort.equals(other.masterPort))
+            }
+        } else if (!masterPort.equals(other.masterPort)) {
             return false;
-        if (upperPort != other.upperPort)
+        }
+        if (upperPort != other.upperPort) {
             return false;
+        }
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d6279d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
index 44d8d1d..5867184 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
@@ -44,10 +44,7 @@ import 
org.apache.stratos.kubernetes.client.KubernetesConstants;
 import 
org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException;
 import org.apache.stratos.messaging.domain.topology.KubernetesService;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.locks.Lock;
 
 /**
@@ -395,7 +392,8 @@ public class KubernetesIaas extends Iaas {
             memory = memoryProperty.getValue();
         }
 
-        IaasProvider iaasProvider = 
CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridge.getType(),
 partition.getId());
+        IaasProvider iaasProvider =
+                
CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridge.getType(),
 partition.getId());
         if (iaasProvider == null) {
             String message = "Could not find iaas provider: [partition] " + 
partition.getId();
             log.error(message);
@@ -406,15 +404,15 @@ public class KubernetesIaas extends Iaas {
         memberContext.setDynamicPayload(payload.toArray(new 
NameValuePair[payload.size()]));
 
         // Find next available sequence number
-        long podSeqNo = 
kubernetesClusterContext.getPodSeqNo(applicationId).incrementAndGet();
-        String podId = preparePodId(applicationId, podSeqNo);
-        while(kubernetesApi.getPod(podId) != null) {
-            podSeqNo = 
kubernetesClusterContext.getPodSeqNo(applicationId).incrementAndGet();
-            podId = preparePodId(applicationId, podSeqNo);
+        long podSeqNo = kubernetesClusterContext.getNextPodSeqNo();
+        String podId = preparePodId(podSeqNo);
+        while (kubernetesApi.getPod(podId) != null) {
+            podSeqNo = kubernetesClusterContext.getNextPodSeqNo();
+            podId = preparePodId(podSeqNo);
         }
 
         // Create pod
-        String podLabel = DigestUtils.md5Hex(clusterId);
+        String podName = DigestUtils.md5Hex(clusterId);
         String dockerImage = iaasProvider.getImage();
         List<EnvVar> environmentVariables = 
KubernetesIaasUtil.prepareEnvironmentVariables(
                 clusterContext, memberContext);
@@ -426,16 +424,23 @@ public class KubernetesIaas extends Iaas {
                 memberContext.getApplicationId(), 
memberContext.getCartridgeType(),
                 memberContext.getMemberId(), cpu, memory));
 
-        kubernetesApi.createPod(podId, podLabel, dockerImage, cpu, memory, 
ports, environmentVariables);
+        Map<String, String> podLabels = new HashMap<>();
+        podLabels.put("applicationId", memberContext.getApplicationId());
+        podLabels.put("clusterId", memberContext.getClusterId());
+        podLabels.put("clusterInstanceId", 
memberContext.getClusterInstanceId());
+        podLabels.put("memberId", memberContext.getMemberId());
+        podLabels.put("cartridgeType", memberContext.getCartridgeType());
+
+        kubernetesApi.createPod(podId, podName, podLabels, dockerImage, cpu, 
memory, ports, environmentVariables);
 
         log.info(String.format("Pod started successfully: [application] %s 
[cartridge] %s [member] %s " +
                         "[pod] %s [pod-label] %s [cpu] %s [memory] %s",
                 memberContext.getApplicationId(), 
memberContext.getCartridgeType(),
-                memberContext.getMemberId(), podId, podLabel, cpu, memory));
+                memberContext.getMemberId(), podId, podName, cpu, memory));
 
         // Add pod id to member context
         memberContext.setKubernetesPodId(podId);
-        memberContext.setKubernetesPodLabel(podLabel);
+        memberContext.setKubernetesPodName(podName);
 
         // Create instance metadata
         InstanceMetadata instanceMetadata = new InstanceMetadata();
@@ -448,8 +453,8 @@ public class KubernetesIaas extends Iaas {
         CloudControllerContext.getInstance().persist();
     }
 
-    private String preparePodId(String applicationId, long podSeqNo) {
-        return applicationId + "-" + POD_ID_PREFIX + "-" + podSeqNo;
+    private String preparePodId(long podSeqNo) {
+        return POD_ID_PREFIX + "-" + podSeqNo;
     }
 
     /**
@@ -463,13 +468,10 @@ public class KubernetesIaas extends Iaas {
      */
     private void createKubernetesServices(KubernetesApiClient kubernetesApi, 
ClusterContext clusterContext,
                                           KubernetesCluster kubernetesCluster,
-                                          KubernetesClusterContext 
kubernetesClusterContext,MemberContext memberContext)
+                                          KubernetesClusterContext 
kubernetesClusterContext)
             throws KubernetesClientException {
-
-        String applicationId = clusterContext.getApplicationId();
         String clusterId = clusterContext.getClusterId();
         String cartridgeType = clusterContext.getCartridgeType();
-
         Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(cartridgeType);
         if (cartridge == null) {
             String message = "Could not create kubernetes services, cartridge 
not found: [cartridge] " +
@@ -492,62 +494,67 @@ public class KubernetesIaas extends Iaas {
 
         Collection<ClusterPortMapping> clusterPortMappings = 
CloudControllerContext.getInstance()
                 .getClusterPortMappings(clusterContext.getApplicationId(), 
clusterId);
+        Map<String, String> serviceLabels = new HashMap<>();
 
         if (clusterPortMappings != null) {
-            String serviceLabel = DigestUtils.md5Hex(clusterId);
+            String serviceName = DigestUtils.md5Hex(clusterId);
             Collection<KubernetesService> kubernetesServices = 
kubernetesClusterContext.getKubernetesServices();
 
             for (ClusterPortMapping clusterPortMapping : clusterPortMappings) {
                 // Skip if already created
                 int containerPort = clusterPortMapping.getPort();
                 if (kubernetesServiceExist(kubernetesServices, containerPort)) 
{
-                    if(log.isDebugEnabled()) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("Kubernetes service already 
exists: [kubernetes-cluster] %s " +
-                                "[cluster] %s [service-label] %s 
[container-port] %d ",
-                                kubernetesCluster.getClusterId(), clusterId, 
serviceLabel, containerPort));
+                                        "[cluster] %s [service-name] %s 
[container-port] %d ",
+                                kubernetesCluster.getClusterId(), clusterId, 
serviceName, containerPort));
                     }
                     continue;
                 }
 
                 // Find next available service sequence number
-                long serviceSeqNo = 
kubernetesClusterContext.getServiceSeqNo(applicationId).incrementAndGet();
-                String serviceName = KubernetesIaasUtil.fixSpecialCharacters(
-                        prepareServiceName(applicationId, serviceSeqNo));
-
-                while(kubernetesApi.getService(serviceName) != null) {
-                    serviceSeqNo = 
kubernetesClusterContext.getServiceSeqNo(applicationId).incrementAndGet();
-                    serviceName = KubernetesIaasUtil.fixSpecialCharacters(
-                            prepareServiceName(applicationId, serviceSeqNo));
+                long serviceSeqNo = 
kubernetesClusterContext.getNextServiceSeqNo();
+                String serviceId =
+                        
KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo));
+                while (kubernetesApi.getService(serviceId) != null) {
+                    serviceSeqNo = 
kubernetesClusterContext.getNextServiceSeqNo();
+                    serviceId =
+                            
KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo));
                 }
 
                 if (log.isInfoEnabled()) {
-                    log.info(String.format("Creating kubernetes service: 
[cluster] %s [service] %s [service-label] %s " +
-                                    "[protocol] %s [service-port] %d 
[container-port] %s", clusterId,
-                            serviceName, serviceLabel, 
clusterPortMapping.getProtocol(),
-                            clusterPortMapping.getKubernetesServicePort(), 
containerPort));
+                    log.info(
+                            String.format("Creating kubernetes service: 
[cluster] %s [service-id] %s [service-name] " +
+                                            "%s " + "[protocol] %s 
[service-port] %d [container-port] %s", clusterId,
+                                    serviceId, serviceName, 
clusterPortMapping.getProtocol(),
+                                    
clusterPortMapping.getKubernetesServicePort(), containerPort));
                 }
 
                 // Create kubernetes service for port mapping
                 int servicePort = 
clusterPortMapping.getKubernetesServicePort();
                 String serviceType = 
clusterPortMapping.getKubernetesServiceType();
                 String containerPortName = 
KubernetesIaasUtil.preparePortNameFromPortMapping(clusterPortMapping);
-
-                try {
-                    kubernetesApi.createService(serviceName, serviceLabel, 
servicePort, serviceType, containerPortName,
-                                containerPort, sessionAffinity);
-                } finally {
-                    // Persist kubernetes service sequence no
-                    CloudControllerContext.getInstance().persist();
-                }
-
+                serviceLabels.put("applicationId", 
clusterPortMapping.getApplicationId());
+                serviceLabels.put("clusterId", 
clusterPortMapping.getClusterId());
+                serviceLabels.put("name", clusterPortMapping.getName());
+                serviceLabels.put("protocol", 
clusterPortMapping.getProtocol());
+                serviceLabels.put("serviceType", 
clusterPortMapping.getKubernetesServiceType());
+                serviceLabels.put("portType", 
clusterPortMapping.getKubernetesPortType());
+                serviceLabels.put("servicePort", 
String.valueOf(clusterPortMapping.getKubernetesServicePort()));
+                serviceLabels.put("port", 
String.valueOf(clusterPortMapping.getPort()));
+                serviceLabels.put("proxyPort", 
String.valueOf(clusterPortMapping.getProxyPort()));
+
+                kubernetesApi.createService(serviceId, serviceName, 
serviceLabels, servicePort, serviceType,
+                        containerPortName, containerPort, sessionAffinity);
                 try {
                     Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
+                }
+                catch (InterruptedException ignore) {
                 }
 
-                Service service = kubernetesApi.getService(serviceName);
-                if(service == null) {
-                    throw new KubernetesClientException("Kubernetes service 
not found: [service] " + serviceName);
+                Service service = kubernetesApi.getService(serviceId);
+                if (service == null) {
+                    throw new KubernetesClientException("Kubernetes service 
not found: [service-id] " + serviceId);
                 }
 
                 KubernetesService kubernetesService = new KubernetesService();
@@ -561,7 +568,6 @@ public class KubernetesIaas extends Iaas {
 
                 String kubernetesServiceType = service.getSpec().getType();
                 kubernetesService.setServiceType(kubernetesServiceType);
-                
kubernetesService.setKubernetesClusterId(memberContext.getPartition().getKubernetesClusterId());
 
                 if 
(kubernetesServiceType.equals(KubernetesConstants.NODE_PORT)) {
                     
kubernetesService.setPort(service.getSpec().getPorts().get(0).getNodePort());
@@ -576,16 +582,17 @@ public class KubernetesIaas extends Iaas {
                 CloudControllerContext.getInstance().persist();
 
                 if (log.isInfoEnabled()) {
-                    log.info(String.format("Kubernetes service successfully 
created: [cluster] %s [service] %s " +
-                                    "[protocol] %s [node-port] %d 
[container-port] %s", clusterId,
-                            serviceName, clusterPortMapping.getProtocol(), 
servicePort, containerPort));
+                    log.info(String.format(
+                            "Kubernetes service successfully created: 
[cluster] %s [service-id] %s [protocol] %s " +
+                                    "[node-port] %d [container-port] %s", 
clusterId, serviceId,
+                            clusterPortMapping.getProtocol(), servicePort, 
containerPort));
                 }
             }
         }
     }
 
-    private String prepareServiceName(String applicationId, long serviceSeqNo) 
{
-        return applicationId + "-" + SERVICE_NAME_PREFIX + "-" + 
(serviceSeqNo);
+    private String prepareServiceName(long serviceSeqNo) {
+        return SERVICE_NAME_PREFIX + "-" + (serviceSeqNo);
     }
 
     private List<String> prepareMinionIPAddresses(KubernetesCluster 
kubernetesCluster) {
@@ -989,4 +996,4 @@ public class KubernetesIaas extends Iaas {
         }
 
     }
-}
+}
\ No newline at end of file

Reply via email to