http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index dab6827..f76c928 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -31,6 +31,7 @@ import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.common.Property; +import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.application.ClusterDataHolder; import org.apache.stratos.messaging.domain.instance.ClusterInstance; import org.apache.stratos.messaging.domain.topology.*; @@ -67,8 +68,11 @@ public class TopologyBuilder { TopologyManager.acquireWriteLock(); for (Cartridge cartridge : cartridgeList) { if (!topology.serviceExists(cartridge.getType())) { - ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant; + + ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : + ServiceType.SingleTenant; service = new Service(cartridge.getType(), serviceType, cartridge.getUuid()); + Properties properties = new Properties(); try { @@ -199,867 +203,899 @@ public class TopologyBuilder { } log.debug("Creating cluster port mappings: [application-id] " + appUuid); - for(Cluster cluster : appClusters) { + for (Cluster cluster : appClusters) { String cartridgeUuid = cluster.getServiceUuid(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeUuid); - if(cartridge == null) { + if (cartridge == null) { throw new CloudControllerException("Cartridge not found: [cartridge-uuid] " + cartridgeUuid); } - for(PortMapping portMapping : cartridge.getPortMappings()) { + for (PortMapping portMapping : cartridge.getPortMappings()) { ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appUuid, cluster.getClusterId(), portMapping.getName(), portMapping.getProtocol(), portMapping.getPort(), - portMapping.getProxyPort()); - CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping); - log.debug("Cluster port mapping created: " + clusterPortMapping.toString()); - } - } - - // Persist cluster port mappings - CloudControllerContext.getInstance().persist(); - - // Send application clusters created event - TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters); - } - - public static void handleApplicationClustersRemoved(String appId, - Set<ClusterDataHolder> clusterData) { - TopologyManager.acquireWriteLock(); - - List<Cluster> removedClusters = new ArrayList<Cluster>(); - CloudControllerContext context = CloudControllerContext.getInstance(); - try { - Topology topology = TopologyManager.getTopology(); - - if (clusterData != null) { - // remove clusters from CC topology model and remove runtime information - for (ClusterDataHolder aClusterData : clusterData) { - Service aService = topology.getService(aClusterData.getServiceUuid()); - if (aService != null) { - removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); - } else { - log.warn("Service " + aClusterData.getServiceType() + " not found, " + - "unable to remove Cluster " + aClusterData.getClusterId()); + portMapping.getProxyPort()); + CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping); + log.debug("Cluster port mapping created: " + clusterPortMapping.toString()); } - // remove runtime data - context.removeClusterContext(aClusterData.getClusterId()); - - log.info("Removed application [ " + appId + " ]'s Cluster " + - "[ " + aClusterData.getClusterId() + " ] from the topology"); } - // persist runtime data changes - CloudControllerContext.getInstance().persist(); - } else { - log.info("No cluster data found for application " + appId + " to remove"); - } - TopologyManager.updateTopology(topology); + // Persist cluster port mappings + CloudControllerContext.getInstance().persist(); - } finally { - TopologyManager.releaseWriteLock(); - } + // Send application clusters created event + TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters); + } - // Remove cluster port mappings of application - CloudControllerContext.getInstance().removeClusterPortMappings(appId); - CloudControllerContext.getInstance().persist(); + public static void handleApplicationClustersRemoved (String appId, + Set < ClusterDataHolder > clusterData){ + TopologyManager.acquireWriteLock(); - TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData); + List<Cluster> removedClusters = new ArrayList<Cluster>(); + CloudControllerContext context = CloudControllerContext.getInstance(); + try { + Topology topology = TopologyManager.getTopology(); + + if (clusterData != null) { + // remove clusters from CC topology model and remove runtime information + for (ClusterDataHolder aClusterData : clusterData) { + Service aService = topology.getService(aClusterData.getServiceUuid()); + if (aService != null) { + removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); + } else { + log.warn("Service " + aClusterData.getServiceType() + " not found, " + + "unable to remove Cluster " + aClusterData.getClusterId()); + } + // remove runtime data + context.removeClusterContext(aClusterData.getClusterId()); - } + log.info("Removed application [ " + appId + " ]'s Cluster " + + "[ " + aClusterData.getClusterId() + " ] from the topology"); + } + // persist runtime data changes + CloudControllerContext.getInstance().persist(); + } else { + log.info("No cluster data found for application " + appId + " to remove"); + } - public static void handleClusterReset(ClusterStatusClusterResetEvent event) { - TopologyManager.acquireWriteLock(); + TopologyManager.updateTopology(topology); - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - if (service == null) { - log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); - return; + } finally { + TopologyManager.releaseWriteLock(); } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + - "status to Created"); - return; - } + // Remove cluster port mappings of application + CloudControllerContext.getInstance().removeClusterPortMappings(appId); + CloudControllerContext.getInstance().persist(); - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Created; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Created adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(), - event.getClusterId(), event.getInstanceId()); - } else { - log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); - } + TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData); - } finally { - TopologyManager.releaseWriteLock(); } + public static void handleClusterReset (ClusterStatusClusterResetEvent event){ + TopologyManager.acquireWriteLock(); - } - - public static void handleClusterInstanceCreated(String serviceUuid, String clusterId, - String alias, String instanceId, String partitionId, - String networkPartitionUuid) { - - TopologyManager.acquireWriteLock(); + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(event.getServiceName()); + if (service == null) { + log.error("Service " + event.getServiceName() + + " not found in Topology, unable to update the cluster status to Created"); + return; + } - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceUuid); - if (service == null) { - log.error("Service " + serviceUuid + - " not found in Topology, unable to update the cluster status to Created"); - return; - } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + + "status to Created"); + return; + } - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - log.error("Cluster " + clusterId + " not found in Topology, unable to update " + - "status to Created"); - return; - } + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Created; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Created adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(), + event.getClusterId(), event.getInstanceId()); + } else { + log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); + } - if (cluster.getInstanceContexts(instanceId) != null) { - log.warn("The Instance context for the cluster already exists for [cluster] " + - clusterId + " [instance-id] " + instanceId); - return; + } finally { + TopologyManager.releaseWriteLock(); } - ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId); - clusterInstance.setNetworkPartitionUuid(networkPartitionUuid); - clusterInstance.setPartitionId(partitionId); - cluster.addInstanceContext(instanceId, clusterInstance); - TopologyManager.updateTopology(topology); - - ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = - new ClusterInstanceCreatedEvent(serviceUuid, clusterId, - clusterInstance); - clusterInstanceCreatedEvent.setPartitionId(partitionId); - TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); - - } finally { - TopologyManager.releaseWriteLock(); - } - } - - public static void handleClusterRemoved(ClusterContext ctxt) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(ctxt.getCartridgeUuid()); - String deploymentPolicy; - if (service == null) { - log.warn(String.format("Service %s does not exist", - ctxt.getCartridgeUuid())); - return; } - if (!service.clusterExists(ctxt.getClusterId())) { - log.warn(String.format("Cluster %s does not exist for service %s", - ctxt.getClusterId(), - ctxt.getCartridgeUuid())); - return; - } + public static void handleClusterInstanceCreated (String serviceUuid, String clusterId, + String alias, String instanceId, String partitionId, + String networkPartitionUuid){ - try { TopologyManager.acquireWriteLock(); - Cluster cluster = service.removeCluster(ctxt.getClusterId()); - deploymentPolicy = cluster.getDeploymentPolicyUuid(); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); - } - /** - * Add member object to the topology and publish member created event - * - * @param memberContext - */ - public static void handleMemberCreatedEvent(MemberContext memberContext) { - Topology topology = TopologyManager.getTopology(); + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceUuid); + if (service == null) { + log.error("Service " + serviceUuid + + " not found in Topology, unable to update the cluster status to Created"); + return; + } - Service service = topology.getService(memberContext.getCartridgeType()); - String clusterId = memberContext.getClusterId(); - Cluster cluster = service.getCluster(clusterId); - String memberId = memberContext.getMemberId(); - String clusterInstanceId = memberContext.getClusterInstanceId(); - String networkPartitionId = memberContext.getNetworkPartitionId(); - String partitionId = memberContext.getPartition().getUuid(); - String lbClusterId = memberContext.getLbClusterId(); - long initTime = memberContext.getInitTime(); - - if (cluster.memberExists(memberId)) { - log.warn(String.format("Member %s already exists", memberId)); - return; - } + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + log.error("Cluster " + clusterId + " not found in Topology, unable to update " + + "status to Created"); + return; + } - try { - TopologyManager.acquireWriteLock(); - Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, - networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime); - member.setStatus(MemberStatus.Created); - member.setLbClusterId(lbClusterId); - member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); - cluster.addMember(member); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } + if (cluster.getInstanceContexts(instanceId) != null) { + log.warn("The Instance context for the cluster already exists for [cluster] " + + clusterId + " [instance-id] " + instanceId); + return; + } - TopologyEventPublisher.sendMemberCreatedEvent(memberContext); - } + ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId); + clusterInstance.setNetworkPartitionUuid(networkPartitionUuid); + clusterInstance.setPartitionId(partitionId); + cluster.addInstanceContext(instanceId, clusterInstance); + TopologyManager.updateTopology(topology); - /** - * Update member status to initialized and publish member initialized event - * - * @param memberContext - */ - public static void handleMemberInitializedEvent(MemberContext memberContext) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(memberContext.getCartridgeType()); - if (service == null) { - log.warn(String.format("Service %s does not exist", - memberContext.getCartridgeType())); - return; - } - if (!service.clusterExists(memberContext.getClusterId())) { - log.warn(String.format("Cluster %s does not exist in service %s", - memberContext.getClusterId(), - memberContext.getCartridgeType())); - return; - } + ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = + new ClusterInstanceCreatedEvent(serviceUuid, clusterId, + clusterInstance); + clusterInstanceCreatedEvent.setPartitionId(partitionId); + TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); - Member member = service.getCluster(memberContext.getClusterId()). - getMember(memberContext.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - memberContext.getMemberId())); - return; + } finally { + TopologyManager.releaseWriteLock(); + } } - try { - TopologyManager.acquireWriteLock(); - // Set ip addresses - member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP()); - if (memberContext.getPrivateIPs() != null) { - member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs())); - } - member.setDefaultPublicIP(memberContext.getDefaultPublicIP()); - if (memberContext.getPublicIPs() != null) { - member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs())); + public static void handleClusterRemoved (ClusterContext ctxt){ + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(ctxt.getCartridgeUuid()); + String deploymentPolicy; + if (service == null) { + log.warn(String.format("Service %s does not exist", + ctxt.getCartridgeUuid())); + return; } - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Initialized)) { - log.error("Invalid state transition from " + member.getStatus() + " to " + - MemberStatus.Initialized); + if (!service.clusterExists(ctxt.getClusterId())) { + log.warn(String.format("Cluster %s does not exist for service %s", + ctxt.getClusterId(), + ctxt.getCartridgeUuid())); return; - } else { - member.setStatus(MemberStatus.Initialized); - log.info("Member status updated to initialized"); + } + try { + TopologyManager.acquireWriteLock(); + Cluster cluster = service.removeCluster(ctxt.getClusterId()); + deploymentPolicy = cluster.getDeploymentPolicyUuid(); TopologyManager.updateTopology(topology); - - TopologyEventPublisher.sendMemberInitializedEvent(memberContext); - //publishing data - BAMUsageDataPublisher.publish(memberContext.getMemberId(), - memberContext.getPartition().getUuid(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), - null); + } finally { + TopologyManager.releaseWriteLock(); } - } finally { - TopologyManager.releaseWriteLock(); + TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); } - } - private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices, - PortMapping portMapping) { - for (KubernetesService kubernetesService : kubernetesServices) { - if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) { - return kubernetesService.getPort(); + /** + * Add member object to the topology and publish member created event + * + * @param memberContext + */ + public static void handleMemberCreatedEvent (MemberContext memberContext){ + Topology topology = TopologyManager.getTopology(); + + Service service = topology.getService(memberContext.getCartridgeType()); + String clusterId = memberContext.getClusterId(); + Cluster cluster = service.getCluster(clusterId); + String memberId = memberContext.getMemberId(); + String clusterInstanceId = memberContext.getClusterInstanceId(); + String networkPartitionId = memberContext.getNetworkPartitionId(); + String partitionId = memberContext.getPartition().getUuid(); + String lbClusterId = memberContext.getLbClusterId(); + long initTime = memberContext.getInitTime(); + String autoscalingReason = memberContext.getProperties().getProperty( + StratosConstants.SCALING_REASON).getValue(); + long scalingTime = Long.parseLong(memberContext.getProperties().getProperty( + StratosConstants.SCALING_TIME).getValue()); + + + if (cluster.memberExists(memberId)) { + log.warn(String.format("Member %s already exists", memberId)); + return; } + + try { + TopologyManager.acquireWriteLock(); + Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, + networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime); + member.setStatus(MemberStatus.Created); + member.setLbClusterId(lbClusterId); + member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); + cluster.addMember(member); + TopologyManager.updateTopology(topology); + //member created time + Long timeStamp = System.currentTimeMillis(); + //publishing to BAM + BAMUsageDataPublisher + .publish(memberContext.getMemberId(), + memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), + memberContext.getClusterInstanceId(), + memberContext.getCartridgeType(), + MemberStatus.Created.toString(), + timeStamp, autoscalingReason, + scalingTime, null); + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendMemberCreatedEvent(memberContext); } - throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " - + portMapping.getPort()); - } - public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { - try { + /** + * Update member status to initialized and publish member initialized event + * + * @param memberContext + */ + public static void handleMemberInitializedEvent (MemberContext memberContext){ Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceStartedEvent.getServiceName()); + Service service = topology.getService(memberContext.getCartridgeType()); if (service == null) { log.warn(String.format("Service %s does not exist", - instanceStartedEvent.getServiceName())); + memberContext.getCartridgeType())); return; } - if (!service.clusterExists(instanceStartedEvent.getClusterId())) { + if (!service.clusterExists(memberContext.getClusterId())) { log.warn(String.format("Cluster %s does not exist in service %s", - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName())); + memberContext.getClusterId(), + memberContext.getCartridgeType())); return; } - Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId()); - Member member = cluster.getMember(instanceStartedEvent.getMemberId()); + Member member = service.getCluster(memberContext.getClusterId()). + getMember(memberContext.getMemberId()); if (member == null) { log.warn(String.format("Member %s does not exist", - instanceStartedEvent.getMemberId())); + memberContext.getMemberId())); return; } try { TopologyManager.acquireWriteLock(); + + // Set ip addresses + member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP()); + if (memberContext.getPrivateIPs() != null) { + member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs())); + } + member.setDefaultPublicIP(memberContext.getDefaultPublicIP()); + if (memberContext.getPublicIPs() != null) { + member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs())); + } + // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Starting)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + - MemberStatus.Starting); + if (!member.isStateTransitionValid(MemberStatus.Initialized)) { + log.error("Invalid state transition from " + member.getStatus() + " to " + + MemberStatus.Initialized); return; } else { - member.setStatus(MemberStatus.Starting); - log.info("member started event adding status started"); + member.setStatus(MemberStatus.Initialized); + log.info("Member status updated to initialized"); TopologyManager.updateTopology(topology); - //memberStartedEvent. - TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); + //member intialized time + Long timeStamp = System.currentTimeMillis(); + TopologyEventPublisher.sendMemberInitializedEvent(memberContext); //publishing data - BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(), - instanceStartedEvent.getPartitionId(), - instanceStartedEvent.getNetworkPartitionId(), - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName(), - MemberStatus.Starting.toString(), - null); + BAMUsageDataPublisher.publish(memberContext.getMemberId(), + memberContext.getPartition().getUuid(), + memberContext.getNetworkPartitionId(), + memberContext.getClusterInstanceId(), + memberContext.getClusterId(), + memberContext.getCartridgeType(), + MemberStatus.Initialized.toString(), + timeStamp, null, null, null); } } finally { TopologyManager.releaseWriteLock(); } - } catch (Exception e) { - String message = String.format("Could not handle member started event: [application-id] %s " + - "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(), - instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId()); - log.warn(message, e); - } - } - - public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceActivatedEvent.getServiceName()); - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceActivatedEvent.getServiceName())); - return; } - Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceActivatedEvent.getClusterId())); - return; + private static int findKubernetesServicePort (String clusterId, List < KubernetesService > kubernetesServices, + PortMapping portMapping){ + for (KubernetesService kubernetesService : kubernetesServices) { + if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) { + return kubernetesService.getPort(); + } + } + throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " + + portMapping.getPort()); } - Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceActivatedEvent.getMemberId())); - return; - } + public static void handleMemberStarted (InstanceStartedEvent instanceStartedEvent){ + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceStartedEvent.getServiceName()); + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceStartedEvent.getServiceName())); + return; + } + if (!service.clusterExists(instanceStartedEvent.getClusterId())) { + log.warn(String.format("Cluster %s does not exist in service %s", + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName())); + return; + } - MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent( - instanceActivatedEvent.getServiceName(), - instanceActivatedEvent.getClusterId(), - instanceActivatedEvent.getClusterInstanceId(), - instanceActivatedEvent.getMemberId(), - instanceActivatedEvent.getNetworkPartitionId(), - instanceActivatedEvent.getPartitionId()); - - // grouping - set grouid - //TODO - memberActivatedEvent.setApplicationId(null); - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Active)) { - log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]"); - return; - } else { - member.setStatus(MemberStatus.Active); + Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId()); + Member member = cluster.getMember(instanceStartedEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceStartedEvent.getMemberId())); + return; + } - // Set member ports try { - Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid()); - if (cartridge == null) { - throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s", - service.getServiceName())); - } - - Port port; - int portValue; - List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); - String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Starting)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.Starting); + return; + } else { + member.setStatus(MemberStatus.Starting); + log.info("member started event adding status started"); - for (PortMapping portMapping : portMappings) { - if (kubernetesServices != null) { - portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping); - } else { - portValue = portMapping.getPort(); - } - port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort()); - member.addPort(port); - memberActivatedEvent.addPort(port); + TopologyManager.updateTopology(topology); + //member started time + Long timeStamp = System.currentTimeMillis(); + //memberStartedEvent. + TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); + //publishing data + BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(), + instanceStartedEvent.getPartitionId(), + instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getClusterInstanceId(), + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName(), + MemberStatus.Starting.toString(), + timeStamp, null, null, null); } - } catch (Exception e) { - String message = String.format("Could not add member ports: [service-name] %s [member-id] %s", - memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId()); - log.error(message, e); + } finally { + TopologyManager.releaseWriteLock(); } - - // Set member ip addresses - memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP()); - memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs()); - memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); - memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); - TopologyManager.updateTopology(topology); - - // Publish member activated event - TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); - - // Publish statistics data - BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), - memberActivatedEvent.getPartitionId(), - memberActivatedEvent.getNetworkPartitionId(), - memberActivatedEvent.getClusterId(), - memberActivatedEvent.getServiceName(), - MemberStatus.Active.toString(), - null); + } catch (Exception e) { + String message = String.format("Could not handle member started event: [application-id] %s " + + "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(), + instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId()); + log.warn(message, e); } - } finally { - TopologyManager.releaseWriteLock(); - } - } - - public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); - //update the status of the member - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceReadyToShutdownEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceReadyToShutdownEvent.getClusterId())); - return; } + public static void handleMemberActivated (InstanceActivatedEvent instanceActivatedEvent){ + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceActivatedEvent.getServiceName()); + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceActivatedEvent.getServiceName())); + return; + } - Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceReadyToShutdownEvent.getMemberId())); - return; - } - MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( - instanceReadyToShutdownEvent.getServiceName(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getClusterInstanceId(), - instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getPartitionId()); - try { - TopologyManager.acquireWriteLock(); + Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceActivatedEvent.getClusterId())); + return; + } - if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + - MemberStatus.ReadyToShutDown); + Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceActivatedEvent.getMemberId())); return; } - member.setStatus(MemberStatus.ReadyToShutDown); - log.info("Member Ready to shut down event adding status started"); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); - //publishing data - BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getServiceName(), - MemberStatus.ReadyToShutDown.toString(), - null); - //termination of particular instance will be handled by autoscaler - } + MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent( + instanceActivatedEvent.getServiceName(), + instanceActivatedEvent.getClusterId(), + instanceActivatedEvent.getClusterInstanceId(), + instanceActivatedEvent.getMemberId(), + instanceActivatedEvent.getNetworkPartitionId(), + instanceActivatedEvent.getPartitionId()); + + // grouping - set grouid + //TODO + memberActivatedEvent.setApplicationId(null); + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Active)) { + log.error("Invalid state transition from [" + member.getStatus() + "] to [" + + MemberStatus.Active + "]"); + return; + } else { + member.setStatus(MemberStatus.Active); - public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); - //update the status of the member - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceMaintenanceModeEvent.getServiceName())); - return; - } + // Set member ports + try { + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid()); + if (cartridge == null) { + throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s", + service.getServiceName())); + } - Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceMaintenanceModeEvent.getClusterId())); - return; - } + Port port; + int portValue; + List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); + String clusterId = cluster.getClusterId(); + ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); + List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); - Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceMaintenanceModeEvent.getMemberId())); - return; + for (PortMapping portMapping : portMappings) { + if (kubernetesServices != null) { + portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping); + } else { + portValue = portMapping.getPort(); + } + port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort()); + member.addPort(port); + memberActivatedEvent.addPort(port); + } + } catch (Exception e) { + String message = String.format("Could not add member ports: [service-name] %s [member-id] %s", + memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId()); + log.error(message, e); + } + + // Set member ip addresses + memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP()); + memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs()); + memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); + memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); + TopologyManager.updateTopology(topology); + //member activated time + Long timeStamp = System.currentTimeMillis(); + // Publish member activated event + TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); + + // Publish statistics data + BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), + memberActivatedEvent.getPartitionId(), + memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getClusterInstanceId(), + memberActivatedEvent.getClusterId(), + memberActivatedEvent.getServiceName(), + MemberStatus.Active.toString(), + timeStamp, null, null, null); + } + } finally { + TopologyManager.releaseWriteLock(); + } } + public static void handleMemberReadyToShutdown (InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); + //update the status of the member + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceReadyToShutdownEvent.getServiceName())); + return; + } - MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent( - instanceMaintenanceModeEvent.getServiceName(), - instanceMaintenanceModeEvent.getClusterId(), - instanceMaintenanceModeEvent.getClusterInstanceId(), - instanceMaintenanceModeEvent.getMemberId(), - instanceMaintenanceModeEvent.getNetworkPartitionId(), - instanceMaintenanceModeEvent.getPartitionId()); - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " - + MemberStatus.In_Maintenance); + Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceReadyToShutdownEvent.getClusterId())); return; } - member.setStatus(MemberStatus.In_Maintenance); - log.info("member maintenance mode event adding status started"); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - //publishing data - TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); - } + Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceReadyToShutdownEvent.getMemberId())); + return; + } + MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( + instanceReadyToShutdownEvent.getServiceName(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getMemberId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getPartitionId()); + //member ReadyToShutDown state change time + Long timeStamp = null; + try { + TopologyManager.acquireWriteLock(); - /** - * Remove member from topology and send member terminated event. - * - * @param serviceName - * @param clusterId - * @param networkPartitionId - * @param partitionId - * @param memberId - */ - public static void handleMemberTerminated(String serviceName, String clusterId, - String networkPartitionId, String partitionId, - String memberId) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceName); - Properties properties; - if (service == null) { - log.warn(String.format("Service %s does not exist", - serviceName)); - return; - } - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterId)); - return; - } + if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.ReadyToShutDown); + return; + } + member.setStatus(MemberStatus.ReadyToShutDown); + log.info("Member Ready to shut down event adding status started"); - Member member = cluster.getMember(memberId); - if (member == null) { - log.warn(String.format("Member %s does not exist", - memberId)); - return; + TopologyManager.updateTopology(topology); + timeStamp = System.currentTimeMillis(); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); + //publishing data + BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), + instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getServiceName(), + MemberStatus.ReadyToShutDown.toString(), + timeStamp, null, null, null); + //termination of particular instance will be handled by autoscaler } - String clusterInstanceId = member.getClusterInstanceId(); + public static void handleMemberMaintenance (InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); + //update the status of the member + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceMaintenanceModeEvent.getServiceName())); + return; + } - try { - TopologyManager.acquireWriteLock(); - properties = member.getProperties(); - cluster.removeMember(member); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - /* @TODO leftover from grouping_poc*/ - String groupAlias = null; - TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId, - clusterInstanceId, networkPartitionId, - partitionId, properties, groupAlias); - } + Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceMaintenanceModeEvent.getClusterId())); + return; + } - public static void handleMemberSuspended() { - //TODO - try { - TopologyManager.acquireWriteLock(); - } finally { - TopologyManager.releaseWriteLock(); - } - } + Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceMaintenanceModeEvent.getMemberId())); + return; + } - public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", - clusterStatusClusterActivatedEvent.getServiceName())); - return; - } + MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent( + instanceMaintenanceModeEvent.getServiceName(), + instanceMaintenanceModeEvent.getClusterId(), + instanceMaintenanceModeEvent.getClusterInstanceId(), + instanceMaintenanceModeEvent.getMemberId(), + instanceMaintenanceModeEvent.getNetworkPartitionId(), + instanceMaintenanceModeEvent.getPartitionId()); + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.In_Maintenance); + return; + } + member.setStatus(MemberStatus.In_Maintenance); + log.info("member maintenance mode event adding status started"); - Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterStatusClusterActivatedEvent.getClusterId())); - return; - } + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //publishing data + TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); - String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - if (clusterContext == null) { - log.warn("Cluster context not found: [cluster-id] " + clusterId); - return; } - ClusterInstanceActivatedEvent clusterInstanceActivatedEvent = - new ClusterInstanceActivatedEvent( - clusterStatusClusterActivatedEvent.getAppId(), - clusterStatusClusterActivatedEvent.getServiceName(), - clusterStatusClusterActivatedEvent.getClusterId(), - clusterStatusClusterActivatedEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); - cluster.setKubernetesServices(kubernetesServices); - clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices); - - ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId()); - if (context == null) { - log.warn("Cluster instance context is not found for [cluster] " + - clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " + - clusterStatusClusterActivatedEvent.getInstanceId()); + /** + * Remove member from topology and send member terminated event. + * + * @param serviceName + * @param clusterId + * @param networkPartitionId + * @param partitionId + * @param memberId + */ + public static void handleMemberTerminated (String serviceName, String clusterId, + String networkPartitionId, String partitionId, + String memberId){ + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceName); + Properties properties; + if (service == null) { + log.warn(String.format("Service %s does not exist", + serviceName)); return; } - ClusterStatus status = ClusterStatus.Active; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster activated adding status started for " + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - // publish event - TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent); - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(), - context.getStatus(), status)); + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + clusterId)); return; } - } finally { - TopologyManager.releaseWriteLock(); - } - } + Member member = cluster.getMember(memberId); + if (member == null) { + log.warn(String.format("Member %s does not exist", + memberId)); + return; + } - public static void handleClusterInactivateEvent( - ClusterStatusClusterInactivateEvent clusterInactivateEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(clusterInactivateEvent.getServiceName()); - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", - clusterInactivateEvent.getServiceName())); - return; + String clusterInstanceId = member.getClusterInstanceId(); + + try { + TopologyManager.acquireWriteLock(); + properties = member.getProperties(); + cluster.removeMember(member); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + /* @TODO leftover from grouping_poc*/ + String groupAlias = null; + TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId, + clusterInstanceId, networkPartitionId, + partitionId, properties, groupAlias); } - Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterInactivateEvent.getClusterId())); - return; + public static void handleMemberSuspended () { + //TODO + try { + TopologyManager.acquireWriteLock(); + } finally { + TopologyManager.releaseWriteLock(); + } } - ClusterInstanceInactivateEvent clusterInactivatedEvent1 = - new ClusterInstanceInactivateEvent( - clusterInactivateEvent.getAppId(), - clusterInactivateEvent.getServiceName(), - clusterInactivateEvent.getClusterId(), - clusterInactivateEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - clusterInactivateEvent.getClusterId() + " [instance-id] " + - clusterInactivateEvent.getInstanceId()); + public static void handleClusterActivatedEvent (ClusterStatusClusterActivatedEvent + clusterStatusClusterActivatedEvent){ + + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", + clusterStatusClusterActivatedEvent.getServiceName())); return; } - ClusterStatus status = ClusterStatus.Inactive; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Inactive adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1); - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(), - context.getStatus(), status)); + + Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + clusterStatusClusterActivatedEvent.getClusterId())); return; } - } finally { - TopologyManager.releaseWriteLock(); - } - } + String clusterId = cluster.getClusterId(); + ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); + if (clusterContext == null) { + log.warn("Cluster context not found: [cluster-id] " + clusterId); + return; + } - private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) { - try { - MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient(); - metadataClient.deleteApplicationProperties(event.getAppId()); - } catch (Exception e) { - log.error("Error occurred while deleting the application resources frm metadata service ", e); - } - } - - public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) { + ClusterInstanceActivatedEvent clusterInstanceActivatedEvent = + new ClusterInstanceActivatedEvent( + clusterStatusClusterActivatedEvent.getAppId(), + clusterStatusClusterActivatedEvent.getServiceName(), + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + cluster.setKubernetesServices(kubernetesServices); + clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices); + + ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId()); + if (context == null) { + log.warn("Cluster instance context is not found for [cluster] " + + clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " + + clusterStatusClusterActivatedEvent.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Active; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster activated adding status started for " + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + // publish event + TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent); + } else { + log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent.getInstanceId(), + context.getStatus(), status)); + return; + } + } finally { + TopologyManager.releaseWriteLock(); + } - TopologyManager.acquireWriteLock(); + } - try { + public static void handleClusterInactivateEvent ( + ClusterStatusClusterInactivateEvent clusterInactivateEvent){ Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - + Service service = topology.getService(clusterInactivateEvent.getServiceName()); //update the status of the cluster if (service == null) { log.warn(String.format("Service %s does not exist", - event.getServiceName())); + clusterInactivateEvent.getServiceName())); return; } - Cluster cluster = service.getCluster(event.getClusterId()); + Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - event.getClusterId())); + clusterInactivateEvent.getClusterId())); return; } - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; + ClusterInstanceInactivateEvent clusterInactivatedEvent1 = + new ClusterInstanceInactivateEvent( + clusterInactivateEvent.getAppId(), + clusterInactivateEvent.getServiceName(), + clusterInactivateEvent.getClusterId(), + clusterInactivateEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + clusterInactivateEvent.getClusterId() + " [instance-id] " + + clusterInactivateEvent.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Inactive; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Inactive adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1); + } else { + log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(), + context.getStatus(), status)); + return; + } + } finally { + TopologyManager.releaseWriteLock(); } - ClusterStatus status = ClusterStatus.Terminated; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Terminated adding status started for and removing the cluster instance" - + cluster.getClusterId()); - cluster.removeInstanceContext(event.getInstanceId()); - TopologyManager.updateTopology(topology); - //publishing data - ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); + } - TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); - return; + + private static void deleteAppResourcesFromMetadataService (ApplicationInstanceTerminatedEvent event){ + try { + MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient(); + metadataClient.deleteApplicationProperties(event.getAppId()); + } catch (Exception e) { + log.error("Error occurred while deleting the application resources frm metadata service ", e); } - } finally { - TopologyManager.releaseWriteLock(); } + public static void handleClusterTerminatedEvent (ClusterStatusClusterTerminatedEvent event){ - } + TopologyManager.acquireWriteLock(); - public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) { + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(event.getServiceName()); - TopologyManager.acquireWriteLock(); + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", + event.getServiceName())); + return; + } - try { - Topology topology = TopologyManager.getTopology(); - Cluster cluster = topology.getService(event.getServiceName()). - getCluster(event.getClusterId()); + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + event.getClusterId())); + return; + } - if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) { - log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " + - ClusterStatus.Terminating); - } - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Terminated; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Terminated adding status started for and removing the cluster instance" + + cluster.getClusterId()); + cluster.removeInstanceContext(event.getInstanceId()); + TopologyManager.updateTopology(topology); + //publishing data + ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent( + event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId()); + + TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); + } else { + log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); + return; + } + } finally { + TopologyManager.releaseWriteLock(); } - ClusterStatus status = ClusterStatus.Terminating; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Terminating started for " + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); - TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); - // Remove kubernetes services if available - ClusterContext clusterContext = - CloudControllerContext.getInstance().getClusterContext(event.getClusterId()); - if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { - KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId()); + } + + public static void handleClusterTerminatingEvent (ClusterStatusClusterTerminatingEvent event){ + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Cluster cluster = topology.getService(event.getServiceName()). + getCluster(event.getClusterId()); + + if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) { + log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " + + ClusterStatus.Terminating); } - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Terminating; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Terminating started for " + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent( + event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId()); + + TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); + + // Remove kubernetes services if available + ClusterContext clusterContext = + CloudControllerContext.getInstance().getClusterContext(event.getClusterId()); + if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { + KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId()); + } + } else { + log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); + } + } finally { + TopologyManager.releaseWriteLock(); } - } finally { - TopologyManager.releaseWriteLock(); } } -}
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index 316984a..96bb38c 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -460,13 +460,13 @@ public class CloudControllerServiceImpl implements CloudControllerService { clusterContext.setVolumes(volumes); } - // Handle member created event - TopologyBuilder.handleMemberCreatedEvent(memberContext); - // Persist member context CloudControllerContext.getInstance().addMemberContext(memberContext); CloudControllerContext.getInstance().persist(); + // Handle member created event + TopologyBuilder.handleMemberCreatedEvent(memberContext); + // Start instance in a new thread if (log.isDebugEnabled()) { log.debug(String.format("Starting instance creator thread: [cluster] %s [cluster-instance] %s " +
