Repository: stratos Updated Branches: refs/heads/stratos-4.1.x 81fe08c22 -> 0248d2068
Optimizing kubernetes service creation logic Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0248d206 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0248d206 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0248d206 Branch: refs/heads/stratos-4.1.x Commit: 0248d206848f7aa3984fc3eab5215dce29dab7cc Parents: 81fe08c Author: Imesh Gunaratne <[email protected]> Authored: Thu Sep 10 23:29:11 2015 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Thu Sep 10 23:29:51 2015 +0530 ---------------------------------------------------------------------- .../cloud/controller/domain/ClusterContext.java | 19 +++- .../kubernetes/KubernetesClusterContext.java | 69 +++++++---- .../iaases/kubernetes/KubernetesIaas.java | 114 ++++++++++--------- .../messaging/topology/TopologyBuilder.java | 9 +- 4 files changed, 124 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/0248d206/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java index 09b987d..a559a1e 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java @@ -23,7 +23,9 @@ import org.apache.stratos.common.Properties; import org.apache.stratos.messaging.domain.topology.KubernetesService; import java.io.Serializable; -import java.util.List; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; /** * Holds runtime data of a Cluster. @@ -44,12 +46,13 @@ public class ClusterContext implements Serializable { // on an unregistration. private long timeoutInMillis; private Properties properties; - private List<KubernetesService> kubernetesServices; + private Map<String, KubernetesService> kubernetesServices; private String kubernetesClusterId; public ClusterContext(String applicationId, String cartridgeType, String clusterId, String payload, String hostName, boolean isLbCluster, Properties properties) { + this.kubernetesServices = new HashMap<>(); this.applicationId = applicationId; this.cartridgeType = cartridgeType; this.clusterId = clusterId; @@ -115,12 +118,16 @@ public class ClusterContext implements Serializable { this.properties = properties; } - public List<KubernetesService> getKubernetesServices() { - return kubernetesServices; + public Collection<KubernetesService> getKubernetesServices() { + return kubernetesServices.values(); } - public void setKubernetesServices(List<KubernetesService> kubernetesServices) { - this.kubernetesServices = kubernetesServices; + public void addKubernetesService(KubernetesService kubernetesService) { + this.kubernetesServices.put(kubernetesService.getId(), kubernetesService); + } + + public void removeKubernetesService(String serviceName) { + kubernetesServices.remove(serviceName); } public void setKubernetesClusterId(String kubernetesClusterId) { http://git-wip-us.apache.org/repos/asf/stratos/blob/0248d206/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java index 38c49a0..7e6d557 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java @@ -21,10 +21,10 @@ package org.apache.stratos.cloud.controller.domain.kubernetes; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.kubernetes.client.KubernetesApiClient; +import org.apache.stratos.messaging.domain.topology.KubernetesService; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.atomic.AtomicLong; /** @@ -33,28 +33,28 @@ import java.util.concurrent.atomic.AtomicLong; public class KubernetesClusterContext implements Serializable { private static final long serialVersionUID = -802025758806195791L; + private static final Log log = LogFactory.getLog(KubernetesClusterContext.class); - // id of the Kubernetes cluster private String kubernetesClusterId; private int upperPort; private int lowerPort; - // kubernetes master ip private String masterIp; private String masterPort; - // available list of ports - private List<Integer> servicePorts; - // kubernetes client API instance + private List<Integer> servicePortSequence; + private Map<String, KubernetesService> kubernetesServices; private transient KubernetesApiClient kubApi; private AtomicLong serviceSeqNo; private AtomicLong podSeqNo; public KubernetesClusterContext(String id, String masterIp, String masterPort, int lowerPort, int upperPort) { - servicePorts = new ArrayList<Integer>(); + this.servicePortSequence = new ArrayList<>(); + this.kubernetesServices = new HashMap<>(); + this.lowerPort = lowerPort; this.upperPort = upperPort; // Generate the ports - generateServicePorts(lowerPort, upperPort); + initializeServicePortSequence(lowerPort, upperPort); this.kubernetesClusterId = id; this.masterIp = masterIp; this.masterPort = masterPort; @@ -77,33 +77,58 @@ public class KubernetesClusterContext implements Serializable { } public List<Integer> getServicePorts() { - return servicePorts; + return servicePortSequence; } public void setServicePorts(List<Integer> servicePorts) { - this.servicePorts = servicePorts; + this.servicePortSequence = servicePorts; } + /*** + * Get next available service port. + * @return + */ public int getNextServicePort() { - if (servicePorts.isEmpty()) { + if (servicePortSequence.isEmpty()) { return -1; } - return servicePorts.remove(0); + return servicePortSequence.remove(0); } + /** + * Deallocate a service port by adding it again to the sequence. + * @param port + */ public void deallocatePort(int port) { - if (!servicePorts.contains(port)) { - servicePorts.add(port); - // TODO Sort elements + if (!servicePortSequence.contains(port)) { + servicePortSequence.add(port); + Collections.sort(servicePortSequence); } } - private void generateServicePorts(int lowerPort, int upperPort) { + /** + * Initialize service port sequence according to the given port range. + * @param lowerPort + * @param upperPort + */ + private void initializeServicePortSequence(int lowerPort, int upperPort) { for (int port = lowerPort; port <= upperPort; port++) { - servicePorts.add(port); + servicePortSequence.add(port); } } + public void addKubernetesService(KubernetesService service) { + kubernetesServices.put(service.getId(), service); + } + + public void removeKubernetesService(String serviceName) { + kubernetesServices.remove(serviceName); + } + + public Collection<KubernetesService> getKubernetesServices() { + return kubernetesServices.values(); + } + public String getMasterIp() { return masterIp; } @@ -151,7 +176,7 @@ public class KubernetesClusterContext implements Serializable { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((servicePorts == null) ? 0 : servicePorts.hashCode()); + result = prime * result + ((servicePortSequence == null) ? 0 : servicePortSequence.hashCode()); result = prime * result + ((kubernetesClusterId == null) ? 0 : kubernetesClusterId.hashCode()); result = prime * result + lowerPort; result = prime * result + ((masterIp == null) ? 0 : masterIp.hashCode()); @@ -169,10 +194,10 @@ public class KubernetesClusterContext implements Serializable { if (getClass() != obj.getClass()) return false; KubernetesClusterContext other = (KubernetesClusterContext) obj; - if (servicePorts == null) { - if (other.servicePorts != null) + if (servicePortSequence == null) { + if (other.servicePortSequence != null) return false; - } else if (!servicePorts.equals(other.servicePorts)) + } else if (!servicePortSequence.equals(other.servicePortSequence)) return false; if (kubernetesClusterId == null) { if (other.kubernetesClusterId != null) http://git-wip-us.apache.org/repos/asf/stratos/blob/0248d206/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java index 7c987b6..9796a8c 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java @@ -19,6 +19,7 @@ package org.apache.stratos.cloud.controller.iaases.kubernetes; +import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.*; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang.NotImplementedException; @@ -467,27 +468,10 @@ public class KubernetesIaas extends Iaas { sessionAffinity = sessionAffinityProperty.getValue(); } - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); - if (kubernetesServices == null) { - kubernetesServices = new ArrayList<KubernetesService>(); - } - // Prepare minion public IP addresses - List<String> minionPrivateIPList = new ArrayList<String>(); - List<String> minionPublicIPList = new ArrayList<String>(); - KubernetesHost[] kubernetesHosts = kubernetesCluster.getKubernetesHosts(); - if ((kubernetesHosts == null) || (kubernetesHosts.length == 0) || (kubernetesHosts[0] == null)) { - throw new RuntimeException("Hosts not found in kubernetes cluster: [cluster] " - + kubernetesCluster.getClusterId()); - } - for (KubernetesHost host : kubernetesHosts) { - if (host != null) { - minionPrivateIPList.add(host.getPrivateIPAddress()); - minionPublicIPList.add(host.getPublicIPAddress()); - } - } + List<String> minionPublicIPList = prepareMinionIPAddresses(kubernetesCluster); if (log.isDebugEnabled()) { - log.debug(String.format("Minion private IPs: %s", minionPrivateIPList)); + log.debug(String.format("Minion public IPs: %s", minionPublicIPList)); } Collection<ClusterPortMapping> clusterPortMappings = CloudControllerContext.getInstance() @@ -495,15 +479,12 @@ public class KubernetesIaas extends Iaas { if (clusterPortMappings != null) { String serviceLabel = DigestUtils.md5Hex(clusterId); - if(log.isDebugEnabled()) { - log.debug("Retrieving existing kubernetes services..."); - } - List<Service> services = kubernetesApi.getServices(); + Collection<KubernetesService> kubernetesServices = kubernetesClusterContext.getKubernetesServices(); for (ClusterPortMapping clusterPortMapping : clusterPortMappings) { // Skip if already created int containerPort = clusterPortMapping.getPort(); - if (kubernetesServiceExist(services, serviceLabel, containerPort)) { + if (kubernetesServiceExist(kubernetesServices, containerPort)) { if(log.isDebugEnabled()) { log.debug(String.format("Kubernetes service already exists: [kubernetes-cluster] %s " + "[cluster] %s [service-label] %s [container-port] %d ", @@ -514,12 +495,12 @@ public class KubernetesIaas extends Iaas { // Find next service sequence no long serviceSeqNo = kubernetesClusterContext.getServiceSeqNo().incrementAndGet(); - String serviceId = KubernetesIaasUtil.fixSpecialCharacters("service" + "-" + (serviceSeqNo)); + String serviceName = KubernetesIaasUtil.fixSpecialCharacters("service" + "-" + (serviceSeqNo)); if (log.isInfoEnabled()) { log.info(String.format("Creating kubernetes service: [cluster] %s [service] %s [service-label] %s " + "[protocol] %s [service-port] %d [container-port] %s", clusterId, - serviceId, serviceLabel, clusterPortMapping.getProtocol(), + serviceName, serviceLabel, clusterPortMapping.getProtocol(), clusterPortMapping.getKubernetesServicePort(), containerPort)); } @@ -530,15 +511,15 @@ public class KubernetesIaas extends Iaas { try { // If kubernetes service is already created, skip creating a new one - if (kubernetesApi.getService(serviceId) == null) { + if (kubernetesApi.getService(serviceName) == null) { // Services need to use minions private IP addresses for creating iptable rules - kubernetesApi.createService(serviceId, serviceLabel, servicePort, serviceType, containerPortName, + kubernetesApi.createService(serviceName, serviceLabel, servicePort, serviceType, containerPortName, containerPort, sessionAffinity); } else { if (log.isDebugEnabled()) { log.debug(String.format("Kubernetes service is already created: [cluster] %s [service] %s " + "[protocol] %s [service-port] %d [container-port] %d", clusterId, - serviceId, clusterPortMapping.getProtocol(), servicePort, containerPort)); + serviceName, clusterPortMapping.getProtocol(), servicePort, containerPort)); } } } finally { @@ -551,7 +532,10 @@ public class KubernetesIaas extends Iaas { } catch (InterruptedException ignore) { } - Service service = kubernetesApi.getService(serviceId); + Service service = kubernetesApi.getService(serviceName); + if(service == null) { + throw new KubernetesClientException("Kubernetes service not found: [service] " + serviceName); + } KubernetesService kubernetesService = new KubernetesService(); kubernetesService.setId(service.getMetadata().getName()); @@ -572,37 +556,45 @@ public class KubernetesIaas extends Iaas { } kubernetesService.setContainerPort(containerPort); - kubernetesServices.add(kubernetesService); + + kubernetesClusterContext.addKubernetesService(kubernetesService); + clusterContext.addKubernetesService(kubernetesService); + CloudControllerContext.getInstance().persist(); if (log.isInfoEnabled()) { log.info(String.format("Kubernetes service successfully created: [cluster] %s [service] %s " + "[protocol] %s [node-port] %d [container-port] %s", clusterId, - serviceId, clusterPortMapping.getProtocol(), servicePort, containerPort)); + serviceName, clusterPortMapping.getProtocol(), servicePort, containerPort)); } } } + } - // Add kubernetes services to cluster context and persist - clusterContext.setKubernetesServices(kubernetesServices); - CloudControllerContext.getInstance().persist(); + private List<String> prepareMinionIPAddresses(KubernetesCluster kubernetesCluster) { + List<String> minionPublicIPList = new ArrayList<String>(); + KubernetesHost[] kubernetesHosts = kubernetesCluster.getKubernetesHosts(); + if ((kubernetesHosts == null) || (kubernetesHosts.length == 0) || (kubernetesHosts[0] == null)) { + throw new RuntimeException("Hosts not found in kubernetes cluster: [cluster] " + + kubernetesCluster.getClusterId()); + } + for (KubernetesHost host : kubernetesHosts) { + if (host != null) { + minionPublicIPList.add(host.getPublicIPAddress()); + } + } + return minionPublicIPList; } /** * Returns true if a kubernetes service exists with the given container port * @param services - * @param serviceLabel * @param containerPort * @return */ - private boolean kubernetesServiceExist(List<Service> services, String serviceLabel, int containerPort) { - for(Service service : services) { - Map<String, String> labels = service.getMetadata().getLabels(); - if((labels != null) && (labels.get(KubernetesConstants.LABEL_NAME).equals(serviceLabel))) { - for (ServicePort port : service.getSpec().getPorts()) { - if (port.getPort() == containerPort) { - return true; - } - } + private boolean kubernetesServiceExist(Collection<KubernetesService> services, int containerPort) { + for(KubernetesService service : services) { + if (service.getContainerPort() == containerPort) { + return true; } } return false; @@ -740,13 +732,16 @@ public class KubernetesIaas extends Iaas { KubernetesApiClient kubApi = kubClusterContext.getKubApi(); // Remove kubernetes services - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + List<KubernetesService> kubernetesServices = Lists.newArrayList(clusterContext.getKubernetesServices()); if (kubernetesServices != null) { for (KubernetesService kubernetesService : kubernetesServices) { try { - kubApi.deleteService(kubernetesService.getId()); - int allocatedPort = kubernetesService.getPort(); - kubClusterContext.deallocatePort(allocatedPort); + String serviceId = kubernetesService.getId(); + kubApi.deleteService(serviceId); + + kubClusterContext.deallocatePort(kubernetesService.getPort()); + kubClusterContext.removeKubernetesService(serviceId); + clusterContext.removeKubernetesService(serviceId); } catch (KubernetesClientException e) { log.error("Could not remove kubernetes service: [cluster-id] " + clusterId, e); } @@ -934,23 +929,32 @@ public class KubernetesIaas extends Iaas { */ public static void removeKubernetesServices(String applicationId, String clusterId) { - ClusterContext clusterContext = - CloudControllerContext.getInstance().getClusterContext(clusterId); + ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); + if (clusterContext != null) { String kubernetesClusterId = clusterContext.getKubernetesClusterId(); + if (org.apache.commons.lang3.StringUtils.isNotBlank(kubernetesClusterId)) { KubernetesClusterContext kubernetesClusterContext = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId); + if (kubernetesClusterContext != null) { KubernetesApiClient kubernetesApiClient = kubernetesClusterContext.getKubApi(); - for (KubernetesService kubernetesService : clusterContext.getKubernetesServices()) { + ArrayList<KubernetesService> kubernetesServices = Lists.newArrayList(clusterContext.getKubernetesServices()); + + for (KubernetesService kubernetesService : kubernetesServices) { + String serviceId = kubernetesService.getId(); log.info(String.format("Deleting kubernetes service: [application-id] %s " + - "[service-id] %s", applicationId, kubernetesService.getId())); + "[service-id] %s", applicationId, serviceId)); + try { - kubernetesApiClient.deleteService(kubernetesService.getId()); + kubernetesApiClient.deleteService(serviceId); + kubernetesClusterContext.deallocatePort(kubernetesService.getPort()); + kubernetesClusterContext.removeKubernetesService(serviceId); + clusterContext.removeKubernetesService(serviceId); } catch (KubernetesClientException e) { log.error(String.format("Could not delete kubernetes service: [application-id] %s " + - "[service-id] %s", applicationId, kubernetesService.getId())); + "[service-id] %s", applicationId, serviceId)); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0248d206/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index ecd2728..26d1bbd 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.stratos.cloud.controller.messaging.topology; +import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -455,7 +456,7 @@ public class TopologyBuilder { Cluster cluster = service.getCluster(memberContext.getClusterId()); String clusterId = cluster.getClusterId(); ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + List<KubernetesService> kubernetesServices = Lists.newArrayList(clusterContext.getKubernetesServices()); if (kubernetesServices != null) { cluster.setKubernetesServices(kubernetesServices); @@ -481,7 +482,7 @@ public class TopologyBuilder { } } - private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices, + private static int findKubernetesServicePort(String clusterId, Collection<KubernetesService> kubernetesServices, PortMapping portMapping) { for (KubernetesService kubernetesService : kubernetesServices) { if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) { @@ -606,7 +607,7 @@ public class TopologyBuilder { List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); String clusterId = cluster.getClusterId(); ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + Collection<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); for (PortMapping portMapping : portMappings) { if (kubernetesServices != null) { @@ -853,7 +854,7 @@ public class TopologyBuilder { clusterStatusClusterActivatedEvent.getInstanceId()); try { TopologyManager.acquireWriteLock(); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + Collection<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); if (kubernetesServices != null) {
