http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index ecd2728,142b16f..0e5c4a5 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@@ -31,7 -31,7 +31,6 @@@ import org.apache.stratos.cloud.control import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.common.Property; - import org.apache.stratos.kubernetes.client.KubernetesConstants; -import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.application.ClusterDataHolder; import org.apache.stratos.messaging.domain.instance.ClusterInstance; import org.apache.stratos.messaging.domain.topology.*; @@@ -148,9 -148,40 +147,9 @@@ public class TopologyBuilder } } - public static void handleClusterCreated(ClusterStatusClusterCreatedEvent event) { - TopologyManager.acquireWriteLock(); - Cluster cluster; - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - if (service == null) { - log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); - return; - } - - if (service.clusterExists(event.getClusterId())) { - log.warn("Cluster " + event.getClusterId() + " is already in the Topology "); - return; - } else { - cluster = new Cluster(event.getServiceName(), - event.getClusterId(), event.getDeploymentPolicyName(), - event.getAutosScalePolicyName(), event.getAppId()); - //cluster.setStatus(Status.Created); - cluster.setHostNames(event.getHostNames()); - cluster.setTenantRange(event.getTenantRange()); - service.addCluster(cluster); - TopologyManager.updateTopology(topology); - } - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendClusterCreatedEvent(cluster); - } - public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) { + public static void handleApplicationClustersCreated(String appUuid, List<Cluster> appClusters) { TopologyManager.acquireWriteLock(); @@@ -180,331 -211,262 +179,248 @@@ } for (PortMapping portMapping : cartridge.getPortMappings()) { - ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appId, + ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appUuid, cluster.getClusterId(), portMapping.getName(), portMapping.getProtocol(), portMapping.getPort(), - portMapping.getProxyPort()); - CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping); - log.debug("Cluster port mapping created: " + clusterPortMapping.toString()); - } + portMapping.getProxyPort()); + if (portMapping.getKubernetesPortType() != null) { + clusterPortMapping.setKubernetesServiceType(portMapping.getKubernetesPortType()); } + CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping); + log.debug("Cluster port mapping created: " + clusterPortMapping.toString()); + } + } - // Persist cluster port mappings - CloudControllerContext.getInstance().persist(); + // Persist cluster port mappings + CloudControllerContext.getInstance().persist(); - // Send application clusters created event - TopologyEventPublisher.sendApplicationClustersCreated(appId, appClusters); - } + // Send application clusters created event + TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters); + } - public static void handleApplicationClustersRemoved(String appId, - Set<ClusterDataHolder> clusterData) { - TopologyManager.acquireWriteLock(); + public static void handleApplicationClustersRemoved (String appId, + Set < ClusterDataHolder > clusterData){ + TopologyManager.acquireWriteLock(); - List<Cluster> removedClusters = new ArrayList<Cluster>(); - CloudControllerContext context = CloudControllerContext.getInstance(); - try { - Topology topology = TopologyManager.getTopology(); + List<Cluster> removedClusters = new ArrayList<Cluster>(); + CloudControllerContext context = CloudControllerContext.getInstance(); + try { + Topology topology = TopologyManager.getTopology(); + + if (clusterData != null) { + // remove clusters from CC topology model and remove runtime information + for (ClusterDataHolder aClusterData : clusterData) { + Service aService = topology.getService(aClusterData.getServiceUuid()); + if (aService != null) { + removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); + } else { + log.warn("Service " + aClusterData.getServiceType() + " not found, " + + "unable to remove Cluster " + aClusterData.getClusterId()); + } + // remove runtime data + context.removeClusterContext(aClusterData.getClusterId()); - if (clusterData != null) { - // remove clusters from CC topology model and remove runtime information - for (ClusterDataHolder aClusterData : clusterData) { - Service aService = topology.getService(aClusterData.getServiceType()); - if (aService != null) { - removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); - } else { - log.warn("Service " + aClusterData.getServiceType() + " not found, " + - "unable to remove Cluster " + aClusterData.getClusterId()); + log.info("Removed application [ " + appId + " ]'s Cluster " + + "[ " + aClusterData.getClusterId() + " ] from the topology"); } - // remove runtime data - context.removeClusterContext(aClusterData.getClusterId()); - - log.info("Removed application [ " + appId + " ]'s Cluster " + - "[ " + aClusterData.getClusterId() + " ] from the topology"); + // persist runtime data changes + CloudControllerContext.getInstance().persist(); + } else { + log.info("No cluster data found for application " + appId + " to remove"); } - // persist runtime data changes - CloudControllerContext.getInstance().persist(); - } else { - log.info("No cluster data found for application " + appId + " to remove"); - } - - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - - // Remove cluster port mappings of application - CloudControllerContext.getInstance().removeClusterPortMappings(appId); - CloudControllerContext.getInstance().persist(); - - TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData); - - } - - public static void handleClusterReset(ClusterStatusClusterResetEvent event) { - TopologyManager.acquireWriteLock(); + TopologyManager.updateTopology(topology); - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - if (service == null) { - log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); - return; + } finally { + TopologyManager.releaseWriteLock(); } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + - "status to Created"); - return; - } + // Remove cluster port mappings of application + CloudControllerContext.getInstance().removeClusterPortMappings(appId); + CloudControllerContext.getInstance().persist(); - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Created; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Created adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(), - event.getClusterId(), event.getInstanceId()); - } else { - log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); - } + TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData); - } finally { - TopologyManager.releaseWriteLock(); } + public static void handleClusterReset (ClusterStatusClusterResetEvent event){ + TopologyManager.acquireWriteLock(); - } - - public static void handleClusterInstanceCreated(String serviceType, String clusterId, - String alias, String instanceId, String partitionId, - String networkPartitionId) { - - TopologyManager.acquireWriteLock(); + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(event.getServiceName()); + if (service == null) { + log.error("Service " + event.getServiceName() + + " not found in Topology, unable to update the cluster status to Created"); + return; + } - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceType); - if (service == null) { - log.error("Service " + serviceType + - " not found in Topology, unable to update the cluster status to Created"); - return; - } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + + "status to Created"); + return; + } - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - log.error("Cluster " + clusterId + " not found in Topology, unable to update " + - "status to Created"); - return; - } + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Created; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Created adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(), + event.getClusterId(), event.getInstanceId()); + } else { + log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); + } - if (cluster.getInstanceContexts(instanceId) != null) { - log.warn("The Instance context for the cluster already exists for [cluster] " + - clusterId + " [instance-id] " + instanceId); - return; + } finally { + TopologyManager.releaseWriteLock(); } - ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId); - clusterInstance.setNetworkPartitionId(networkPartitionId); - clusterInstance.setPartitionId(partitionId); - cluster.addInstanceContext(instanceId, clusterInstance); - TopologyManager.updateTopology(topology); - - ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = - new ClusterInstanceCreatedEvent(serviceType, clusterId, - clusterInstance); - clusterInstanceCreatedEvent.setPartitionId(partitionId); - TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); - } finally { - TopologyManager.releaseWriteLock(); } - } + public static void handleClusterInstanceCreated (String serviceUuid, String clusterId, + String alias, String instanceId, String partitionId, + String networkPartitionUuid){ - public static void handleClusterRemoved(ClusterContext ctxt) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(ctxt.getCartridgeType()); - String deploymentPolicy; - if (service == null) { - log.warn(String.format("Service %s does not exist", - ctxt.getCartridgeType())); - return; - } - - if (!service.clusterExists(ctxt.getClusterId())) { - log.warn(String.format("Cluster %s does not exist for service %s", - ctxt.getClusterId(), - ctxt.getCartridgeType())); - return; - } - - try { TopologyManager.acquireWriteLock(); - Cluster cluster = service.removeCluster(ctxt.getClusterId()); - deploymentPolicy = cluster.getDeploymentPolicyName(); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); - } - /** - * Add member object to the topology and publish member created event - * - * @param memberContext - */ - public static void handleMemberCreatedEvent(MemberContext memberContext) { - Topology topology = TopologyManager.getTopology(); + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceUuid); + if (service == null) { + log.error("Service " + serviceUuid + + " not found in Topology, unable to update the cluster status to Created"); + return; + } - Service service = topology.getService(memberContext.getCartridgeType()); - String clusterId = memberContext.getClusterId(); - Cluster cluster = service.getCluster(clusterId); - String memberId = memberContext.getMemberId(); - String clusterInstanceId = memberContext.getClusterInstanceId(); - String networkPartitionId = memberContext.getNetworkPartitionId(); - String partitionId = memberContext.getPartition().getId(); - String lbClusterId = memberContext.getLbClusterId(); - long initTime = memberContext.getInitTime(); - - if (cluster.memberExists(memberId)) { - log.warn(String.format("Member %s already exists", memberId)); - return; - } + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + log.error("Cluster " + clusterId + " not found in Topology, unable to update " + + "status to Created"); + return; + } - try { - TopologyManager.acquireWriteLock(); - Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, - networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime); - member.setStatus(MemberStatus.Created); - member.setLbClusterId(lbClusterId); - member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); - cluster.addMember(member); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } + if (cluster.getInstanceContexts(instanceId) != null) { + log.warn("The Instance context for the cluster already exists for [cluster] " + + clusterId + " [instance-id] " + instanceId); + return; + } - TopologyEventPublisher.sendMemberCreatedEvent(memberContext); - } + ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId); + clusterInstance.setNetworkPartitionUuid(networkPartitionUuid); + clusterInstance.setPartitionId(partitionId); + cluster.addInstanceContext(instanceId, clusterInstance); + TopologyManager.updateTopology(topology); - /** - * Update member status to initialized and publish member initialized event - * - * @param memberContext - */ - public static void handleMemberInitializedEvent(MemberContext memberContext) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(memberContext.getCartridgeType()); - if (service == null) { - log.warn(String.format("Service %s does not exist", - memberContext.getCartridgeType())); - return; - } - if (!service.clusterExists(memberContext.getClusterId())) { - log.warn(String.format("Cluster %s does not exist in service %s", - memberContext.getClusterId(), - memberContext.getCartridgeType())); - return; - } + ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = + new ClusterInstanceCreatedEvent(serviceUuid, clusterId, + clusterInstance); + clusterInstanceCreatedEvent.setPartitionId(partitionId); + TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); - Member member = service.getCluster(memberContext.getClusterId()). - getMember(memberContext.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - memberContext.getMemberId())); - return; + } finally { + TopologyManager.releaseWriteLock(); + } } - try { - TopologyManager.acquireWriteLock(); - // Set ip addresses - member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP()); - if (memberContext.getPrivateIPs() != null) { - member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs())); - } - member.setDefaultPublicIP(memberContext.getDefaultPublicIP()); - if (memberContext.getPublicIPs() != null) { - member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs())); + public static void handleClusterRemoved (ClusterContext ctxt){ + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(ctxt.getCartridgeUuid()); + String deploymentPolicy; + if (service == null) { + log.warn(String.format("Service %s does not exist", + ctxt.getCartridgeUuid())); + return; } - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Initialized)) { - log.error("Invalid state transition from " + member.getStatus() + " to " + - MemberStatus.Initialized); + if (!service.clusterExists(ctxt.getClusterId())) { + log.warn(String.format("Cluster %s does not exist for service %s", + ctxt.getClusterId(), + ctxt.getCartridgeUuid())); return; - } else { + } - Cluster cluster = service.getCluster(memberContext.getClusterId()); - String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + try { + TopologyManager.acquireWriteLock(); + Cluster cluster = service.removeCluster(ctxt.getClusterId()); + deploymentPolicy = cluster.getDeploymentPolicyName(); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); + } - if (kubernetesServices != null) { - cluster.setKubernetesServices(kubernetesServices); - } + /** + * Add member object to the topology and publish member created event + * + * @param memberContext + */ + public static void handleMemberCreatedEvent (MemberContext memberContext){ + Topology topology = TopologyManager.getTopology(); - member.setStatus(MemberStatus.Initialized); - log.info("Member status updated to initialized"); + Service service = topology.getService(memberContext.getCartridgeType()); + String clusterId = memberContext.getClusterId(); + Cluster cluster = service.getCluster(clusterId); + String memberId = memberContext.getMemberId(); + String clusterInstanceId = memberContext.getClusterInstanceId(); + String networkPartitionId = memberContext.getNetworkPartitionId(); + String partitionId = memberContext.getPartition().getUuid(); + String lbClusterId = memberContext.getLbClusterId(); + long initTime = memberContext.getInitTime(); - String autoscalingReason = memberContext.getProperties().getProperty( - StratosConstants.SCALING_REASON).getValue(); - long scalingTime = Long.parseLong(memberContext.getProperties().getProperty( - StratosConstants.SCALING_TIME).getValue()); - TopologyManager.updateTopology(topology); - TopologyEventPublisher.sendMemberInitializedEvent(memberContext); - //publishing data - BAMUsageDataPublisher.publish(memberContext.getMemberId(), - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), - null); + if (cluster.memberExists(memberId)) { + log.warn(String.format("Member %s already exists", memberId)); + return; } + + try { + TopologyManager.acquireWriteLock(); + Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, + networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime); + member.setStatus(MemberStatus.Created); + member.setLbClusterId(lbClusterId); + member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); + cluster.addMember(member); + TopologyManager.updateTopology(topology); - //member created time - Long timeStamp = System.currentTimeMillis(); - //publishing to BAM - BAMUsageDataPublisher - .publish(memberContext.getMemberId(), - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getClusterInstanceId(), - memberContext.getCartridgeType(), - MemberStatus.Created.toString(), - timeStamp, autoscalingReason, - scalingTime, null); - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendMemberCreatedEvent(memberContext); + } finally { + TopologyManager.releaseWriteLock(); } - } - private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices, - PortMapping portMapping) { - for (KubernetesService kubernetesService : kubernetesServices) { - if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) { - return kubernetesService.getPort(); - } - } - throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " - + portMapping.getPort()); ++ TopologyEventPublisher.sendMemberCreatedEvent(memberContext); + } + - public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { - try { + /** + * Update member status to initialized and publish member initialized event + * + * @param memberContext + */ + public static void handleMemberInitializedEvent (MemberContext memberContext){ Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceStartedEvent.getServiceName()); + Service service = topology.getService(memberContext.getCartridgeType()); if (service == null) { log.warn(String.format("Service %s does not exist", - instanceStartedEvent.getServiceName())); + memberContext.getCartridgeType())); return; } - if (!service.clusterExists(instanceStartedEvent.getClusterId())) { + if (!service.clusterExists(memberContext.getClusterId())) { log.warn(String.format("Cluster %s does not exist in service %s", - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName())); + memberContext.getClusterId(), + memberContext.getCartridgeType())); return; } @@@ -542,448 -517,427 +471,426 @@@ } finally { TopologyManager.releaseWriteLock(); } - } catch (Exception e) { - String message = String.format("Could not handle member started event: [application-id] %s " + - "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(), - instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId()); - log.warn(message, e); } - } - public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceActivatedEvent.getServiceName()); - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceActivatedEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceActivatedEvent.getClusterId())); - return; + private static int findKubernetesServicePort (String clusterId, List < KubernetesService > kubernetesServices, + PortMapping portMapping){ + for (KubernetesService kubernetesService : kubernetesServices) { + if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) { + return kubernetesService.getPort(); + } + } + throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " + + portMapping.getPort()); } - Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceActivatedEvent.getMemberId())); - return; - } + public static void handleMemberStarted (InstanceStartedEvent instanceStartedEvent){ + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceStartedEvent.getServiceName()); + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceStartedEvent.getServiceName())); + return; + } + if (!service.clusterExists(instanceStartedEvent.getClusterId())) { + log.warn(String.format("Cluster %s does not exist in service %s", + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName())); + return; + } - MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent( - instanceActivatedEvent.getServiceName(), - instanceActivatedEvent.getClusterId(), - instanceActivatedEvent.getClusterInstanceId(), - instanceActivatedEvent.getMemberId(), - instanceActivatedEvent.getNetworkPartitionId(), - instanceActivatedEvent.getPartitionId()); - - // grouping - set grouid - //TODO - memberActivatedEvent.setApplicationId(null); - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Active)) { - log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]"); - return; - } else { - member.setStatus(MemberStatus.Active); + Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId()); + Member member = cluster.getMember(instanceStartedEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceStartedEvent.getMemberId())); + return; + } - // Set member ports try { - Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceName()); - if (cartridge == null) { - throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s", - service.getServiceName())); - } - - Port port; - int portValue; - List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); - String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Starting)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.Starting); + return; + } else { + member.setStatus(MemberStatus.Starting); + log.info("member started event adding status started"); - for (PortMapping portMapping : portMappings) { - if (kubernetesServices != null) { - portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping); - } else { - portValue = portMapping.getPort(); - } - port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort()); - member.addPort(port); - memberActivatedEvent.addPort(port); + TopologyManager.updateTopology(topology); + //member started time + Long timeStamp = System.currentTimeMillis(); + //memberStartedEvent. + TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); + //publishing data + BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(), + instanceStartedEvent.getPartitionId(), + instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getClusterInstanceId(), + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName(), + MemberStatus.Starting.toString(), + timeStamp, null, null, null); } - } catch (Exception e) { - String message = String.format("Could not add member ports: [service-name] %s [member-id] %s", - memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId()); - log.error(message, e); + } finally { + TopologyManager.releaseWriteLock(); } - - // Set member ip addresses - memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP()); - memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs()); - memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); - memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); - TopologyManager.updateTopology(topology); - - // Publish member activated event - TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); - - // Publish statistics data - BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), - memberActivatedEvent.getPartitionId(), - memberActivatedEvent.getNetworkPartitionId(), - memberActivatedEvent.getClusterId(), - memberActivatedEvent.getServiceName(), - MemberStatus.Active.toString(), - null); + } catch (Exception e) { + String message = String.format("Could not handle member started event: [application-id] %s " + + "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(), + instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId()); + log.warn(message, e); } - } finally { - TopologyManager.releaseWriteLock(); - } - } - - public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); - //update the status of the member - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceReadyToShutdownEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceReadyToShutdownEvent.getClusterId())); - return; } + public static void handleMemberActivated (InstanceActivatedEvent instanceActivatedEvent){ + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceActivatedEvent.getServiceName()); + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceActivatedEvent.getServiceName())); + return; + } - Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceReadyToShutdownEvent.getMemberId())); - return; - } - MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( - instanceReadyToShutdownEvent.getServiceName(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getClusterInstanceId(), - instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getPartitionId()); - try { - TopologyManager.acquireWriteLock(); + Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceActivatedEvent.getClusterId())); + return; + } - if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + - MemberStatus.ReadyToShutDown); + Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceActivatedEvent.getMemberId())); return; } - member.setStatus(MemberStatus.ReadyToShutDown); - log.info("Member Ready to shut down event adding status started"); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); - //publishing data - BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getServiceName(), - MemberStatus.ReadyToShutDown.toString(), - null); - //termination of particular instance will be handled by autoscaler - } + MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent( + instanceActivatedEvent.getServiceName(), + instanceActivatedEvent.getClusterId(), + instanceActivatedEvent.getClusterInstanceId(), + instanceActivatedEvent.getMemberId(), + instanceActivatedEvent.getNetworkPartitionId(), + instanceActivatedEvent.getPartitionId()); + + // grouping - set grouid + //TODO + memberActivatedEvent.setApplicationId(null); + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Active)) { + log.error("Invalid state transition from [" + member.getStatus() + "] to [" + + MemberStatus.Active + "]"); + return; + } else { + member.setStatus(MemberStatus.Active); - public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); - //update the status of the member - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceMaintenanceModeEvent.getServiceName())); - return; - } + // Set member ports + try { + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid()); + if (cartridge == null) { + throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s", + service.getServiceName())); + } - Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceMaintenanceModeEvent.getClusterId())); - return; - } + Port port; + int portValue; + List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); + String clusterId = cluster.getClusterId(); + ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); + List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); - Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceMaintenanceModeEvent.getMemberId())); - return; + for (PortMapping portMapping : portMappings) { + if (kubernetesServices != null) { + portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping); + } else { + portValue = portMapping.getPort(); + } + port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort()); + member.addPort(port); + memberActivatedEvent.addPort(port); + } + } catch (Exception e) { + String message = String.format("Could not add member ports: [service-name] %s [member-id] %s", + memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId()); + log.error(message, e); + } + + // Set member ip addresses + memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP()); + memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs()); + memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); + memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); + TopologyManager.updateTopology(topology); + //member activated time + Long timeStamp = System.currentTimeMillis(); + // Publish member activated event + TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); + + // Publish statistics data + BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), + memberActivatedEvent.getPartitionId(), + memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getClusterInstanceId(), + memberActivatedEvent.getClusterId(), + memberActivatedEvent.getServiceName(), + MemberStatus.Active.toString(), + timeStamp, null, null, null); + } + } finally { + TopologyManager.releaseWriteLock(); + } } + public static void handleMemberReadyToShutdown (InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); + //update the status of the member + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceReadyToShutdownEvent.getServiceName())); + return; + } - MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent( - instanceMaintenanceModeEvent.getServiceName(), - instanceMaintenanceModeEvent.getClusterId(), - instanceMaintenanceModeEvent.getClusterInstanceId(), - instanceMaintenanceModeEvent.getMemberId(), - instanceMaintenanceModeEvent.getNetworkPartitionId(), - instanceMaintenanceModeEvent.getPartitionId()); - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " - + MemberStatus.In_Maintenance); + Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceReadyToShutdownEvent.getClusterId())); return; } - member.setStatus(MemberStatus.In_Maintenance); - log.info("member maintenance mode event adding status started"); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - //publishing data - TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); - } + Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceReadyToShutdownEvent.getMemberId())); + return; + } + MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( + instanceReadyToShutdownEvent.getServiceName(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getMemberId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getPartitionId()); + //member ReadyToShutDown state change time + Long timeStamp = null; + try { + TopologyManager.acquireWriteLock(); - /** - * Remove member from topology and send member terminated event. - * - * @param serviceName - * @param clusterId - * @param networkPartitionId - * @param partitionId - * @param memberId - */ - public static void handleMemberTerminated(String serviceName, String clusterId, - String networkPartitionId, String partitionId, - String memberId) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceName); - Properties properties; - if (service == null) { - log.warn(String.format("Service %s does not exist", - serviceName)); - return; - } - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterId)); - return; - } + if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.ReadyToShutDown); + return; + } + member.setStatus(MemberStatus.ReadyToShutDown); + log.info("Member Ready to shut down event adding status started"); - Member member = cluster.getMember(memberId); - if (member == null) { - log.warn(String.format("Member %s does not exist", - memberId)); - return; + TopologyManager.updateTopology(topology); + timeStamp = System.currentTimeMillis(); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); + //publishing data + BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), + instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getServiceName(), + MemberStatus.ReadyToShutDown.toString(), + timeStamp, null, null, null); + //termination of particular instance will be handled by autoscaler } - String clusterInstanceId = member.getClusterInstanceId(); + public static void handleMemberMaintenance (InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); + //update the status of the member + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceMaintenanceModeEvent.getServiceName())); + return; + } - try { - TopologyManager.acquireWriteLock(); - properties = member.getProperties(); - cluster.removeMember(member); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - /* @TODO leftover from grouping_poc*/ - String groupAlias = null; - TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId, - clusterInstanceId, networkPartitionId, - partitionId, properties, groupAlias); - } + Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceMaintenanceModeEvent.getClusterId())); + return; + } - public static void handleMemberSuspended() { - //TODO - try { - TopologyManager.acquireWriteLock(); - } finally { - TopologyManager.releaseWriteLock(); - } - } + Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceMaintenanceModeEvent.getMemberId())); + return; + } - public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", - clusterStatusClusterActivatedEvent.getServiceName())); - return; - } + MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent( + instanceMaintenanceModeEvent.getServiceName(), + instanceMaintenanceModeEvent.getClusterId(), + instanceMaintenanceModeEvent.getClusterInstanceId(), + instanceMaintenanceModeEvent.getMemberId(), + instanceMaintenanceModeEvent.getNetworkPartitionId(), + instanceMaintenanceModeEvent.getPartitionId()); + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.In_Maintenance); + return; + } + member.setStatus(MemberStatus.In_Maintenance); + log.info("member maintenance mode event adding status started"); - Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterStatusClusterActivatedEvent.getClusterId())); - return; - } + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //publishing data + TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); - String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - if (clusterContext == null) { - log.warn("Cluster context not found: [cluster-id] " + clusterId); - return; } - ClusterInstanceActivatedEvent clusterInstanceActivatedEvent = - new ClusterInstanceActivatedEvent( - clusterStatusClusterActivatedEvent.getAppId(), - clusterStatusClusterActivatedEvent.getServiceName(), - clusterStatusClusterActivatedEvent.getClusterId(), - clusterStatusClusterActivatedEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); - - if (kubernetesServices != null) { - - try { - // Generate access URLs for kubernetes services - for (KubernetesService kubernetesService : kubernetesServices) { - - if (kubernetesService.getServiceType().equals(KubernetesConstants.NODE_PORT)) { - // Public IP = Kubernetes minion public IP - String[] publicIPs = kubernetesService.getPublicIPs(); - if ((publicIPs != null) && (publicIPs.length > 0)) { - for (String publicIP : publicIPs) { - // There can be a String array with null values - if (publicIP != null) { - // Using type URI since only http, https, ftp, file, jar protocols are supported in URL - URI accessURL = new URI(kubernetesService.getProtocol(), null, publicIP, - kubernetesService.getPort(), null, null, null); - cluster.addAccessUrl(accessURL.toString()); - clusterInstanceActivatedEvent.addAccessUrl(accessURL.toString()); - } else { - log.error(String.format("Could not create access URL for [Kubernetes-service] %s , " + - "since Public IP is not available", kubernetesService.getId())); - } - } - } - } - } - } catch (URISyntaxException e) { - log.error("Could not create access URLs for Kubernetes services", e); - } + /** + * Remove member from topology and send member terminated event. + * + * @param serviceName + * @param clusterId + * @param networkPartitionId + * @param partitionId + * @param memberId + */ + public static void handleMemberTerminated (String serviceName, String clusterId, + String networkPartitionId, String partitionId, + String memberId){ + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceName); + Properties properties; + if (service == null) { + log.warn(String.format("Service %s does not exist", + serviceName)); + return; } - - ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId()); - - if (context == null) { - log.warn("Cluster instance context is not found for [cluster] " + - clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " + - clusterStatusClusterActivatedEvent.getInstanceId()); + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + clusterId)); return; } - ClusterStatus status = ClusterStatus.Active; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster activated adding status started for " + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - // publish event - TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent); - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(), - context.getStatus(), status)); + + Member member = cluster.getMember(memberId); + if (member == null) { + log.warn(String.format("Member %s does not exist", + memberId)); return; } - } finally { - TopologyManager.releaseWriteLock(); - } - } + String clusterInstanceId = member.getClusterInstanceId(); - public static void handleClusterInactivateEvent( - ClusterStatusClusterInactivateEvent clusterInactivateEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(clusterInactivateEvent.getServiceName()); - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", - clusterInactivateEvent.getServiceName())); - return; + try { + TopologyManager.acquireWriteLock(); + properties = member.getProperties(); + cluster.removeMember(member); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + /* @TODO leftover from grouping_poc*/ + String groupAlias = null; + TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId, + clusterInstanceId, networkPartitionId, + partitionId, properties, groupAlias); } - Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterInactivateEvent.getClusterId())); - return; + public static void handleMemberSuspended () { + //TODO + try { + TopologyManager.acquireWriteLock(); + } finally { + TopologyManager.releaseWriteLock(); + } } - ClusterInstanceInactivateEvent clusterInactivatedEvent1 = - new ClusterInstanceInactivateEvent( - clusterInactivateEvent.getAppId(), - clusterInactivateEvent.getServiceName(), - clusterInactivateEvent.getClusterId(), - clusterInactivateEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - clusterInactivateEvent.getClusterId() + " [instance-id] " + - clusterInactivateEvent.getInstanceId()); + public static void handleClusterActivatedEvent (ClusterStatusClusterActivatedEvent + clusterStatusClusterActivatedEvent){ + + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", + clusterStatusClusterActivatedEvent.getServiceName())); return; } - ClusterStatus status = ClusterStatus.Inactive; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Inactive adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1); - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(), - context.getStatus(), status)); + + Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + clusterStatusClusterActivatedEvent.getClusterId())); return; } - } finally { - TopologyManager.releaseWriteLock(); - } - } + String clusterId = cluster.getClusterId(); + ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); + if (clusterContext == null) { + log.warn("Cluster context not found: [cluster-id] " + clusterId); + return; + } - private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) { - try { - MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient(); - metadataClient.deleteApplicationProperties(event.getAppId()); - } catch (Exception e) { - log.error("Error occurred while deleting the application resources frm metadata service ", e); - } - } + ClusterInstanceActivatedEvent clusterInstanceActivatedEvent = + new ClusterInstanceActivatedEvent( + clusterStatusClusterActivatedEvent.getAppId(), + clusterStatusClusterActivatedEvent.getServiceName(), + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + cluster.setKubernetesServices(kubernetesServices); - clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices); + - public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) { - TopologyManager.acquireWriteLock(); + ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId()); + if (context == null) { + log.warn("Cluster instance context is not found for [cluster] " + + clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " + + clusterStatusClusterActivatedEvent.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Active; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster activated adding status started for " + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + // publish event + TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent); + } else { + log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", - clusterStatusClusterActivatedEvent.getClusterId(), - clusterStatusClusterActivatedEvent.getInstanceId(), ++ clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(), + context.getStatus(), status)); + return; + } + } finally { + TopologyManager.releaseWriteLock(); + } - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); + } + public static void handleClusterInactivateEvent ( + ClusterStatusClusterInactivateEvent clusterInactivateEvent){ + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(clusterInactivateEvent.getServiceName()); //update the status of the cluster if (service == null) { log.warn(String.format("Service %s does not exist",
http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/beans/application/ApplicationBean.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/beans/application/ApplicationBean.java index 1c7295d,f6e6745..ae1e6bb --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/beans/application/ApplicationBean.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/beans/application/ApplicationBean.java @@@ -104,11 -95,11 +104,8 @@@ public class ApplicationBean implement this.property = property; } - public String getApplicationId() { - return applicationId; - } + public boolean isSignUpsExist() { + return signUpsExist; + } - public void setSignUpsExist(boolean signUpsExist) { - this.signUpsExist = signUpsExist; - } - public void setApplicationId(String applicationId) { - this.applicationId = applicationId; - } } http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/client/StratosManagerServiceClient.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java index ee0477c,55e97d9..a1f2fa9 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java @@@ -164,12 -167,11 +167,13 @@@ public class StratosConstants // member expiry timeout constants public static final String PENDING_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingMemberExpiryTimeout"; + public static final String SPIN_TERMINATE_PARALLEL = "autoscaler.member.spinAfterTerminate"; public static final String OBSOLETED_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.obsoletedMemberExpiryTimeout"; - public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingTerminationMemberExpiryTimeout"; + public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = + "autoscaler.member.pendingTerminationMemberExpiryTimeout"; public static final String FILTER_VALUE_SEPARATOR = ","; + public static final String TOPOLOGY_APPLICATION_FILTER = "stratos.topology.application.filter"; public static final String TOPOLOGY_SERVICE_FILTER = "stratos.topology.service.filter"; public static final String TOPOLOGY_CLUSTER_FILTER = "stratos.topology.cluster.filter"; public static final String TOPOLOGY_MEMBER_FILTER = "stratos.topology.member.filter"; http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.kubernetes.client/src/main/java/org/apache/stratos/kubernetes/client/interfaces/KubernetesAPIClientInterface.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.kubernetes.client/src/main/java/org/apache/stratos/kubernetes/client/interfaces/KubernetesAPIClientInterface.java index ceeca1c,6741d6b..78504e9 --- a/components/org.apache.stratos.kubernetes.client/src/main/java/org/apache/stratos/kubernetes/client/interfaces/KubernetesAPIClientInterface.java +++ b/components/org.apache.stratos.kubernetes.client/src/main/java/org/apache/stratos/kubernetes/client/interfaces/KubernetesAPIClientInterface.java @@@ -76,10 -76,9 +76,11 @@@ public interface KubernetesAPIClientInt * * @param serviceId * @param serviceLabel - * @param nodePort + * @param servicePort + * @param serviceType * @param containerPortName * @param containerPort ++ * @param publicIPs * @param sessionAffinity * @throws KubernetesClientException */ http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.manager/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/context/StratosManagerContext.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/context/StratosManagerContext.java index e4fa435,5443cbd..5788e8e --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/context/StratosManagerContext.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/context/StratosManagerContext.java @@@ -48,18 -49,21 +48,18 @@@ public class StratosManagerContext impl private static final String SM_CARTRIDGES_APPLICATIONS_WRITE_LOCK = "SM_CARTRIDGES_APPLICATIONS_WRITE_LOCK"; private static final String SM_CARTRIDGEGROUPS_CARTRIDGESUBGROUPS_WRITE_LOCK = "SM_CARTRIDGEGROUPS_CARTRIDGESUBGROUPS_WRITE_LOCK"; private static final String SM_CARTRIDGEGROUPS_APPLICATIONS_WRITE_LOCK = "SM_CARTRIDGEGROUPS_APPLICATIONS_WRITE_LOCK"; - - public static final String DATA_RESOURCE = "/stratos.manager/data"; - - private final transient DistributedObjectProvider distributedObjectProvider; private static final Log log = LogFactory.getLog(StratosManagerContext.class); - + private static volatile StratosManagerContext instance; + private final transient DistributedObjectProvider distributedObjectProvider; /** - * Key - cartridge type - * Value - list of cartridgeGroupNames + * Key - cartridge type uuid + * Value - list of cartridgeGroupNames uuid */ - private Map<String, Set<String>> cartridgeTypeToCartridgeGroupsMap; + private Map<String,Set<String>> cartridgeTypeToCartridgeGroupsMap; /** - * Key - cartridge type - * Value - list of ApplicationNames + * Key - cartridge type uuid + * Value - list of ApplicationNames uuid */ private Map<String, Set<String>> cartridgeTypeToApplicationsMap; http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/services/StratosManagerService.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/services/StratosManagerService.java index 4c4237a,9be1a92..9327b3e --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/services/StratosManagerService.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/services/StratosManagerService.java @@@ -137,10 -119,10 +137,10 @@@ public interface StratosManagerService /** * Removes the used cartridges in cartridge groups from cache structure. * - * @param cartridgeGroupName the cartridge group name - * @param cartridgeNames the cartridge names + * @param cartridgeGroupUuid the cartridge group UUID - * @param cartridgeNames the cartridge names ++ * @param cartridgeNamesUuid the cartridge names */ - public void removeUsedCartridgesInCartridgeGroups(String cartridgeGroupName, String[] cartridgeNames); - public void removeUsedCartridgesInCartridgeGroups(String cartridgeGroupUuid, String[] cartridgeNames); ++ public void removeUsedCartridgesInCartridgeGroups(String cartridgeGroupUuid, String[] cartridgeNamesUuid); /** * Adds the used cartridges in applications to cache structure. http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/services/impl/StratosManagerServiceImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/application/Applications.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/application/Applications.java index 4d366f1,809fccb..442c56b --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/application/Applications.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/application/Applications.java @@@ -46,11 -43,23 +46,23 @@@ public class Applications implements Se this.applicationMap.put(application.getUniqueIdentifier(), application); } - public synchronized Application getApplication(String appId) { - return this.getApplications().get(appId); - public Application getApplication(String applicationUuid) { ++ public synchronized Application getApplication(String applicationUuid) { + return this.getApplications().get(applicationUuid); } - public synchronized boolean isInitialized() { - public Application getApplicationByTenant(String applicationId, int tenantId) { ++ public synchronized Application getApplicationByTenant(String applicationId, int tenantId) { + if(getApplications() != null) { + for (Application application : this.getApplications().values()) { + if (application.getId().equals(applicationId) && application.getTenantId() == tenantId) { + return application; + } + } + } + + return null; + } + + public boolean isInitialized() { return initialized; } http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java index 9f641ec,e79f446..c8e903f --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java @@@ -50,25 -50,31 +50,25 @@@ public class Topology implements Serial } public void addService(Service service) { - this.serviceMap.put(service.getServiceName(), service); + this.serviceMap.put(service.getServiceUuid(), service); } - public synchronized void addServices(Collection<Service> services) { - for (Service service : services) { - addService(service); - } - } - public void removeService(Service service) { - this.serviceMap.remove(service.getServiceName()); - TopologyLockHierarchy.getInstance().removeTopologyLockForService(service.getServiceName()); + this.serviceMap.remove(service.getServiceUuid()); + TopologyLockHierarchy.getInstance().removeTopologyLockForService(service.getServiceUuid()); } - public void removeService(String serviceName) { - this.serviceMap.remove(serviceName); - TopologyLockHierarchy.getInstance().removeTopologyLockForService(serviceName); + public void removeService(String serviceUuid) { + this.serviceMap.remove(serviceUuid); + TopologyLockHierarchy.getInstance().removeTopologyLockForService(serviceUuid); } - public Service getService(String serviceName) { - return this.serviceMap.get(serviceName); + public Service getService(String serviceUuid) { + return this.serviceMap.get(serviceUuid); } - public boolean serviceExists(String serviceName) { - return this.serviceMap.containsKey(serviceName); + public boolean serviceExists(String serviceUuid) { + return this.serviceMap.containsKey(serviceUuid); } public void addToCluterMap(Cluster cluster) { http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java index 8fb28cb,08ceb19..0f558b2 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java @@@ -98,17 -97,11 +98,17 @@@ public class ClusterInstanceCreatedMess } return false; } + Cluster cluster = service.getCluster(event.getClusterId()); + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + if (cluster == null) { if (log.isDebugEnabled()) { - log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), + log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceUuid(), event.getClusterId())); } return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/2c34f816/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java ----------------------------------------------------------------------
