In the startContainers method, run a label query to find the Pods being created and create the corresponding MemberContexts.
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/55150013 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/55150013 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/55150013 Branch: refs/heads/container-autoscaling Commit: 55150013c6122d6217a5e1235cd1a8ae8f5baf64 Parents: 1dddf29 Author: Nirmal Fernando <[email protected]> Authored: Tue Oct 7 18:15:22 2014 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Wed Oct 8 22:21:29 2014 +0530 ---------------------------------------------------------------------- .../impl/CloudControllerServiceImpl.java | 166 ++++++++++--------- 1 file changed, 87 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/55150013/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java index 18269e6..a413882 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java @@ -28,8 +28,9 @@ import org.apache.stratos.cloud.controller.concurrent.PartitionValidatorCallable import org.apache.stratos.cloud.controller.concurrent.ThreadExecutor; import org.apache.stratos.cloud.controller.deployment.partition.Partition; import org.apache.stratos.cloud.controller.exception.*; -import 
org.apache.stratos.cloud.controller.functions.MemberContextToKubernetesService; -import org.apache.stratos.cloud.controller.functions.MemberContextToReplicationController; +import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToKubernetesService; +import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToReplicationController; +import org.apache.stratos.cloud.controller.functions.PodToMemberContext; import org.apache.stratos.cloud.controller.interfaces.CloudControllerService; import org.apache.stratos.cloud.controller.interfaces.Iaas; import org.apache.stratos.cloud.controller.persist.Deserializer; @@ -45,6 +46,8 @@ import org.apache.stratos.cloud.controller.validate.interfaces.PartitionValidato import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.kubernetes.client.KubernetesApiClient; import org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException; +import org.apache.stratos.kubernetes.client.model.Label; +import org.apache.stratos.kubernetes.client.model.Pod; import org.apache.stratos.kubernetes.client.model.ReplicationController; import org.apache.stratos.kubernetes.client.model.Service; import org.apache.stratos.messaging.domain.topology.Member; @@ -1330,33 +1333,32 @@ public class CloudControllerServiceImpl implements CloudControllerService { } @Override - public MemberContext startContainers(MemberContext memberContext) + public MemberContext[] startContainers(ContainerClusterContext containerClusterContext) throws UnregisteredCartridgeException { if(log.isDebugEnabled()) { - log.debug("CloudControllerServiceImpl:startContainer"); + log.debug("CloudControllerServiceImpl:startContainers"); } - if (memberContext == null) { - String msg = "Instance start-up failed. Member is null."; + if (containerClusterContext == null) { + String msg = "Instance start-up failed. 
ContainerClusterContext is null."; log.error(msg); throw new IllegalArgumentException(msg); } - String clusterId = memberContext.getClusterId(); + String clusterId = containerClusterContext.getClusterId(); if(log.isDebugEnabled()) { - log.debug("Received an instance spawn request : " + memberContext.toString()); + log.debug("Received an instance spawn request : " + containerClusterContext.toString()); } ClusterContext ctxt = dataHolder.getClusterContext(clusterId); if (ctxt == null) { - String msg = "Instance start-up failed. Invalid cluster id. " + memberContext.toString(); + String msg = "Instance start-up failed. Invalid cluster id. " + containerClusterContext.toString(); log.error(msg); throw new IllegalArgumentException(msg); } - String cartridgeType = ctxt.getCartridgeType(); Cartridge cartridge = dataHolder.getCartridge(cartridgeType); @@ -1364,59 +1366,28 @@ public class CloudControllerServiceImpl implements CloudControllerService { if (cartridge == null) { String msg = "Instance start-up failed. No matching Cartridge found [type] "+cartridgeType +". "+ - memberContext.toString(); + containerClusterContext.toString(); log.error(msg); throw new UnregisteredCartridgeException(msg); } - memberContext.setCartridgeType(cartridgeType); - try { - // generating the Unique member ID... - String memberID = generateMemberId(clusterId); - memberContext.setMemberId(memberID); - - String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(), - StratosConstants.KUBERNETES_CLUSTER_ID); - - if (kubernetesClusterId == null) { - String msg = "Instance start-up failed. Cannot find '"+ - StratosConstants.KUBERNETES_CLUSTER_ID+"'. " + ctxt; - log.error(msg); - throw new IllegalArgumentException(msg); - } - - String kubernetesMasterIp = CloudControllerUtil.getProperty(memberContext.getProperties(), - StratosConstants.KUBERNETES_MASTER_IP); - - if (kubernetesMasterIp == null) { - String msg = "Instance start-up failed. 
Cannot find '"+ - StratosConstants.KUBERNETES_MASTER_IP+"'. " + memberContext; - log.error(msg); - throw new IllegalArgumentException(msg); - } - - String kubernetesPortRange = CloudControllerUtil.getProperty(memberContext.getProperties(), - StratosConstants.KUBERNETES_PORT_RANGE); - - if (kubernetesPortRange == null) { - String msg = "Instance start-up failed. Cannot find '"+ - StratosConstants.KUBERNETES_PORT_RANGE+"'. " + memberContext; - log.error(msg); - throw new IllegalArgumentException(msg); - } + validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt); + String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt); + String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext); + String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext); KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp, kubernetesPortRange); KubernetesApiClient kubApi = kubClusterContext.getKubApi(); // first let's create a replication controller. 
- MemberContextToReplicationController controllerFunction = new MemberContextToReplicationController(); - ReplicationController controller = controllerFunction.apply(memberContext); + ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController(); + ReplicationController controller = controllerFunction.apply(containerClusterContext); if (log.isDebugEnabled()) { log.debug("Cloud Controller is delegating request to start a replication controller "+controller+ - " for "+ memberContext + " to Kubernetes layer."); + " for "+ containerClusterContext + " to Kubernetes layer."); } kubApi.createReplicationController(controller); @@ -1427,12 +1398,12 @@ public class CloudControllerServiceImpl implements CloudControllerService { } // secondly let's create a kubernetes service proxy to load balance these containers - MemberContextToKubernetesService serviceFunction = new MemberContextToKubernetesService(); - Service service = serviceFunction.apply(memberContext); + ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService(); + Service service = serviceFunction.apply(containerClusterContext); if (log.isDebugEnabled()) { log.debug("Cloud Controller is delegating request to start a service "+service+ - " for "+ memberContext + " to Kubernetes layer."); + " for "+ containerClusterContext + " to Kubernetes layer."); } kubApi.createService(service); @@ -1442,44 +1413,81 @@ public class CloudControllerServiceImpl implements CloudControllerService { + controller + " via Kubernetes layer."); } - memberContext.setPublicIpAddress(kubernetesMasterIp); - memberContext.setPrivateIpAddress(kubernetesMasterIp); - memberContext.setProperties(CloudControllerUtil.addProperty(memberContext - .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, - CloudControllerUtil.getProperty(ctxt.getProperties(), - StratosConstants.ALLOCATED_SERVICE_HOST_PORT))); - 
dataHolder.addMemberContext(memberContext); - + // create a label query + Label l = new Label(); + l.setName(clusterId); + // execute the label query + Pod[] newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l}); + List<MemberContext> memberContexts = new ArrayList<MemberContext>(); + + PodToMemberContext podToMemberContextFunc = new PodToMemberContext(); + // generate Member Contexts + for (Pod pod : newlyCreatedPods) { + MemberContext context = podToMemberContextFunc.apply(pod); + context.setCartridgeType(cartridgeType); + context.setClusterId(clusterId); + + context.setProperties(CloudControllerUtil.addProperty(containerClusterContext + .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, + CloudControllerUtil.getProperty(ctxt.getProperties(), + StratosConstants.ALLOCATED_SERVICE_HOST_PORT))); + dataHolder.addMemberContext(context); + + // trigger topology + // update the topology with the newly spawned member + TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, null, + kubernetesMasterIp, kubernetesMasterIp, context); + // publish data + // TODO +// CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node); + + memberContexts.add(context); + } + // persist in registry persist(); - // trigger topology - // update the topology with the newly spawned member - TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, null, - kubernetesMasterIp, kubernetesMasterIp, memberContext); - - // publish data - // TODO - // CartridgeInstanceDataPublisher.publish(memberID, - // memberContext.getPartition().getId(), - // memberContext.getNetworkPartitionId(), - // memberContext.getClusterId(), - // cartridgeType, - // MemberStatus.Created.toString(), - // node); + log.info("Kubernetes entities are successfully starting up. "+containerClusterContext.toString()); - log.info("Kubernetes entities are successfully starting up. 
"+memberContext.toString()); - - return memberContext; + return memberContexts.toArray(new MemberContext[0]); } catch (Exception e) { - String msg = "Failed to start an instance. " + memberContext.toString()+" Cause: "+e.getMessage(); + String msg = "Failed to start an instance. " + containerClusterContext.toString()+" Cause: "+e.getMessage(); log.error(msg, e); throw new IllegalStateException(msg, e); } } - private KubernetesClusterContext getKubernetesClusterContext( + private String validateProperty(String property, ClusterContext ctxt) { + + String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), property); + + if (propVal == null) { + String msg = "Instance start-up failed. Cannot find '"+ + StratosConstants.KUBERNETES_MIN_REPLICAS+"' in " + ctxt; + log.error(msg); + throw new IllegalArgumentException(msg); + } + + return propVal; + } + + private String validateProperty(String property, ContainerClusterContext ctxt) { + + String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), property); + + if (propVal == null) { + String msg = "Instance start-up failed. Cannot find '"+ + StratosConstants.KUBERNETES_MIN_REPLICAS+"' in " + ctxt; + log.error(msg); + throw new IllegalArgumentException(msg); + } + + return propVal; + + } + + private KubernetesClusterContext getKubernetesClusterContext( String kubernetesClusterId, String kubernetesMasterIp, String kubernetesPortRange) {
