http://git-wip-us.apache.org/repos/asf/stratos/blob/218b5fdc/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index 802dfc8..618f2f9 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -52,997 +52,997 @@ import java.util.*; * and build the complete topology with the events received */ public class TopologyBuilder { - private static final Log log = LogFactory.getLog(TopologyBuilder.class); - - public static void handleServiceCreated(List<Cartridge> cartridgeList) { - Service service; - Topology topology = TopologyManager.getTopology(); - if (cartridgeList == null) { - log.warn(String.format("Cartridge list is empty")); - return; - } - - try { - - TopologyManager.acquireWriteLock(); - for (Cartridge cartridge : cartridgeList) { - if (!topology.serviceExists(cartridge.getType())) { - ServiceType serviceType = - cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant; - service = new Service(cartridge.getType(), serviceType); - Properties properties = new Properties(); - - try { - Property[] propertyArray = null; - - if (cartridge.getProperties() != null) { - if (cartridge.getProperties().getProperties() != null) { - propertyArray = cartridge.getProperties().getProperties(); - } - } - - List<Property> propertyList = new ArrayList<Property>(); - if (propertyArray != null) { - propertyList = Arrays.asList(propertyArray); - if (propertyList != null) { - for (Property property : propertyList) { - properties.setProperty(property.getName(), property.getValue()); - } - } - } - } catch (Exception e) { - log.error(e); - } - - service.setProperties(properties); - if (cartridge.getPortMappings() != null) { - List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); - Port port; - //adding ports to the event - for (PortMapping portMapping : portMappings) { - port = new Port(portMapping.getProtocol(), portMapping.getPort(), - portMapping.getProxyPort()); - service.addPort(port); - } - } - - topology.addService(service); - TopologyManager.updateTopology(topology); - } - } - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendServiceCreateEvent(cartridgeList); - } - - public static void handleServiceRemoved(List<Cartridge> cartridgeList) { - Topology topology = TopologyManager.getTopology(); - - for (Cartridge cartridge : cartridgeList) { - Service service = topology.getService(cartridge.getType()); - if (service == null) { - log.warn("Cartridge does not exist [cartidge] " + cartridge); - return; - } - if (service.getClusters().size() == 0) { - if (topology.serviceExists(cartridge.getType())) { - try { - TopologyManager.acquireWriteLock(); - topology.removeService(cartridge.getType()); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList); - } else { - log.warn(String.format("Service %s does not exist..", cartridge.getType())); - } - } else { - log.warn("Subscription already exists. Hence not removing the service:" + cartridge.getType() + - " from the topology"); - } - } - } - - public static void handleClusterCreated(ClusterStatusClusterCreatedEvent event) { - TopologyManager.acquireWriteLock(); - Cluster cluster; - - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - if (service == null) { - log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); - return; - } - - if (service.clusterExists(event.getClusterId())) { - log.warn("Cluster " + event.getClusterId() + " is already in the Topology "); - return; - } else { - cluster = new Cluster(event.getServiceName(), event.getClusterId(), event.getDeploymentPolicyName(), - event.getAutosScalePolicyName(), event.getAppId()); - //cluster.setStatus(Status.Created); - cluster.setHostNames(event.getHostNames()); - cluster.setTenantRange(event.getTenantRange()); - service.addCluster(cluster); - TopologyManager.updateTopology(topology); - } - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendClusterCreatedEvent(cluster); - } - - public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - for (Cluster cluster : appClusters) { - Service service = topology.getService(cluster.getServiceName()); - if (service == null) { - log.error("Service " + cluster.getServiceName() + - " not found in Topology, unable to create Application cluster"); - } else { - service.addCluster(cluster); - log.info("Application Cluster " + cluster.getClusterId() + " created in CC topology"); - } - } - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - - log.debug("Creating cluster port mappings: [appication-id] " + appId); - for (Cluster cluster : appClusters) { - String cartridgeType = cluster.getServiceName(); - Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - if (cartridge == null) { - throw new CloudControllerException("Cartridge not found: [cartridge-type] " + cartridgeType); - } - - for (PortMapping portMapping : cartridge.getPortMappings()) { - ClusterPortMapping clusterPortMapping = - new ClusterPortMapping(appId, cluster.getClusterId(), portMapping.getName(), - portMapping.getProtocol(), portMapping.getPort(), - portMapping.getProxyPort()); - CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping); - log.debug("Cluster port mapping created: " + clusterPortMapping.toString()); - } - } - - // Persist cluster port mappings - CloudControllerContext.getInstance().persist(); - - // Send application clusters created event - TopologyEventPublisher.sendApplicationClustersCreated(appId, appClusters); - } - - public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData) { - TopologyManager.acquireWriteLock(); - - List<Cluster> removedClusters = new ArrayList<Cluster>(); - CloudControllerContext context = CloudControllerContext.getInstance(); - try { - Topology topology = TopologyManager.getTopology(); - - if (clusterData != null) { - // remove clusters from CC topology model and remove runtime information - for (ClusterDataHolder aClusterData : clusterData) { - Service aService = topology.getService(aClusterData.getServiceType()); - if (aService != null) { - removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); - } else { - log.warn("Service " + aClusterData.getServiceType() + " not found, " + - "unable to remove Cluster " + aClusterData.getClusterId()); - } - // remove runtime data - context.removeClusterContext(aClusterData.getClusterId()); - - log.info("Removed application [ " + appId + " ]'s Cluster " + - "[ " + aClusterData.getClusterId() + " ] from the topology"); - } - // persist runtime data changes - CloudControllerContext.getInstance().persist(); - } else { - log.info("No cluster data found for application " + appId + " to remove"); - } - - TopologyManager.updateTopology(topology); - - } finally { - TopologyManager.releaseWriteLock(); - } - - // Remove cluster port mappings of application - CloudControllerContext.getInstance().removeClusterPortMappings(appId); - CloudControllerContext.getInstance().persist(); - - TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData); - - } - - public static void handleClusterReset(ClusterStatusClusterResetEvent event) { - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - if (service == null) { - log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); - return; - } - - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + - "status to Created"); - return; - } - - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Created; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Created adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher - .sendClusterResetEvent(event.getAppId(), event.getServiceName(), event.getClusterId(), - event.getInstanceId()); - } else { - log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), context.getStatus(), status)); - } - - } finally { - TopologyManager.releaseWriteLock(); - } - - } - - public static void handleClusterInstanceCreated(String serviceType, String clusterId, String alias, - String instanceId, String partitionId, String networkPartitionId) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceType); - if (service == null) { - log.error("Service " + serviceType + - " not found in Topology, unable to update the cluster status to Created"); - return; - } - - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - log.error("Cluster " + clusterId + " not found in Topology, unable to update " + - "status to Created"); - return; - } - - if (cluster.getInstanceContexts(instanceId) != null) { - log.warn("The Instance context for the cluster already exists for [cluster] " + - clusterId + " [instance-id] " + instanceId); - return; - } - - ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId); - clusterInstance.setNetworkPartitionId(networkPartitionId); - clusterInstance.setPartitionId(partitionId); - cluster.addInstanceContext(instanceId, clusterInstance); - TopologyManager.updateTopology(topology); - - ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = - new ClusterInstanceCreatedEvent(serviceType, clusterId, clusterInstance); - clusterInstanceCreatedEvent.setPartitionId(partitionId); - TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); - - } finally { - TopologyManager.releaseWriteLock(); - } - } - - public static void handleClusterRemoved(ClusterContext ctxt) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(ctxt.getCartridgeType()); - String deploymentPolicy; - if (service == null) { - log.warn(String.format("Service %s does not exist", ctxt.getCartridgeType())); - return; - } - - if (!service.clusterExists(ctxt.getClusterId())) { - log.warn(String.format("Cluster %s does not exist for service %s", ctxt.getClusterId(), - ctxt.getCartridgeType())); - return; - } - - try { - TopologyManager.acquireWriteLock(); - Cluster cluster = service.removeCluster(ctxt.getClusterId()); - deploymentPolicy = cluster.getDeploymentPolicyName(); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); - } - - /** - * Add member object to the topology and publish member created event - * - * @param memberContext - */ - public static void handleMemberCreatedEvent(MemberContext memberContext) { - Topology topology = TopologyManager.getTopology(); - - Service service = topology.getService(memberContext.getCartridgeType()); - String clusterId = memberContext.getClusterId(); - Cluster cluster = service.getCluster(clusterId); - String memberId = memberContext.getMemberId(); - String clusterInstanceId = memberContext.getClusterInstanceId(); - String networkPartitionId = memberContext.getNetworkPartitionId(); - String partitionId = memberContext.getPartition().getId(); - String lbClusterId = memberContext.getLbClusterId(); - long initTime = memberContext.getInitTime(); - String autoscalingReason = - memberContext.getProperties().getProperty(StratosConstants.SCALING_REASON).getValue(); - long scalingTime = - Long.parseLong(memberContext.getProperties().getProperty(StratosConstants.SCALING_TIME).getValue()); - - if (cluster.memberExists(memberId)) { - log.warn(String.format("Member %s already exists", memberId)); - return; - } - - try { - TopologyManager.acquireWriteLock(); - Member member = - new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, networkPartitionId, - partitionId, memberContext.getLoadBalancingIPType(), initTime); - member.setStatus(MemberStatus.Created); - member.setLbClusterId(lbClusterId); - member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); - cluster.addMember(member); - TopologyManager.updateTopology(topology); - //Member created time - Long timeStamp = System.currentTimeMillis(); - //publishing to BAM - BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), memberContext.getClusterId(), - memberContext.getClusterInstanceId(), memberContext.getCartridgeType(), - MemberStatus.Created.toString(), timeStamp, autoscalingReason, scalingTime, - null); - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendMemberCreatedEvent(memberContext); - } - - /** - * Update member status to initialized and publish member initialized event - * - * @param memberContext - */ - public static void handleMemberInitializedEvent(MemberContext memberContext) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(memberContext.getCartridgeType()); - if (service == null) { - log.warn(String.format("Service %s does not exist", memberContext.getCartridgeType())); - return; - } - if (!service.clusterExists(memberContext.getClusterId())) { - log.warn(String.format("Cluster %s does not exist in service %s", memberContext.getClusterId(), - memberContext.getCartridgeType())); - return; - } - - Member member = service.getCluster(memberContext.getClusterId()). - getMember(memberContext.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", memberContext.getMemberId())); - return; - } - - try { - TopologyManager.acquireWriteLock(); - - // Set ip addresses - member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP()); - if (memberContext.getPrivateIPs() != null) { - member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs())); - } - member.setDefaultPublicIP(memberContext.getDefaultPublicIP()); - if (memberContext.getPublicIPs() != null) { - member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs())); - } - - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Initialized)) { - log.error("Invalid state transition from " + member.getStatus() + " to " + - MemberStatus.Initialized); - return; - } else { - member.setStatus(MemberStatus.Initialized); - log.info("Member status updated to initialized"); - - TopologyManager.updateTopology(topology); - //member initialized time - Long timeStamp = System.currentTimeMillis(); - - TopologyEventPublisher.sendMemberInitializedEvent(memberContext); - //publishing data - BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), memberContext.getClusterId(), - memberContext.getClusterInstanceId(), memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), timeStamp, null, null, null); - } - } finally { - TopologyManager.releaseWriteLock(); - } - } - - private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices, - PortMapping portMapping) { - for (KubernetesService kubernetesService : kubernetesServices) { - if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) { - return kubernetesService.getPort(); - } - } - throw new RuntimeException( - "Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " + portMapping.getPort()); - } - - public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceStartedEvent.getServiceName()); - if (service == null) { - log.warn(String.format("Service %s does not exist", instanceStartedEvent.getServiceName())); - return; - } - if (!service.clusterExists(instanceStartedEvent.getClusterId())) { - log.warn(String.format("Cluster %s does not exist in service %s", instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId()); - Member member = cluster.getMember(instanceStartedEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", instanceStartedEvent.getMemberId())); - return; - } - - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Starting)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + - MemberStatus.Starting); - return; - } else { - member.setStatus(MemberStatus.Starting); - log.info("member started event adding status started"); - - TopologyManager.updateTopology(topology); - //member starting time - Long timeStamp = System.currentTimeMillis(); - //memberStartedEvent. - TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); - //publishing data - BAMUsageDataPublisher - .publish(instanceStartedEvent.getMemberId(), instanceStartedEvent.getPartitionId(), - instanceStartedEvent.getNetworkPartitionId(), instanceStartedEvent.getClusterId(), - instanceStartedEvent.getClusterInstanceId(), instanceStartedEvent.getServiceName(), - MemberStatus.Starting.toString(), timeStamp, null, null, null); - } - } finally { - TopologyManager.releaseWriteLock(); - } - } catch (Exception e) { - String message = String.format( - "Could not handle member started event: [application-id] %s " + "[service-name] %s [member-id] %s", - instanceStartedEvent.getApplicationId(), instanceStartedEvent.getServiceName(), - instanceStartedEvent.getMemberId()); - log.warn(message, e); - } - } - - public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceActivatedEvent.getServiceName()); - if (service == null) { - log.warn(String.format("Service %s does not exist", instanceActivatedEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", instanceActivatedEvent.getClusterId())); - return; - } - - Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", instanceActivatedEvent.getMemberId())); - return; - } - - MemberActivatedEvent memberActivatedEvent = - new MemberActivatedEvent(instanceActivatedEvent.getServiceName(), instanceActivatedEvent.getClusterId(), - instanceActivatedEvent.getClusterInstanceId(), - instanceActivatedEvent.getMemberId(), - instanceActivatedEvent.getNetworkPartitionId(), - instanceActivatedEvent.getPartitionId()); - - // grouping - set grouid - //TODO - memberActivatedEvent.setApplicationId(null); - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Active)) { - log.error( - "Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]"); - return; - } else { - member.setStatus(MemberStatus.Active); - - // Set member ports - try { - Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceName()); - if (cartridge == null) { - throw new RuntimeException( - String.format("Cartridge not found: [cartridge-type] %s", service.getServiceName())); - } - - Port port; - int portValue; - List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); - String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); - - for (PortMapping portMapping : portMappings) { - if (kubernetesServices != null) { - portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping); - } else { - portValue = portMapping.getPort(); - } - port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort()); - member.addPort(port); - memberActivatedEvent.addPort(port); - } - } catch (Exception e) { - String message = String.format("Could not add member ports: [service-name] %s [member-id] %s", - memberActivatedEvent.getServiceName(), - memberActivatedEvent.getMemberId()); - log.error(message, e); - } - - // Set member ip addresses - memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP()); - memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs()); - memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); - memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); - TopologyManager.updateTopology(topology); - - //member activated time - Long timeStamp = System.currentTimeMillis(); - - // Publish member activated event - TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); - - // Publish statistics data - BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), memberActivatedEvent.getPartitionId(), - memberActivatedEvent.getNetworkPartitionId(), - memberActivatedEvent.getClusterId(), - memberActivatedEvent.getClusterInstanceId(), - memberActivatedEvent.getServiceName(), MemberStatus.Active.toString(), - timeStamp, null, null, null); - } - } finally { - TopologyManager.releaseWriteLock(); - } - } - - public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); - //update the status of the member - if (service == null) { - log.warn(String.format("Service %s does not exist", instanceReadyToShutdownEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", instanceReadyToShutdownEvent.getClusterId())); - return; - } - - Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", instanceReadyToShutdownEvent.getMemberId())); - return; - } - MemberReadyToShutdownEvent memberReadyToShutdownEvent = - new MemberReadyToShutdownEvent(instanceReadyToShutdownEvent.getServiceName(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getClusterInstanceId(), - instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getPartitionId()); - //get the time of ReadyToShutdown state change - Long timeStamp = null; - try { - TopologyManager.acquireWriteLock(); - - if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + - MemberStatus.ReadyToShutDown); - return; - } - member.setStatus(MemberStatus.ReadyToShutDown); - log.info("Member Ready to shut down event adding status started"); - - TopologyManager.updateTopology(topology); - - timeStamp = System.currentTimeMillis(); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); - //publishing data - BAMUsageDataPublisher - .publish(instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getClusterInstanceId(), - instanceReadyToShutdownEvent.getServiceName(), MemberStatus.ReadyToShutDown.toString(), - timeStamp, null, null, null); - //termination of particular instance will be handled by autoscaler - } - - public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); - //update the status of the member - if (service == null) { - log.warn(String.format("Service %s does not exist", instanceMaintenanceModeEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", instanceMaintenanceModeEvent.getClusterId())); - return; - } - - Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", instanceMaintenanceModeEvent.getMemberId())); - return; - } - - MemberMaintenanceModeEvent memberMaintenanceModeEvent = - new MemberMaintenanceModeEvent(instanceMaintenanceModeEvent.getServiceName(), - instanceMaintenanceModeEvent.getClusterId(), - instanceMaintenanceModeEvent.getClusterInstanceId(), - instanceMaintenanceModeEvent.getMemberId(), - instanceMaintenanceModeEvent.getNetworkPartitionId(), - instanceMaintenanceModeEvent.getPartitionId()); - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.In_Maintenance); - return; - } - member.setStatus(MemberStatus.In_Maintenance); - log.info("member maintenance mode event adding status started"); - - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - //publishing data - TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); - - } - - /** - * Remove member from topology and send member terminated event. - * - * @param serviceName - * @param clusterId - * @param networkPartitionId - * @param partitionId - * @param memberId - */ - public static void handleMemberTerminated(String serviceName, String clusterId, String networkPartitionId, - String partitionId, String memberId) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceName); - Properties properties; - if (service == null) { - log.warn(String.format("Service %s does not exist", serviceName)); - return; - } - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", clusterId)); - return; - } - - Member member = cluster.getMember(memberId); - if (member == null) { - log.warn(String.format("Member %s does not exist", memberId)); - return; - } - - String clusterInstanceId = member.getClusterInstanceId(); - - try { - TopologyManager.acquireWriteLock(); - properties = member.getProperties(); - cluster.removeMember(member); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - /* @TODO leftover from grouping_poc*/ - String groupAlias = null; - TopologyEventPublisher - .sendMemberTerminatedEvent(serviceName, clusterId, memberId, clusterInstanceId, networkPartitionId, - partitionId, properties, groupAlias); - } - - public static void handleMemberSuspended() { - //TODO - try { - TopologyManager.acquireWriteLock(); - } finally { - TopologyManager.releaseWriteLock(); - } - } - - public static void handleClusterActivatedEvent( - ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { - - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", clusterStatusClusterActivatedEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", clusterStatusClusterActivatedEvent.getClusterId())); - return; - } - - String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - if (clusterContext == null) { - log.warn("Cluster context not found: [cluster-id] " + clusterId); - return; - } - - ClusterInstanceActivatedEvent clusterInstanceActivatedEvent = - new ClusterInstanceActivatedEvent(clusterStatusClusterActivatedEvent.getAppId(), - clusterStatusClusterActivatedEvent.getServiceName(), - clusterStatusClusterActivatedEvent.getClusterId(), - clusterStatusClusterActivatedEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); - cluster.setKubernetesServices(kubernetesServices); - clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices); - - ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId()); - if (context == null) { - log.warn("Cluster instance context is not found for [cluster] " + - clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " + - clusterStatusClusterActivatedEvent.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Active; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster activated adding status started for " + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - // publish event - TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent); - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterStatusClusterActivatedEvent.getClusterId(), - clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(), - status)); - return; - } - } finally { - TopologyManager.releaseWriteLock(); - } - - } - - public static void handleClusterInactivateEvent(ClusterStatusClusterInactivateEvent clusterInactivateEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(clusterInactivateEvent.getServiceName()); - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", clusterInactivateEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", clusterInactivateEvent.getClusterId())); - return; - } - - ClusterInstanceInactivateEvent clusterInactivatedEvent1 = - new ClusterInstanceInactivateEvent(clusterInactivateEvent.getAppId(), - clusterInactivateEvent.getServiceName(), - clusterInactivateEvent.getClusterId(), - clusterInactivateEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - clusterInactivateEvent.getClusterId() + " [instance-id] " + - clusterInactivateEvent.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Inactive; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Inactive adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1); - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(), - context.getStatus(), status)); - return; - } - } finally { - TopologyManager.releaseWriteLock(); - } - } - - private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) { - try { - MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient(); - metadataClient.deleteApplicationProperties(event.getAppId()); - } catch (Exception e) { - log.error("Error occurred while deleting the application resources frm metadata service ", e); - } - } - - public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", event.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", event.getClusterId())); - return; - } - - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Terminated; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Terminated adding status started for and removing the cluster instance" + - cluster.getClusterId()); - cluster.removeInstanceContext(event.getInstanceId()); - TopologyManager.updateTopology(topology); - //publishing data - ClusterInstanceTerminatedEvent clusterTerminatedEvent = - new ClusterInstanceTerminatedEvent(event.getAppId(), event.getServiceName(), - event.getClusterId(), event.getInstanceId()); - - TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), context.getStatus(), status)); - return; - } - } finally { - TopologyManager.releaseWriteLock(); - } - - } - - public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - Cluster cluster = topology.getService(event.getServiceName()). - getCluster(event.getClusterId()); - - if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) { - log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " + - ClusterStatus.Terminating); - } - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Terminating; - if (context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Terminating started for " + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - ClusterInstanceTerminatingEvent clusterTerminaingEvent = - new ClusterInstanceTerminatingEvent(event.getAppId(), event.getServiceName(), - event.getClusterId(), event.getInstanceId()); - - TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); - - // Remove kubernetes services if available - ClusterContext clusterContext = - CloudControllerContext.getInstance().getClusterContext(event.getClusterId()); - if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { - KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId()); - } - } else { - log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), context.getStatus(), status)); - } - } finally { - TopologyManager.releaseWriteLock(); - } - } + private static final Log log = LogFactory.getLog(TopologyBuilder.class); + + public static void handleServiceCreated(List<Cartridge> cartridgeList) { + Service service; + Topology topology = TopologyManager.getTopology(); + if (cartridgeList == null) { + log.warn(String.format("Cartridge list is empty")); + return; + } + + try { + + TopologyManager.acquireWriteLock(); + for (Cartridge cartridge : cartridgeList) { + if (!topology.serviceExists(cartridge.getType())) { + ServiceType serviceType = + cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant; + service = new Service(cartridge.getType(), serviceType); + Properties properties = new Properties(); + + try { + Property[] propertyArray = null; + + if (cartridge.getProperties() != null) { + if (cartridge.getProperties().getProperties() != null) { + propertyArray = cartridge.getProperties().getProperties(); + } + } + + List<Property> propertyList = new ArrayList<Property>(); + if (propertyArray != null) { + propertyList = Arrays.asList(propertyArray); + if (propertyList != null) { + for (Property property : propertyList) { + properties.setProperty(property.getName(), property.getValue()); + } + } + } + } catch (Exception e) { + log.error(e); + } + + service.setProperties(properties); + if (cartridge.getPortMappings() != null) { + List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); + Port port; + //adding ports to the event + for (PortMapping portMapping : portMappings) { + port = new Port(portMapping.getProtocol(), portMapping.getPort(), + portMapping.getProxyPort()); + service.addPort(port); + } + } + + topology.addService(service); + TopologyManager.updateTopology(topology); + } + } + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendServiceCreateEvent(cartridgeList); + } + + public static void handleServiceRemoved(List<Cartridge> cartridgeList) { + Topology topology = TopologyManager.getTopology(); + + for (Cartridge cartridge : cartridgeList) { + Service service = topology.getService(cartridge.getType()); + if (service == null) { + log.warn("Cartridge does not exist [cartidge] " + cartridge); + return; + } + if (service.getClusters().size() == 0) { + if (topology.serviceExists(cartridge.getType())) { + try { + TopologyManager.acquireWriteLock(); + topology.removeService(cartridge.getType()); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList); + } else { + log.warn(String.format("Service %s does not exist..", cartridge.getType())); + } + } else { + log.warn("Subscription already exists. Hence not removing the service:" + cartridge.getType() + + " from the topology"); + } + } + } + + public static void handleClusterCreated(ClusterStatusClusterCreatedEvent event) { + TopologyManager.acquireWriteLock(); + Cluster cluster; + + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(event.getServiceName()); + if (service == null) { + log.error("Service " + event.getServiceName() + + " not found in Topology, unable to update the cluster status to Created"); + return; + } + + if (service.clusterExists(event.getClusterId())) { + log.warn("Cluster " + event.getClusterId() + " is already in the Topology "); + return; + } else { + cluster = new Cluster(event.getServiceName(), event.getClusterId(), event.getDeploymentPolicyName(), + event.getAutosScalePolicyName(), event.getAppId()); + //cluster.setStatus(Status.Created); + cluster.setHostNames(event.getHostNames()); + cluster.setTenantRange(event.getTenantRange()); + service.addCluster(cluster); + TopologyManager.updateTopology(topology); + } + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendClusterCreatedEvent(cluster); + } + + public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + for (Cluster cluster : appClusters) { + Service service = topology.getService(cluster.getServiceName()); + if (service == null) { + log.error("Service " + cluster.getServiceName() + + " not found in Topology, unable to create Application cluster"); + } else { + service.addCluster(cluster); + log.info("Application Cluster " + cluster.getClusterId() + " created in CC topology"); + } + } + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + + log.debug("Creating cluster port mappings: [appication-id] " + appId); + for (Cluster cluster : appClusters) { + String cartridgeType = cluster.getServiceName(); + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); + if (cartridge == null) { + throw new CloudControllerException("Cartridge not found: [cartridge-type] " + cartridgeType); + } + + for (PortMapping portMapping : cartridge.getPortMappings()) { + ClusterPortMapping clusterPortMapping = + new ClusterPortMapping(appId, cluster.getClusterId(), portMapping.getName(), + portMapping.getProtocol(), portMapping.getPort(), + portMapping.getProxyPort()); + CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping); + log.debug("Cluster port mapping created: " + clusterPortMapping.toString()); + } + } + + // Persist cluster port mappings + CloudControllerContext.getInstance().persist(); + + // Send application clusters created event + TopologyEventPublisher.sendApplicationClustersCreated(appId, appClusters); + } + + public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData) { + TopologyManager.acquireWriteLock(); + + List<Cluster> removedClusters = new ArrayList<Cluster>(); + CloudControllerContext context = CloudControllerContext.getInstance(); + try { + Topology topology = TopologyManager.getTopology(); + + if (clusterData != null) { + // remove clusters from CC topology model and remove runtime information + for (ClusterDataHolder aClusterData : clusterData) { + Service aService = topology.getService(aClusterData.getServiceType()); + if (aService != null) { + removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); + } else { + log.warn("Service " + aClusterData.getServiceType() + " not found, " + + "unable to remove Cluster " + aClusterData.getClusterId()); + } + // remove runtime data + context.removeClusterContext(aClusterData.getClusterId()); + + log.info("Removed application [ " + appId + " ]'s Cluster " + + "[ " + aClusterData.getClusterId() + " ] from the topology"); + } + // persist runtime data changes + CloudControllerContext.getInstance().persist(); + } else { + log.info("No cluster data found for application " + appId + " to remove"); + } + + TopologyManager.updateTopology(topology); + + } finally { + TopologyManager.releaseWriteLock(); + } + + // Remove cluster port mappings of application + CloudControllerContext.getInstance().removeClusterPortMappings(appId); + CloudControllerContext.getInstance().persist(); + + TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData); + + } + + public static void handleClusterReset(ClusterStatusClusterResetEvent event) { + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(event.getServiceName()); + if (service == null) { + log.error("Service " + event.getServiceName() + + " not found in Topology, unable to update the cluster status to Created"); + return; + } + + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + + "status to Created"); + return; + } + + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Created; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Created adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + TopologyEventPublisher + .sendClusterResetEvent(event.getAppId(), event.getServiceName(), event.getClusterId(), + event.getInstanceId()); + } else { + log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), context.getStatus(), status)); + } + + } finally { + TopologyManager.releaseWriteLock(); + } + + } + + public static void handleClusterInstanceCreated(String serviceType, String clusterId, String alias, + String instanceId, String partitionId, String networkPartitionId) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceType); + if (service == null) { + log.error("Service " + serviceType + + " not found in Topology, unable to update the cluster status to Created"); + return; + } + + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + log.error("Cluster " + clusterId + " not found in Topology, unable to update " + + "status to Created"); + return; + } + + if (cluster.getInstanceContexts(instanceId) != null) { + log.warn("The Instance context for the cluster already exists for [cluster] " + + clusterId + " [instance-id] " + instanceId); + return; + } + + ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId); + clusterInstance.setNetworkPartitionId(networkPartitionId); + clusterInstance.setPartitionId(partitionId); + cluster.addInstanceContext(instanceId, clusterInstance); + TopologyManager.updateTopology(topology); + + ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = + new ClusterInstanceCreatedEvent(serviceType, clusterId, clusterInstance); + clusterInstanceCreatedEvent.setPartitionId(partitionId); + TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); + + } finally { + TopologyManager.releaseWriteLock(); + } + } + + public static void handleClusterRemoved(ClusterContext ctxt) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(ctxt.getCartridgeType()); + String deploymentPolicy; + if (service == null) { + log.warn(String.format("Service %s does not exist", ctxt.getCartridgeType())); + return; + } + + if (!service.clusterExists(ctxt.getClusterId())) { + log.warn(String.format("Cluster %s does not exist for service %s", ctxt.getClusterId(), + ctxt.getCartridgeType())); + return; + } + + try { + TopologyManager.acquireWriteLock(); + Cluster cluster = service.removeCluster(ctxt.getClusterId()); + deploymentPolicy = cluster.getDeploymentPolicyName(); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); + } + + /** + * Add member object to the topology and publish member created event + * + * @param memberContext + */ + public static void handleMemberCreatedEvent(MemberContext memberContext) { + Topology topology = TopologyManager.getTopology(); + + Service service = topology.getService(memberContext.getCartridgeType()); + String clusterId = memberContext.getClusterId(); + Cluster cluster = service.getCluster(clusterId); + String memberId = memberContext.getMemberId(); + String clusterInstanceId = memberContext.getClusterInstanceId(); + String networkPartitionId = memberContext.getNetworkPartitionId(); + String partitionId = memberContext.getPartition().getId(); + String lbClusterId = memberContext.getLbClusterId(); + long initTime = memberContext.getInitTime(); + String autoscalingReason = + memberContext.getProperties().getProperty(StratosConstants.SCALING_REASON).getValue(); + long scalingTime = + Long.parseLong(memberContext.getProperties().getProperty(StratosConstants.SCALING_TIME).getValue()); + + if (cluster.memberExists(memberId)) { + log.warn(String.format("Member %s already exists", memberId)); + return; + } + + try { + TopologyManager.acquireWriteLock(); + Member member = + new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, networkPartitionId, + partitionId, memberContext.getLoadBalancingIPType(), initTime); + member.setStatus(MemberStatus.Created); + member.setLbClusterId(lbClusterId); + member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); + cluster.addMember(member); + TopologyManager.updateTopology(topology); + //Member created time + Long timeStamp = System.currentTimeMillis(); + //publishing to BAM + BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), memberContext.getClusterId(), + memberContext.getClusterInstanceId(), memberContext.getCartridgeType(), + MemberStatus.Created.toString(), timeStamp, autoscalingReason, scalingTime, + null); + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendMemberCreatedEvent(memberContext); + } + + /** + * Update member status to initialized and publish member initialized event + * + * @param memberContext + */ + public static void handleMemberInitializedEvent(MemberContext memberContext) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(memberContext.getCartridgeType()); + if (service == null) { + log.warn(String.format("Service %s does not exist", memberContext.getCartridgeType())); + return; + } + if (!service.clusterExists(memberContext.getClusterId())) { + log.warn(String.format("Cluster %s does not exist in service %s", memberContext.getClusterId(), + memberContext.getCartridgeType())); + return; + } + + Member member = service.getCluster(memberContext.getClusterId()). + getMember(memberContext.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", memberContext.getMemberId())); + return; + } + + try { + TopologyManager.acquireWriteLock(); + + // Set ip addresses + member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP()); + if (memberContext.getPrivateIPs() != null) { + member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs())); + } + member.setDefaultPublicIP(memberContext.getDefaultPublicIP()); + if (memberContext.getPublicIPs() != null) { + member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs())); + } + + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Initialized)) { + log.error("Invalid state transition from " + member.getStatus() + " to " + + MemberStatus.Initialized); + return; + } else { + member.setStatus(MemberStatus.Initialized); + log.info("Member status updated to initialized"); + + TopologyManager.updateTopology(topology); + //member initialized time + Long timeStamp = System.currentTimeMillis(); + + TopologyEventPublisher.sendMemberInitializedEvent(memberContext); + //publishing data + BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), memberContext.getClusterId(), + memberContext.getClusterInstanceId(), memberContext.getCartridgeType(), + MemberStatus.Initialized.toString(), timeStamp, null, null, null); + } + } finally { + TopologyManager.releaseWriteLock(); + } + } + + private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices, + PortMapping portMapping) { + for (KubernetesService kubernetesService : kubernetesServices) { + if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) { + return kubernetesService.getPort(); + } + } + throw new RuntimeException( + "Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " + portMapping.getPort()); + } + + public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceStartedEvent.getServiceName()); + if (service == null) { + log.warn(String.format("Service %s does not exist", instanceStartedEvent.getServiceName())); + return; + } + if (!service.clusterExists(instanceStartedEvent.getClusterId())) { + log.warn(String.format("Cluster %s does not exist in service %s", instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId()); + Member member = cluster.getMember(instanceStartedEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", instanceStartedEvent.getMemberId())); + return; + } + + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Starting)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.Starting); + return; + } else { + member.setStatus(MemberStatus.Starting); + log.info("member started event adding status started"); + + TopologyManager.updateTopology(topology); + //member starting time + Long timeStamp = System.currentTimeMillis(); + //memberStartedEvent. + TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); + //publishing data + BAMUsageDataPublisher + .publish(instanceStartedEvent.getMemberId(), instanceStartedEvent.getPartitionId(), + instanceStartedEvent.getNetworkPartitionId(), instanceStartedEvent.getClusterId(), + instanceStartedEvent.getClusterInstanceId(), instanceStartedEvent.getServiceName(), + MemberStatus.Starting.toString(), timeStamp, null, null, null); + } + } finally { + TopologyManager.releaseWriteLock(); + } + } catch (Exception e) { + String message = String.format( + "Could not handle member started event: [application-id] %s " + "[service-name] %s [member-id] %s", + instanceStartedEvent.getApplicationId(), instanceStartedEvent.getServiceName(), + instanceStartedEvent.getMemberId()); + log.warn(message, e); + } + } + + public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceActivatedEvent.getServiceName()); + if (service == null) { + log.warn(String.format("Service %s does not exist", instanceActivatedEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", instanceActivatedEvent.getClusterId())); + return; + } + + Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", instanceActivatedEvent.getMemberId())); + return; + } + + MemberActivatedEvent memberActivatedEvent = + new MemberActivatedEvent(instanceActivatedEvent.getServiceName(), instanceActivatedEvent.getClusterId(), + instanceActivatedEvent.getClusterInstanceId(), + instanceActivatedEvent.getMemberId(), + instanceActivatedEvent.getNetworkPartitionId(), + instanceActivatedEvent.getPartitionId()); + + // grouping - set grouid + //TODO + memberActivatedEvent.setApplicationId(null); + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Active)) { + log.error( + "Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]"); + return; + } else { + member.setStatus(MemberStatus.Active); + + // Set member ports + try { + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceName()); + if (cartridge == null) { + throw new RuntimeException( + String.format("Cartridge not found: [cartridge-type] %s", service.getServiceName())); + } + + Port port; + int portValue; + List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); + String clusterId = cluster.getClusterId(); + ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); + List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + + for (PortMapping portMapping : portMappings) { + if (kubernetesServices != null) { + portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping); + } else { + portValue = portMapping.getPort(); + } + port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort()); + member.addPort(port); + memberActivatedEvent.addPort(port); + } + } catch (Exception e) { + String message = String.format("Could not add member ports: [service-name] %s [member-id] %s", + memberActivatedEvent.getServiceName(), + memberActivatedEvent.getMemberId()); + log.error(message, e); + } + + // Set member ip addresses + memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP()); + memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs()); + memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); + memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); + TopologyManager.updateTopology(topology); + + //member activated time + Long timeStamp = System.currentTimeMillis(); + + // Publish member activated event + TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); + + // Publish statistics data + BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), memberActivatedEvent.getPartitionId(), + memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getClusterId(), + memberActivatedEvent.getClusterInstanceId(), + memberActivatedEvent.getServiceName(), MemberStatus.Active.toString(), + timeStamp, null, null, null); + } + } finally { + TopologyManager.releaseWriteLock(); + } + } + + public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); + //update the status of the member + if (service == null) { + log.warn(String.format("Service %s does not exist", instanceReadyToShutdownEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", instanceReadyToShutdownEvent.getClusterId())); + return; + } + + Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", instanceReadyToShutdownEvent.getMemberId())); + return; + } + MemberReadyToShutdownEvent memberReadyToShutdownEvent = + new MemberReadyToShutdownEvent(instanceReadyToShutdownEvent.getServiceName(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getMemberId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getPartitionId()); + //get the time of ReadyToShutdown state change + Long timeStamp = null; + try { + TopologyManager.acquireWriteLock(); + + if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.ReadyToShutDown); + return; + } + member.setStatus(MemberStatus.ReadyToShutDown); + log.info("Member Ready to shut down event adding status started"); + + TopologyManager.updateTopology(topology); + + timeStamp = System.currentTimeMillis(); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); + //publishing data + BAMUsageDataPublisher + .publish(instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getServiceName(), MemberStatus.ReadyToShutDown.toString(), + timeStamp, null, null, null); + //termination of particular instance will be handled by autoscaler + } + + public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); + //update the status of the member + if (service == null) { + log.warn(String.format("Service %s does not exist", instanceMaintenanceModeEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", instanceMaintenanceModeEvent.getClusterId())); + return; + } + + Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", instanceMaintenanceModeEvent.getMemberId())); + return; + } + + MemberMaintenanceModeEvent memberMaintenanceModeEvent = + new MemberMaintenanceModeEvent(instanceMaintenanceModeEvent.getServiceName(), + instanceMaintenanceModeEvent.getClusterId(), + instanceMaintenanceModeEvent.getClusterInstanceId(), + instanceMaintenanceModeEvent.getMemberId(), + instanceMaintenanceModeEvent.getNetworkPartitionId(), + instanceMaintenanceModeEvent.getPartitionId()); + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.In_Maintenance); + return; + } + member.setStatus(MemberStatus.In_Maintenance); + log.info("member maintenance mode event adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //publishing data + TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); + + } + + /** + * Remove member from topology and send member terminated event. + * + * @param serviceName + * @param clusterId + * @param networkPartitionId + * @param partitionId + * @param memberId + */ + public static void handleMemberTerminated(String serviceName, String clusterId, String networkPartitionId, + String partitionId, String memberId) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceName); + Properties properties; + if (service == null) { + log.warn(String.format("Service %s does not exist", serviceName)); + return; + } + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", clusterId)); + return; + } + + Member member = cluster.getMember(memberId); + if (member == null) { + log.warn(String.format("Member %s does not exist", memberId)); + return; + } + + String clusterInstanceId = member.getClusterInstanceId(); + + try { + TopologyManager.acquireWriteLock(); + properties = member.getProperties(); + cluster.removeMember(member); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + /* @TODO leftover from grouping_poc*/ + String groupAlias = null; + TopologyEventPublisher + .sendMemberTerminatedEvent(serviceName, clusterId, memberId, clusterInstanceId, networkPartitionId, + partitionId, properties, groupAlias); + } + + public static void handleMemberSuspended() { + //TODO + try { + TopologyManager.acquireWriteLock(); + } finally { + TopologyManager.releaseWriteLock(); + } + } + + public static void handleClusterActivatedEvent( + ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { + + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", clusterStatusClusterActivatedEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", clusterStatusClusterActivatedEvent.getClusterId())); + return; + } + + String clusterId = cluster.getClusterId(); + ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); + if (clusterContext == null) { + log.warn("Cluster context not found: [cluster-id] " + clusterId); + return; + } + + ClusterInstanceActivatedEvent clusterInstanceActivatedEvent = + new ClusterInstanceActivatedEvent(clusterStatusClusterActivatedEvent.getAppId(), + clusterStatusClusterActivatedEvent.getServiceName(), + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + cluster.setKubernetesServices(kubernetesServices); + clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices); + + ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId()); + if (context == null) { + log.warn("Cluster instance context is not found for [cluster] " + + clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " + + clusterStatusClusterActivatedEvent.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Active; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster activated adding status started for " + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + // publish event + TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent); + } else { + log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(), + status)); + return; + } + } finally { + TopologyManager.releaseWriteLock(); + } + + } + + public static void handleClusterInactivateEvent(ClusterStatusClusterInactivateEvent clusterInactivateEvent) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(clusterInactivateEvent.getServiceName()); + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", clusterInactivateEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", clusterInactivateEvent.getClusterId())); + return; + } + + ClusterInstanceInactivateEvent clusterInactivatedEvent1 = + new ClusterInstanceInactivateEvent(clusterInactivateEvent.getAppId(), + clusterInactivateEvent.getServiceName(), + clusterInactivateEvent.getClusterId(), + clusterInactivateEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + clusterInactivateEvent.getClusterId() + " [instance-id] " + + clusterInactivateEvent.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Inactive; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Inactive adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1); + } else { + log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(), + context.getStatus(), status)); + return; + } + } finally { + TopologyManager.releaseWriteLock(); + } + } + + private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) { + try { + MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient(); + metadataClient.deleteApplicationProperties(event.getAppId()); + } catch (Exception e) { + log.error("Error occurred while deleting the application resources frm metadata service ", e); + } + } + + public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(event.getServiceName()); + + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", event.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", event.getClusterId())); + return; + } + + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Terminated; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Terminated adding status started for and removing the cluster instance" + + cluster.getClusterId()); + cluster.removeInstanceContext(event.getInstanceId()); + TopologyManager.updateTopology(topology); + //publishing data + ClusterInstanceTerminatedEvent clusterTerminatedEvent = + new ClusterInstanceTerminatedEvent(event.getAppId(), event.getServiceName(), + event.getClusterId(), event.getInstanceId()); + + TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); + } else { + log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), context.getStatus(), status)); + return; + } + } finally { + TopologyManager.releaseWriteLock(); + } + + } + + public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Cluster cluster = topology.getService(event.getServiceName()). + getCluster(event.getClusterId()); + + if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) { + log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " + + ClusterStatus.Terminating); + } + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Terminating; + if (context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Terminating started for " + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + ClusterInstanceTerminatingEvent clusterTerminaingEvent = + new ClusterInstanceTerminatingEvent(event.getAppId(), event.getServiceName(), + event.getClusterId(), event.getInstanceId()); + + TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); + + // Remove kubernetes services if available + ClusterContext clusterContext = + CloudControllerContext.getInstance().getClusterContext(event.getClusterId()); + if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { + KubernetesIaas.removeKubernetesServices(event.getAppId(
<TRUNCATED>
