on start containers method, do a label query and find the Pods being created 
and creating corresponding MemberContexts.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/55150013
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/55150013
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/55150013

Branch: refs/heads/container-autoscaling
Commit: 55150013c6122d6217a5e1235cd1a8ae8f5baf64
Parents: 1dddf29
Author: Nirmal Fernando <[email protected]>
Authored: Tue Oct 7 18:15:22 2014 +0530
Committer: Nirmal Fernando <[email protected]>
Committed: Wed Oct 8 22:21:29 2014 +0530

----------------------------------------------------------------------
 .../impl/CloudControllerServiceImpl.java        | 166 ++++++++++---------
 1 file changed, 87 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/55150013/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
index 18269e6..a413882 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
@@ -28,8 +28,9 @@ import 
org.apache.stratos.cloud.controller.concurrent.PartitionValidatorCallable
 import org.apache.stratos.cloud.controller.concurrent.ThreadExecutor;
 import org.apache.stratos.cloud.controller.deployment.partition.Partition;
 import org.apache.stratos.cloud.controller.exception.*;
-import 
org.apache.stratos.cloud.controller.functions.MemberContextToKubernetesService;
-import 
org.apache.stratos.cloud.controller.functions.MemberContextToReplicationController;
+import 
org.apache.stratos.cloud.controller.functions.ContainerClusterContextToKubernetesService;
+import 
org.apache.stratos.cloud.controller.functions.ContainerClusterContextToReplicationController;
+import org.apache.stratos.cloud.controller.functions.PodToMemberContext;
 import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
 import org.apache.stratos.cloud.controller.interfaces.Iaas;
 import org.apache.stratos.cloud.controller.persist.Deserializer;
@@ -45,6 +46,8 @@ import 
org.apache.stratos.cloud.controller.validate.interfaces.PartitionValidato
 import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.kubernetes.client.KubernetesApiClient;
 import 
org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException;
+import org.apache.stratos.kubernetes.client.model.Label;
+import org.apache.stratos.kubernetes.client.model.Pod;
 import org.apache.stratos.kubernetes.client.model.ReplicationController;
 import org.apache.stratos.kubernetes.client.model.Service;
 import org.apache.stratos.messaging.domain.topology.Member;
@@ -1330,33 +1333,32 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     }
 
        @Override
-       public MemberContext startContainers(MemberContext memberContext)
+       public MemberContext[] startContainers(ContainerClusterContext 
containerClusterContext)
                        throws UnregisteredCartridgeException {
                
                if(log.isDebugEnabled()) {
-               log.debug("CloudControllerServiceImpl:startContainer");
+               log.debug("CloudControllerServiceImpl:startContainers");
        }
 
-        if (memberContext == null) {
-            String msg = "Instance start-up failed. Member is null.";
+        if (containerClusterContext == null) {
+            String msg = "Instance start-up failed. ContainerClusterContext is 
null.";
             log.error(msg);
             throw new IllegalArgumentException(msg);
         }
 
-        String clusterId = memberContext.getClusterId();
+        String clusterId = containerClusterContext.getClusterId();
         if(log.isDebugEnabled()) {
-               log.debug("Received an instance spawn request : " + 
memberContext.toString());
+               log.debug("Received an instance spawn request : " + 
containerClusterContext.toString());
         }
 
         ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
 
         if (ctxt == null) {
-            String msg = "Instance start-up failed. Invalid cluster id. " + 
memberContext.toString();
+            String msg = "Instance start-up failed. Invalid cluster id. " + 
containerClusterContext.toString();
             log.error(msg);
             throw new IllegalArgumentException(msg);
         }
         
-        
         String cartridgeType = ctxt.getCartridgeType();
 
         Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
@@ -1364,59 +1366,28 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
         if (cartridge == null) {
             String msg =
                          "Instance start-up failed. No matching Cartridge 
found [type] "+cartridgeType +". "+
-                                 memberContext.toString();
+                                 containerClusterContext.toString();
             log.error(msg);
             throw new UnregisteredCartridgeException(msg);
         }
 
-        memberContext.setCartridgeType(cartridgeType);
-
         try {
-            // generating the Unique member ID...
-            String memberID = generateMemberId(clusterId);
-            memberContext.setMemberId(memberID);
-
-                       String kubernetesClusterId = 
CloudControllerUtil.getProperty(ctxt.getProperties(), 
-                                       StratosConstants.KUBERNETES_CLUSTER_ID);
-                       
-                       if (kubernetesClusterId == null) {
-                               String msg = "Instance start-up failed. Cannot 
find '"+
-                                               
StratosConstants.KUBERNETES_CLUSTER_ID+"'. " + ctxt;
-                               log.error(msg);
-                               throw new IllegalArgumentException(msg);
-                       }
-                       
-                       String kubernetesMasterIp = 
CloudControllerUtil.getProperty(memberContext.getProperties(), 
-                                       StratosConstants.KUBERNETES_MASTER_IP);
-                       
-                       if (kubernetesMasterIp == null) {
-                               String msg = "Instance start-up failed. Cannot 
find '"+
-                                               
StratosConstants.KUBERNETES_MASTER_IP+"'. " + memberContext;
-                               log.error(msg);
-                               throw new IllegalArgumentException(msg);
-                       }
-                       
-                       String kubernetesPortRange = 
CloudControllerUtil.getProperty(memberContext.getProperties(), 
-                                       StratosConstants.KUBERNETES_PORT_RANGE);
-                       
-                       if (kubernetesPortRange == null) {
-                               String msg = "Instance start-up failed. Cannot 
find '"+
-                                               
StratosConstants.KUBERNETES_PORT_RANGE+"'. " + memberContext;
-                               log.error(msg);
-                               throw new IllegalArgumentException(msg);
-                       }
+            validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt);
+            String kubernetesClusterId = 
validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
+            String kubernetesMasterIp = 
validateProperty(StratosConstants.KUBERNETES_MASTER_IP, 
containerClusterContext);
+            String kubernetesPortRange = 
validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, 
containerClusterContext);
                        
                        KubernetesClusterContext kubClusterContext = 
getKubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp, 
kubernetesPortRange);
                        
                        KubernetesApiClient kubApi = 
kubClusterContext.getKubApi();
                        
                        // first let's create a replication controller.
-                       MemberContextToReplicationController controllerFunction 
= new MemberContextToReplicationController();
-                       ReplicationController controller = 
controllerFunction.apply(memberContext);
+                       ContainerClusterContextToReplicationController 
controllerFunction = new ContainerClusterContextToReplicationController();
+                       ReplicationController controller = 
controllerFunction.apply(containerClusterContext);
                        
                        if (log.isDebugEnabled()) {
                                log.debug("Cloud Controller is delegating 
request to start a replication controller "+controller+
-                                               " for "+ memberContext + " to 
Kubernetes layer.");
+                                               " for "+ 
containerClusterContext + " to Kubernetes layer.");
                        }
                        
                        kubApi.createReplicationController(controller);
@@ -1427,12 +1398,12 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                        }
                        
                        // secondly let's create a kubernetes service proxy to 
load balance these containers
-                       MemberContextToKubernetesService serviceFunction = new 
MemberContextToKubernetesService();
-                       Service service = serviceFunction.apply(memberContext);
+                       ContainerClusterContextToKubernetesService 
serviceFunction = new ContainerClusterContextToKubernetesService();
+                       Service service = 
serviceFunction.apply(containerClusterContext);
                        
                        if (log.isDebugEnabled()) {
                                log.debug("Cloud Controller is delegating 
request to start a service "+service+
-                                               " for "+ memberContext + " to 
Kubernetes layer.");
+                                               " for "+ 
containerClusterContext + " to Kubernetes layer.");
                        }
                        
                        kubApi.createService(service);
@@ -1442,44 +1413,81 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                                                + controller + " via Kubernetes 
layer.");
                        }
                        
-            memberContext.setPublicIpAddress(kubernetesMasterIp);
-            memberContext.setPrivateIpAddress(kubernetesMasterIp);
-            
memberContext.setProperties(CloudControllerUtil.addProperty(memberContext
-                    .getProperties(), 
StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
-                    CloudControllerUtil.getProperty(ctxt.getProperties(),
-                            StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
-            dataHolder.addMemberContext(memberContext);
-
+                       // create a label query
+                       Label l = new Label();
+                       l.setName(clusterId);
+                       // execute the label query
+                       Pod[] newlyCreatedPods = kubApi.getSelectedPods(new 
Label[]{l});
+                       List<MemberContext> memberContexts = new 
ArrayList<MemberContext>();
+                       
+                       PodToMemberContext podToMemberContextFunc = new 
PodToMemberContext();
+                       // generate Member Contexts
+                       for (Pod pod : newlyCreatedPods) {
+                MemberContext context = podToMemberContextFunc.apply(pod);
+                context.setCartridgeType(cartridgeType);
+                context.setClusterId(clusterId);
+                
+                
context.setProperties(CloudControllerUtil.addProperty(containerClusterContext
+                        .getProperties(), 
StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
+                        CloudControllerUtil.getProperty(ctxt.getProperties(),
+                                
StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
+                dataHolder.addMemberContext(context);
+                
+                // trigger topology
+                // update the topology with the newly spawned member
+                TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, 
null,
+                        kubernetesMasterIp, kubernetesMasterIp, context);
+                // publish data
+                // TODO
+//                
CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, 
context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node);
+                
+                memberContexts.add(context);
+            }
+                       
                        // persist in registry
                        persist();
 
-                       // trigger topology
-                       // update the topology with the newly spawned member
-                       TopologyBuilder.handleMemberSpawned(cartridgeType, 
clusterId, null,
-                                       kubernetesMasterIp, kubernetesMasterIp, 
memberContext);
-
-                       // publish data
-                       // TODO
-                       // CartridgeInstanceDataPublisher.publish(memberID,
-                       // memberContext.getPartition().getId(),
-                       // memberContext.getNetworkPartitionId(),
-                       // memberContext.getClusterId(),
-                       // cartridgeType,
-                       // MemberStatus.Created.toString(),
-                       // node);
+            log.info("Kubernetes entities are successfully starting up. 
"+containerClusterContext.toString());
 
-            log.info("Kubernetes entities are successfully starting up. 
"+memberContext.toString());
-
-            return memberContext;
+            return memberContexts.toArray(new MemberContext[0]);
 
         } catch (Exception e) {
-            String msg = "Failed to start an instance. " + 
memberContext.toString()+" Cause: "+e.getMessage();
+            String msg = "Failed to start an instance. " + 
containerClusterContext.toString()+" Cause: "+e.getMessage();
             log.error(msg, e);
             throw new IllegalStateException(msg, e);
         }
        }
 
-       private KubernetesClusterContext getKubernetesClusterContext(
+       private String validateProperty(String property, ClusterContext ctxt) {
+
+           String propVal = 
CloudControllerUtil.getProperty(ctxt.getProperties(), property);
+        
+        if (propVal == null) {
+            String msg = "Instance start-up failed. Cannot find '"+
+                    StratosConstants.KUBERNETES_MIN_REPLICAS+"' in " + ctxt;
+            log.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        
+        return propVal;
+    }
+       
+       private String validateProperty(String property, 
ContainerClusterContext ctxt) {
+
+        String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), 
property);
+        
+        if (propVal == null) {
+            String msg = "Instance start-up failed. Cannot find '"+
+                    StratosConstants.KUBERNETES_MIN_REPLICAS+"' in " + ctxt;
+            log.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        
+        return propVal;
+        
+    }
+
+    private KubernetesClusterContext getKubernetesClusterContext(
                        String kubernetesClusterId, String kubernetesMasterIp,
                        String kubernetesPortRange) {
                

Reply via email to