http://git-wip-us.apache.org/repos/asf/stratos/blob/76c7724b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index f04a11f..dba9670 100644
---
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -31,6 +31,7 @@ import
org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl
import
org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
import org.apache.stratos.messaging.domain.instance.ClusterInstance;
import org.apache.stratos.messaging.domain.topology.*;
@@ -51,1015 +52,994 @@ import java.util.*;
* and build the complete topology with the events received
*/
public class TopologyBuilder {
- private static final Log log = LogFactory.getLog(TopologyBuilder.class);
-
-
- public static void handleServiceCreated(List<Cartridge> cartridgeList) {
- Service service;
- Topology topology = TopologyManager.getTopology();
- if (cartridgeList == null) {
- log.warn(String.format("Cartridge list is empty"));
- return;
- }
-
- try {
-
- TopologyManager.acquireWriteLock();
- for (Cartridge cartridge : cartridgeList) {
- if (!topology.serviceExists(cartridge.getType())) {
- ServiceType serviceType = cartridge.isMultiTenant() ?
ServiceType.MultiTenant : ServiceType.SingleTenant;
- service = new Service(cartridge.getType(), serviceType);
- Properties properties = new Properties();
-
- try {
- Property[] propertyArray = null;
-
- if (cartridge.getProperties() != null) {
- if (cartridge.getProperties().getProperties() !=
null) {
- propertyArray =
cartridge.getProperties().getProperties();
- }
- }
-
- List<Property> propertyList = new
ArrayList<Property>();
- if (propertyArray != null) {
- propertyList = Arrays.asList(propertyArray);
- if (propertyList != null) {
- for (Property property : propertyList) {
- properties.setProperty(property.getName(),
property.getValue());
- }
- }
- }
- } catch (Exception e) {
- log.error(e);
- }
-
- service.setProperties(properties);
- if (cartridge.getPortMappings() != null) {
- List<PortMapping> portMappings =
Arrays.asList(cartridge.getPortMappings());
- Port port;
- //adding ports to the event
- for (PortMapping portMapping : portMappings) {
- port = new Port(portMapping.getProtocol(),
- portMapping.getPort(),
portMapping.getProxyPort());
- service.addPort(port);
- }
- }
-
- topology.addService(service);
- TopologyManager.updateTopology(topology);
- }
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendServiceCreateEvent(cartridgeList);
- }
-
- public static void handleServiceRemoved(List<Cartridge> cartridgeList) {
- Topology topology = TopologyManager.getTopology();
-
- for (Cartridge cartridge : cartridgeList) {
- Service service = topology.getService(cartridge.getType());
- if (service == null) {
- log.warn("Cartridge does not exist [cartidge] " + cartridge);
- return;
- }
- if (service.getClusters().size() == 0) {
- if (topology.serviceExists(cartridge.getType())) {
- try {
- TopologyManager.acquireWriteLock();
- topology.removeService(cartridge.getType());
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList);
- } else {
- log.warn(String.format("Service %s does not exist..",
cartridge.getType()));
- }
- } else {
- log.warn("Subscription already exists. Hence not removing the
service:" + cartridge.getType()
- + " from the topology");
- }
- }
- }
-
- public static void handleClusterCreated(ClusterStatusClusterCreatedEvent
event) {
- TopologyManager.acquireWriteLock();
- Cluster cluster;
-
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- log.error("Service " + event.getServiceName() +
- " not found in Topology, unable to update the cluster
status to Created");
- return;
- }
-
- if (service.clusterExists(event.getClusterId())) {
- log.warn("Cluster " + event.getClusterId() + " is already in
the Topology ");
- return;
- } else {
- cluster = new Cluster(event.getServiceName(),
- event.getClusterId(), event.getDeploymentPolicyName(),
- event.getAutosScalePolicyName(), event.getAppId());
- //cluster.setStatus(Status.Created);
- cluster.setHostNames(event.getHostNames());
- cluster.setTenantRange(event.getTenantRange());
- service.addCluster(cluster);
- TopologyManager.updateTopology(topology);
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- TopologyEventPublisher.sendClusterCreatedEvent(cluster);
- }
-
- public static void handleApplicationClustersCreated(String appId,
List<Cluster> appClusters) {
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- for (Cluster cluster : appClusters) {
- Service service =
topology.getService(cluster.getServiceName());
- if (service == null) {
- log.error("Service " + cluster.getServiceName()
- + " not found in Topology, unable to create
Application cluster");
- } else {
- service.addCluster(cluster);
- log.info("Application Cluster " + cluster.getClusterId() +
" created in CC topology");
- }
- }
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- log.debug("Creating cluster port mappings: [appication-id] " + appId);
- for(Cluster cluster : appClusters) {
- String cartridgeType = cluster.getServiceName();
- Cartridge cartridge =
CloudControllerContext.getInstance().getCartridge(cartridgeType);
- if(cartridge == null) {
- throw new CloudControllerException("Cartridge not found:
[cartridge-type] " + cartridgeType);
- }
-
- for(PortMapping portMapping : cartridge.getPortMappings()) {
- ClusterPortMapping clusterPortMapping = new
ClusterPortMapping(appId,
- cluster.getClusterId(), portMapping.getName(),
portMapping.getProtocol(), portMapping.getPort(),
- portMapping.getProxyPort());
-
CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
- log.debug("Cluster port mapping created: " +
clusterPortMapping.toString());
- }
- }
-
- // Persist cluster port mappings
- CloudControllerContext.getInstance().persist();
-
- // Send application clusters created event
- TopologyEventPublisher.sendApplicationClustersCreated(appId,
appClusters);
- }
-
- public static void handleApplicationClustersRemoved(String appId,
- Set<ClusterDataHolder>
clusterData) {
- TopologyManager.acquireWriteLock();
-
- List<Cluster> removedClusters = new ArrayList<Cluster>();
- CloudControllerContext context = CloudControllerContext.getInstance();
- try {
- Topology topology = TopologyManager.getTopology();
-
- if (clusterData != null) {
- // remove clusters from CC topology model and remove runtime
information
- for (ClusterDataHolder aClusterData : clusterData) {
- Service aService =
topology.getService(aClusterData.getServiceType());
- if (aService != null) {
-
removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
- } else {
- log.warn("Service " + aClusterData.getServiceType() +
" not found, " +
- "unable to remove Cluster " +
aClusterData.getClusterId());
- }
- // remove runtime data
- context.removeClusterContext(aClusterData.getClusterId());
-
- log.info("Removed application [ " + appId + " ]'s Cluster
" +
- "[ " + aClusterData.getClusterId() + " ] from the
topology");
- }
- // persist runtime data changes
- CloudControllerContext.getInstance().persist();
- } else {
- log.info("No cluster data found for application " + appId + "
to remove");
- }
-
- TopologyManager.updateTopology(topology);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- // Remove cluster port mappings of application
- CloudControllerContext.getInstance().removeClusterPortMappings(appId);
- CloudControllerContext.getInstance().persist();
-
- TopologyEventPublisher.sendApplicationClustersRemoved(appId,
clusterData);
-
- }
-
- public static void handleClusterReset(ClusterStatusClusterResetEvent
event) {
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- log.error("Service " + event.getServiceName() +
- " not found in Topology, unable to update the cluster
status to Created");
- return;
- }
-
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- log.error("Cluster " + event.getClusterId() + " not found in
Topology, unable to update " +
- "status to Created");
- return;
- }
-
- ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster]
" +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Created;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Created adding status started for" +
cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- TopologyEventPublisher.sendClusterResetEvent(event.getAppId(),
event.getServiceName(),
- event.getClusterId(), event.getInstanceId());
- } else {
- log.warn(String.format("Cluster state transition is not valid:
[cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- }
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
-
- }
-
- public static void handleClusterInstanceCreated(String serviceType, String
clusterId,
- String alias, String
instanceId, String partitionId,
- String networkPartitionId)
{
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceType);
- if (service == null) {
- log.error("Service " + serviceType +
- " not found in Topology, unable to update the cluster
status to Created");
- return;
- }
-
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.error("Cluster " + clusterId + " not found in Topology,
unable to update " +
- "status to Created");
- return;
- }
-
- if (cluster.getInstanceContexts(instanceId) != null) {
- log.warn("The Instance context for the cluster already exists
for [cluster] " +
- clusterId + " [instance-id] " + instanceId);
- return;
- }
-
- ClusterInstance clusterInstance = new ClusterInstance(alias,
clusterId, instanceId);
- clusterInstance.setNetworkPartitionId(networkPartitionId);
- clusterInstance.setPartitionId(partitionId);
- cluster.addInstanceContext(instanceId, clusterInstance);
- TopologyManager.updateTopology(topology);
-
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- new ClusterInstanceCreatedEvent(serviceType, clusterId,
- clusterInstance);
- clusterInstanceCreatedEvent.setPartitionId(partitionId);
-
TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
-
- public static void handleClusterRemoved(ClusterContext ctxt) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(ctxt.getCartridgeType());
- String deploymentPolicy;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- ctxt.getCartridgeType()));
- return;
- }
-
- if (!service.clusterExists(ctxt.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist for service %s",
- ctxt.getClusterId(),
- ctxt.getCartridgeType()));
- return;
- }
-
- try {
- TopologyManager.acquireWriteLock();
- Cluster cluster = service.removeCluster(ctxt.getClusterId());
- deploymentPolicy = cluster.getDeploymentPolicyName();
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
- }
-
- /**
- * Add member object to the topology and publish member created event
- *
- * @param memberContext
- */
- public static void handleMemberCreatedEvent(MemberContext memberContext) {
- Topology topology = TopologyManager.getTopology();
-
- Service service =
topology.getService(memberContext.getCartridgeType());
- String clusterId = memberContext.getClusterId();
- Cluster cluster = service.getCluster(clusterId);
- String memberId = memberContext.getMemberId();
- String clusterInstanceId = memberContext.getClusterInstanceId();
- String networkPartitionId = memberContext.getNetworkPartitionId();
- String partitionId = memberContext.getPartition().getId();
- String lbClusterId = memberContext.getLbClusterId();
- long initTime = memberContext.getInitTime();
-
- if (cluster.memberExists(memberId)) {
- log.warn(String.format("Member %s already exists", memberId));
- return;
- }
-
- try {
- TopologyManager.acquireWriteLock();
- Member member = new Member(service.getServiceName(), clusterId,
memberId, clusterInstanceId,
- networkPartitionId, partitionId,
memberContext.getLoadBalancingIPType(), initTime);
- member.setStatus(MemberStatus.Created);
- member.setLbClusterId(lbClusterId);
-
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
- cluster.addMember(member);
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
- }
-
- /**
- * Update member status to initialized and publish member initialized event
- *
- * @param memberContext
- */
- public static void handleMemberInitializedEvent(MemberContext
memberContext) {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(memberContext.getCartridgeType());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- memberContext.getCartridgeType()));
- return;
- }
- if (!service.clusterExists(memberContext.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist in service %s",
- memberContext.getClusterId(),
- memberContext.getCartridgeType()));
- return;
- }
-
- Member member = service.getCluster(memberContext.getClusterId()).
- getMember(memberContext.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- memberContext.getMemberId()));
- return;
- }
-
- try {
- TopologyManager.acquireWriteLock();
-
- // Set ip addresses
- member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
- if (memberContext.getPrivateIPs() != null) {
-
member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
- }
- member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
- if (memberContext.getPublicIPs() != null) {
-
member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
- }
-
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
- log.error("Invalid state transition from " +
member.getStatus() + " to " +
- MemberStatus.Initialized);
- return;
- } else {
- member.setStatus(MemberStatus.Initialized);
- log.info("Member status updated to initialized");
-
- TopologyManager.updateTopology(topology);
-
-
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
- //publishing data
- BAMUsageDataPublisher.publish(memberContext.getMemberId(),
- memberContext.getPartition().getId(),
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterId(),
- memberContext.getCartridgeType(),
- MemberStatus.Initialized.toString(),
- null);
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
- private static int findKubernetesServicePort(String clusterId,
List<KubernetesService> kubernetesServices,
- PortMapping portMapping) {
- for (KubernetesService kubernetesService : kubernetesServices) {
- if
(kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
- return kubernetesService.getPort();
- }
- }
- throw new RuntimeException("Kubernetes service port not found:
[cluster-id] " + clusterId + " [port] "
- + portMapping.getPort());
- }
-
- public static void handleMemberStarted(InstanceStartedEvent
instanceStartedEvent) {
- try {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(instanceStartedEvent.getServiceName());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceStartedEvent.getServiceName()));
- return;
- }
- if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist in service
%s",
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName()));
- return;
- }
-
- Cluster cluster =
service.getCluster(instanceStartedEvent.getClusterId());
- Member member =
cluster.getMember(instanceStartedEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceStartedEvent.getMemberId()));
- return;
- }
-
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Starting)) {
- log.error("Invalid State Transition from " +
member.getStatus() + " to " +
- MemberStatus.Starting);
- return;
- } else {
- member.setStatus(MemberStatus.Starting);
- log.info("member started event adding status started");
-
- TopologyManager.updateTopology(topology);
- //memberStartedEvent.
-
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
- //publishing data
-
BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
- instanceStartedEvent.getPartitionId(),
- instanceStartedEvent.getNetworkPartitionId(),
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName(),
- MemberStatus.Starting.toString(),
- null);
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- } catch (Exception e) {
- String message = String.format("Could not handle member started
event: [application-id] %s " +
- "[service-name] %s [member-id] %s",
instanceStartedEvent.getApplicationId(),
- instanceStartedEvent.getServiceName(),
instanceStartedEvent.getMemberId());
- log.warn(message, e);
- }
- }
-
- public static void handleMemberActivated(InstanceActivatedEvent
instanceActivatedEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(instanceActivatedEvent.getServiceName());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceActivatedEvent.getServiceName()));
- return;
- }
-
- Cluster cluster =
service.getCluster(instanceActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceActivatedEvent.getClusterId()));
- return;
- }
-
- Member member =
cluster.getMember(instanceActivatedEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceActivatedEvent.getMemberId()));
- return;
- }
-
- MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
- instanceActivatedEvent.getServiceName(),
- instanceActivatedEvent.getClusterId(),
- instanceActivatedEvent.getClusterInstanceId(),
- instanceActivatedEvent.getMemberId(),
- instanceActivatedEvent.getNetworkPartitionId(),
- instanceActivatedEvent.getPartitionId());
-
- // grouping - set grouid
- //TODO
- memberActivatedEvent.setApplicationId(null);
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Active)) {
- log.error("Invalid state transition from [" +
member.getStatus() + "] to [" + MemberStatus.Active + "]");
- return;
- } else {
- member.setStatus(MemberStatus.Active);
-
- // Set member ports
- try {
- Cartridge cartridge =
CloudControllerContext.getInstance().getCartridge(service.getServiceName());
- if (cartridge == null) {
- throw new RuntimeException(String.format("Cartridge
not found: [cartridge-type] %s",
- service.getServiceName()));
- }
-
- Port port;
- int portValue;
- List<PortMapping> portMappings =
Arrays.asList(cartridge.getPortMappings());
- String clusterId = cluster.getClusterId();
- ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
- List<KubernetesService> kubernetesServices =
clusterContext.getKubernetesServices();
-
- for (PortMapping portMapping : portMappings) {
- if (kubernetesServices != null) {
- portValue = findKubernetesServicePort(clusterId,
kubernetesServices, portMapping);
- } else {
- portValue = portMapping.getPort();
- }
- port = new Port(portMapping.getProtocol(), portValue,
portMapping.getProxyPort());
- member.addPort(port);
- memberActivatedEvent.addPort(port);
- }
- } catch (Exception e) {
- String message = String.format("Could not add member
ports: [service-name] %s [member-id] %s",
- memberActivatedEvent.getServiceName(),
memberActivatedEvent.getMemberId());
- log.error(message, e);
- }
-
- // Set member ip addresses
-
memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
-
memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
-
memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
-
memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
- TopologyManager.updateTopology(topology);
-
- // Publish member activated event
-
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
-
- // Publish statistics data
-
BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
- memberActivatedEvent.getPartitionId(),
- memberActivatedEvent.getNetworkPartitionId(),
- memberActivatedEvent.getClusterId(),
- memberActivatedEvent.getServiceName(),
- MemberStatus.Active.toString(),
- null);
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
- public static void
handleMemberReadyToShutdown(InstanceReadyToShutdownEvent
instanceReadyToShutdownEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(instanceReadyToShutdownEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceReadyToShutdownEvent.getServiceName()));
- return;
- }
-
- Cluster cluster =
service.getCluster(instanceReadyToShutdownEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceReadyToShutdownEvent.getClusterId()));
- return;
- }
-
-
- Member member =
cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceReadyToShutdownEvent.getMemberId()));
- return;
- }
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = new
MemberReadyToShutdownEvent(
- instanceReadyToShutdownEvent.getServiceName(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getClusterInstanceId(),
- instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getPartitionId());
- try {
- TopologyManager.acquireWriteLock();
-
- if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
- log.error("Invalid State Transition from " +
member.getStatus() + " to " +
- MemberStatus.ReadyToShutDown);
- return;
- }
- member.setStatus(MemberStatus.ReadyToShutDown);
- log.info("Member Ready to shut down event adding status started");
-
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
- //publishing data
-
BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getPartitionId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getServiceName(),
- MemberStatus.ReadyToShutDown.toString(),
- null);
- //termination of particular instance will be handled by autoscaler
- }
-
- public static void handleMemberMaintenance(InstanceMaintenanceModeEvent
instanceMaintenanceModeEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(instanceMaintenanceModeEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceMaintenanceModeEvent.getServiceName()));
- return;
- }
-
- Cluster cluster =
service.getCluster(instanceMaintenanceModeEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceMaintenanceModeEvent.getClusterId()));
- return;
- }
-
- Member member =
cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceMaintenanceModeEvent.getMemberId()));
- return;
- }
-
-
- MemberMaintenanceModeEvent memberMaintenanceModeEvent = new
MemberMaintenanceModeEvent(
- instanceMaintenanceModeEvent.getServiceName(),
- instanceMaintenanceModeEvent.getClusterId(),
- instanceMaintenanceModeEvent.getClusterInstanceId(),
- instanceMaintenanceModeEvent.getMemberId(),
- instanceMaintenanceModeEvent.getNetworkPartitionId(),
- instanceMaintenanceModeEvent.getPartitionId());
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
- log.error("Invalid State Transition from " +
member.getStatus() + " to "
- + MemberStatus.In_Maintenance);
- return;
- }
- member.setStatus(MemberStatus.In_Maintenance);
- log.info("member maintenance mode event adding status started");
-
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- //publishing data
-
TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
-
- }
-
- /**
- * Remove member from topology and send member terminated event.
- *
- * @param serviceName
- * @param clusterId
- * @param networkPartitionId
- * @param partitionId
- * @param memberId
- */
- public static void handleMemberTerminated(String serviceName, String
clusterId,
- String networkPartitionId,
String partitionId,
- String memberId) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceName);
- Properties properties;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- serviceName));
- return;
- }
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterId));
- return;
- }
-
- Member member = cluster.getMember(memberId);
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- memberId));
- return;
- }
-
- String clusterInstanceId = member.getClusterInstanceId();
-
- try {
- TopologyManager.acquireWriteLock();
- properties = member.getProperties();
- cluster.removeMember(member);
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- /* @TODO leftover from grouping_poc*/
- String groupAlias = null;
- TopologyEventPublisher.sendMemberTerminatedEvent(serviceName,
clusterId, memberId,
- clusterInstanceId, networkPartitionId,
- partitionId, properties, groupAlias);
- }
-
- public static void handleMemberSuspended() {
- //TODO
- try {
- TopologyManager.acquireWriteLock();
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
- public static void
handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent
clusterStatusClusterActivatedEvent) {
-
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterStatusClusterActivatedEvent.getServiceName()));
- return;
- }
-
- Cluster cluster =
service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterStatusClusterActivatedEvent.getClusterId()));
- return;
- }
-
- String clusterId = cluster.getClusterId();
- ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
- if (clusterContext == null) {
- log.warn("Cluster context not found: [cluster-id] " + clusterId);
- return;
- }
-
- ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
- new ClusterInstanceActivatedEvent(
- clusterStatusClusterActivatedEvent.getAppId(),
- clusterStatusClusterActivatedEvent.getServiceName(),
- clusterStatusClusterActivatedEvent.getClusterId(),
- clusterStatusClusterActivatedEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- List<KubernetesService> kubernetesServices =
clusterContext.getKubernetesServices();
- cluster.setKubernetesServices(kubernetesServices);
-
clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
-
- ClusterInstance context =
cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster instance context is not found for [cluster]
" +
- clusterStatusClusterActivatedEvent.getClusterId() + "
[instance-id] " +
- clusterStatusClusterActivatedEvent.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Active;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster activated adding status started for " +
cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- // publish event
-
TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
- } else {
- log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- clusterStatusClusterActivatedEvent.getClusterId(),
clusterStatusClusterActivatedEvent.getInstanceId(),
- context.getStatus(), status));
- return;
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- }
-
- public static void handleClusterInactivateEvent(
- ClusterStatusClusterInactivateEvent clusterInactivateEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(clusterInactivateEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterInactivateEvent.getServiceName()));
- return;
- }
-
- Cluster cluster =
service.getCluster(clusterInactivateEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterInactivateEvent.getClusterId()));
- return;
- }
-
- ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
- new ClusterInstanceInactivateEvent(
- clusterInactivateEvent.getAppId(),
- clusterInactivateEvent.getServiceName(),
- clusterInactivateEvent.getClusterId(),
- clusterInactivateEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- ClusterInstance context =
cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster]
" +
- clusterInactivateEvent.getClusterId() + "
[instance-id] " +
- clusterInactivateEvent.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Inactive;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Inactive adding status started for" +
cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
-
TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
- } else {
- log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- clusterInactivateEvent.getClusterId(),
clusterInactivateEvent.getInstanceId(),
- context.getStatus(), status));
- return;
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
-
- private static void
deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event)
{
- try {
- MetaDataServiceClient metadataClient = new
DefaultMetaDataServiceClient();
- metadataClient.deleteApplicationProperties(event.getAppId());
- } catch (Exception e) {
- log.error("Error occurred while deleting the application resources
frm metadata service ", e);
- }
- }
-
- public static void
handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
-
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- event.getServiceName()));
- return;
- }
-
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- event.getClusterId()));
- return;
- }
-
- ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster]
" +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Terminated;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminated adding status started for and
removing the cluster instance"
- + cluster.getClusterId());
- cluster.removeInstanceContext(event.getInstanceId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterInstanceTerminatedEvent clusterTerminatedEvent = new
ClusterInstanceTerminatedEvent(event.getAppId(),
- event.getServiceName(), event.getClusterId(),
event.getInstanceId());
-
-
TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
- } else {
- log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- return;
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
-
- }
-
- public static void
handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Cluster cluster = topology.getService(event.getServiceName()).
- getCluster(event.getClusterId());
-
- if (!cluster.isStateTransitionValid(ClusterStatus.Terminating,
event.getInstanceId())) {
- log.error("Invalid state transfer from " +
cluster.getStatus(event.getInstanceId()) + " to " +
- ClusterStatus.Terminating);
- }
- ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster]
" +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Terminating;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminating started for " +
cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterInstanceTerminatingEvent clusterTerminaingEvent = new
ClusterInstanceTerminatingEvent(event.getAppId(),
- event.getServiceName(), event.getClusterId(),
event.getInstanceId());
-
-
TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
-
- // Remove kubernetes services if available
- ClusterContext clusterContext =
-
CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
-
if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
- KubernetesIaas.removeKubernetesServices(event.getAppId(),
event.getClusterId());
- }
- } else {
- log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
+ private static final Log log = LogFactory.getLog(TopologyBuilder.class);
+
+ public static void handleServiceCreated(List<Cartridge> cartridgeList) {
+ Service service;
+ Topology topology = TopologyManager.getTopology();
+ if (cartridgeList == null) {
+ log.warn(String.format("Cartridge list is empty"));
+ return;
+ }
+
+ try {
+
+ TopologyManager.acquireWriteLock();
+ for (Cartridge cartridge : cartridgeList) {
+ if
(!topology.serviceExists(cartridge.getType())) {
+ ServiceType serviceType =
+
cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant;
+ service = new
Service(cartridge.getType(), serviceType);
+ Properties properties = new
Properties();
+
+ try {
+ Property[] propertyArray = null;
+
+ if (cartridge.getProperties()
!= null) {
+ if
(cartridge.getProperties().getProperties() != null) {
+ propertyArray =
cartridge.getProperties().getProperties();
+ }
+ }
+
+ List<Property> propertyList =
new ArrayList<Property>();
+ if (propertyArray != null) {
+ propertyList =
Arrays.asList(propertyArray);
+ if (propertyList !=
null) {
+ for (Property
property : propertyList) {
+
properties.setProperty(property.getName(), property.getValue());
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error(e);
+ }
+
+ service.setProperties(properties);
+ if (cartridge.getPortMappings() !=
null) {
+ List<PortMapping> portMappings
= Arrays.asList(cartridge.getPortMappings());
+ Port port;
+ //adding ports to the event
+ for (PortMapping portMapping :
portMappings) {
+ port = new
Port(portMapping.getProtocol(), portMapping.getPort(),
+
portMapping.getProxyPort());
+ service.addPort(port);
+ }
+ }
+
+ topology.addService(service);
+
TopologyManager.updateTopology(topology);
+ }
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ TopologyEventPublisher.sendServiceCreateEvent(cartridgeList);
+ }
+
+ public static void handleServiceRemoved(List<Cartridge> cartridgeList) {
+ Topology topology = TopologyManager.getTopology();
+
+ for (Cartridge cartridge : cartridgeList) {
+ Service service =
topology.getService(cartridge.getType());
+ if (service == null) {
+ log.warn("Cartridge does not exist [cartidge] "
+ cartridge);
+ return;
+ }
+ if (service.getClusters().size() == 0) {
+ if
(topology.serviceExists(cartridge.getType())) {
+ try {
+
TopologyManager.acquireWriteLock();
+
topology.removeService(cartridge.getType());
+
TopologyManager.updateTopology(topology);
+ } finally {
+
TopologyManager.releaseWriteLock();
+ }
+
TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList);
+ } else {
+ log.warn(String.format("Service %s does
not exist..", cartridge.getType()));
+ }
+ } else {
+ log.warn("Subscription already exists. Hence
not removing the service:" + cartridge.getType() +
+ " from the topology");
+ }
+ }
+ }
+
+ public static void
handleClusterCreated(ClusterStatusClusterCreatedEvent event) {
+ TopologyManager.acquireWriteLock();
+ Cluster cluster;
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(event.getServiceName());
+ if (service == null) {
+ log.error("Service " + event.getServiceName() +
+ " not found in Topology, unable to
update the cluster status to Created");
+ return;
+ }
+
+ if (service.clusterExists(event.getClusterId())) {
+ log.warn("Cluster " + event.getClusterId() + "
is already in the Topology ");
+ return;
+ } else {
+ cluster = new Cluster(event.getServiceName(),
event.getClusterId(), event.getDeploymentPolicyName(),
+
event.getAutosScalePolicyName(), event.getAppId());
+ //cluster.setStatus(Status.Created);
+ cluster.setHostNames(event.getHostNames());
+ cluster.setTenantRange(event.getTenantRange());
+ service.addCluster(cluster);
+ TopologyManager.updateTopology(topology);
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ TopologyEventPublisher.sendClusterCreatedEvent(cluster);
+ }
+
+ public static void handleApplicationClustersCreated(String appId,
List<Cluster> appClusters) {
+
+ TopologyManager.acquireWriteLock();
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ for (Cluster cluster : appClusters) {
+ Service service =
topology.getService(cluster.getServiceName());
+ if (service == null) {
+ log.error("Service " +
cluster.getServiceName() +
+ " not found in Topology,
unable to create Application cluster");
+ } else {
+ service.addCluster(cluster);
+ log.info("Application Cluster " +
cluster.getClusterId() + " created in CC topology");
+ }
+ }
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ log.debug("Creating cluster port mappings: [appication-id] " +
appId);
+ for (Cluster cluster : appClusters) {
+ String cartridgeType = cluster.getServiceName();
+ Cartridge cartridge =
CloudControllerContext.getInstance().getCartridge(cartridgeType);
+ if (cartridge == null) {
+ throw new CloudControllerException("Cartridge
not found: [cartridge-type] " + cartridgeType);
+ }
+
+ for (PortMapping portMapping :
cartridge.getPortMappings()) {
+ ClusterPortMapping clusterPortMapping =
+ new ClusterPortMapping(appId,
cluster.getClusterId(), portMapping.getName(),
+
portMapping.getProtocol(), portMapping.getPort(),
+
portMapping.getProxyPort());
+
CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
+ log.debug("Cluster port mapping created: " +
clusterPortMapping.toString());
+ }
+ }
+
+ // Persist cluster port mappings
+ CloudControllerContext.getInstance().persist();
+
+ // Send application clusters created event
+ TopologyEventPublisher.sendApplicationClustersCreated(appId,
appClusters);
+ }
+
+ public static void handleApplicationClustersRemoved(String appId,
Set<ClusterDataHolder> clusterData) {
+ TopologyManager.acquireWriteLock();
+
+ List<Cluster> removedClusters = new ArrayList<Cluster>();
+ CloudControllerContext context =
CloudControllerContext.getInstance();
+ try {
+ Topology topology = TopologyManager.getTopology();
+
+ if (clusterData != null) {
+ // remove clusters from CC topology model and
remove runtime information
+ for (ClusterDataHolder aClusterData :
clusterData) {
+ Service aService =
topology.getService(aClusterData.getServiceType());
+ if (aService != null) {
+
removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
+ } else {
+ log.warn("Service " +
aClusterData.getServiceType() + " not found, " +
+ "unable to remove
Cluster " + aClusterData.getClusterId());
+ }
+ // remove runtime data
+
context.removeClusterContext(aClusterData.getClusterId());
+
+ log.info("Removed application [ " +
appId + " ]'s Cluster " +
+ "[ " +
aClusterData.getClusterId() + " ] from the topology");
+ }
+ // persist runtime data changes
+ CloudControllerContext.getInstance().persist();
+ } else {
+ log.info("No cluster data found for application
" + appId + " to remove");
+ }
+
+ TopologyManager.updateTopology(topology);
+
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ // Remove cluster port mappings of application
+
CloudControllerContext.getInstance().removeClusterPortMappings(appId);
+ CloudControllerContext.getInstance().persist();
+
+ TopologyEventPublisher.sendApplicationClustersRemoved(appId,
clusterData);
+
+ }
+
+ public static void handleClusterReset(ClusterStatusClusterResetEvent
event) {
+ TopologyManager.acquireWriteLock();
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(event.getServiceName());
+ if (service == null) {
+ log.error("Service " + event.getServiceName() +
+ " not found in Topology, unable to
update the cluster status to Created");
+ return;
+ }
+
+ Cluster cluster =
service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ log.error("Cluster " + event.getClusterId() + "
not found in Topology, unable to update " +
+ "status to Created");
+ return;
+ }
+
+ ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found
for [cluster] " +
+ event.getClusterId() + " [instance-id]
" +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Created;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Created adding status started
for" + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ TopologyEventPublisher
+
.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
event.getClusterId(),
+
event.getInstanceId());
+ } else {
+ log.warn(String.format("Cluster state
transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s
[current-status] %s [status-requested] %s",
+ event.getClusterId(),
event.getInstanceId(), context.getStatus(), status));
+ }
+
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ }
+
+ public static void handleClusterInstanceCreated(String serviceType,
String clusterId, String alias,
+ String instanceId,
String partitionId, String networkPartitionId) {
+
+ TopologyManager.acquireWriteLock();
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(serviceType);
+ if (service == null) {
+ log.error("Service " + serviceType +
+ " not found in Topology, unable to
update the cluster status to Created");
+ return;
+ }
+
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster == null) {
+ log.error("Cluster " + clusterId + " not found
in Topology, unable to update " +
+ "status to Created");
+ return;
+ }
+
+ if (cluster.getInstanceContexts(instanceId) != null) {
+ log.warn("The Instance context for the cluster
already exists for [cluster] " +
+ clusterId + " [instance-id] " +
instanceId);
+ return;
+ }
+
+ ClusterInstance clusterInstance = new
ClusterInstance(alias, clusterId, instanceId);
+
clusterInstance.setNetworkPartitionId(networkPartitionId);
+ clusterInstance.setPartitionId(partitionId);
+ cluster.addInstanceContext(instanceId, clusterInstance);
+ TopologyManager.updateTopology(topology);
+
+ ClusterInstanceCreatedEvent clusterInstanceCreatedEvent
=
+ new
ClusterInstanceCreatedEvent(serviceType, clusterId, clusterInstance);
+ clusterInstanceCreatedEvent.setPartitionId(partitionId);
+
TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
+
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ }
+
+ public static void handleClusterRemoved(ClusterContext ctxt) {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(ctxt.getCartridgeType());
+ String deploymentPolicy;
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
ctxt.getCartridgeType()));
+ return;
+ }
+
+ if (!service.clusterExists(ctxt.getClusterId())) {
+ log.warn(String.format("Cluster %s does not exist for
service %s", ctxt.getClusterId(),
+ ctxt.getCartridgeType()));
+ return;
+ }
+
+ try {
+ TopologyManager.acquireWriteLock();
+ Cluster cluster =
service.removeCluster(ctxt.getClusterId());
+ deploymentPolicy = cluster.getDeploymentPolicyName();
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ TopologyEventPublisher.sendClusterRemovedEvent(ctxt,
deploymentPolicy);
+ }
+
+ /**
+ * Add member object to the topology and publish member created event
+ *
+ * @param memberContext
+ */
+ public static void handleMemberCreatedEvent(MemberContext
memberContext) {
+ Topology topology = TopologyManager.getTopology();
+
+ Service service =
topology.getService(memberContext.getCartridgeType());
+ String clusterId = memberContext.getClusterId();
+ Cluster cluster = service.getCluster(clusterId);
+ String memberId = memberContext.getMemberId();
+ String clusterInstanceId = memberContext.getClusterInstanceId();
+ String networkPartitionId =
memberContext.getNetworkPartitionId();
+ String partitionId = memberContext.getPartition().getId();
+ String lbClusterId = memberContext.getLbClusterId();
+ long initTime = memberContext.getInitTime();
+ String autoscalingReason =
+
memberContext.getProperties().getProperty(StratosConstants.SCALING_REASON).getValue();
+ long scalingTime =
+
Long.parseLong(memberContext.getProperties().getProperty(StratosConstants.SCALING_TIME).getValue());
+
+ if (cluster.memberExists(memberId)) {
+ log.warn(String.format("Member %s already exists",
memberId));
+ return;
+ }
+
+ try {
+ TopologyManager.acquireWriteLock();
+ Member member =
+ new Member(service.getServiceName(),
clusterId, memberId, clusterInstanceId, networkPartitionId,
+ partitionId,
memberContext.getLoadBalancingIPType(), initTime);
+ member.setStatus(MemberStatus.Created);
+ member.setLbClusterId(lbClusterId);
+
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
+ cluster.addMember(member);
+ TopologyManager.updateTopology(topology);
+ //Member created time
+ Long timeStamp = System.currentTimeMillis();
+ //publishing to BAM
+
BAMUsageDataPublisher.publish(memberContext.getMemberId(),
memberContext.getPartition().getId(),
+
memberContext.getNetworkPartitionId(), memberContext.getClusterId(),
+
memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
+
MemberStatus.Created.toString(), timeStamp, autoscalingReason, scalingTime,
+ null);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
+ }
+
+ /**
+ * Update member status to initialized and publish member initialized
event
+ *
+ * @param memberContext
+ */
+ public static void handleMemberInitializedEvent(MemberContext
memberContext) {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(memberContext.getCartridgeType());
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
memberContext.getCartridgeType()));
+ return;
+ }
+ if (!service.clusterExists(memberContext.getClusterId())) {
+ log.warn(String.format("Cluster %s does not exist in
service %s", memberContext.getClusterId(),
+
memberContext.getCartridgeType()));
+ return;
+ }
+
+ Member member =
service.getCluster(memberContext.getClusterId()).
+ getMember(memberContext.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
memberContext.getMemberId()));
+ return;
+ }
+
+ try {
+ TopologyManager.acquireWriteLock();
+
+ // Set ip addresses
+
member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
+ if (memberContext.getPrivateIPs() != null) {
+
member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
+ }
+
member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
+ if (memberContext.getPublicIPs() != null) {
+
member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
+ }
+
+ // try update lifecycle state
+ if
(!member.isStateTransitionValid(MemberStatus.Initialized)) {
+ log.error("Invalid state transition from " +
member.getStatus() + " to " +
+ MemberStatus.Initialized);
+ return;
+ } else {
+ member.setStatus(MemberStatus.Initialized);
+ log.info("Member status updated to
initialized");
+
+ TopologyManager.updateTopology(topology);
+ //member initialized time
+ Long timeStamp = System.currentTimeMillis();
+
+
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
+ //publishing data
+
BAMUsageDataPublisher.publish(memberContext.getMemberId(),
memberContext.getPartition().getId(),
+
memberContext.getNetworkPartitionId(), memberContext.getClusterId(),
+
memberContext.getCartridgeType(), MemberStatus.Initialized.toString(),
+ timeStamp, null,
null, null);
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ }
+
+ private static int findKubernetesServicePort(String clusterId,
List<KubernetesService> kubernetesServices,
+ PortMapping portMapping) {
+ for (KubernetesService kubernetesService : kubernetesServices) {
+ if
(kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
+ return kubernetesService.getPort();
+ }
+ }
+ throw new RuntimeException(
+ "Kubernetes service port not found:
[cluster-id] " + clusterId + " [port] " + portMapping.getPort());
+ }
+
+ public static void handleMemberStarted(InstanceStartedEvent
instanceStartedEvent) {
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(instanceStartedEvent.getServiceName());
+ if (service == null) {
+ log.warn(String.format("Service %s does not
exist", instanceStartedEvent.getServiceName()));
+ return;
+ }
+ if
(!service.clusterExists(instanceStartedEvent.getClusterId())) {
+ log.warn(String.format("Cluster %s does not
exist in service %s", instanceStartedEvent.getClusterId(),
+
instanceStartedEvent.getServiceName()));
+ return;
+ }
+
+ Cluster cluster =
service.getCluster(instanceStartedEvent.getClusterId());
+ Member member =
cluster.getMember(instanceStartedEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not
exist", instanceStartedEvent.getMemberId()));
+ return;
+ }
+
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if
(!member.isStateTransitionValid(MemberStatus.Starting)) {
+ log.error("Invalid State Transition
from " + member.getStatus() + " to " +
+ MemberStatus.Starting);
+ return;
+ } else {
+ member.setStatus(MemberStatus.Starting);
+ log.info("member started event adding
status started");
+
+
TopologyManager.updateTopology(topology);
+ //member starting time
+ Long timeStamp =
System.currentTimeMillis();
+ //memberStartedEvent.
+
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
+ //publishing data
+ BAMUsageDataPublisher
+
.publish(instanceStartedEvent.getMemberId(),
instanceStartedEvent.getPartitionId(),
+
instanceStartedEvent.getNetworkPartitionId(),
instanceStartedEvent.getClusterId(),
+
instanceStartedEvent.getServiceName(), MemberStatus.Starting.toString(), null,
+ null, null);
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ } catch (Exception e) {
+ String message = String.format(
+ "Could not handle member started event:
[application-id] %s " + "[service-name] %s [member-id] %s",
+
instanceStartedEvent.getApplicationId(), instanceStartedEvent.getServiceName(),
+ instanceStartedEvent.getMemberId());
+ log.warn(message, e);
+ }
+ }
+
+ public static void handleMemberActivated(InstanceActivatedEvent
instanceActivatedEvent) {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(instanceActivatedEvent.getServiceName());
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
instanceActivatedEvent.getServiceName()));
+ return;
+ }
+
+ Cluster cluster =
service.getCluster(instanceActivatedEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
instanceActivatedEvent.getClusterId()));
+ return;
+ }
+
+ Member member =
cluster.getMember(instanceActivatedEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
instanceActivatedEvent.getMemberId()));
+ return;
+ }
+
+ MemberActivatedEvent memberActivatedEvent =
+ new
MemberActivatedEvent(instanceActivatedEvent.getServiceName(),
instanceActivatedEvent.getClusterId(),
+
instanceActivatedEvent.getClusterInstanceId(),
+
instanceActivatedEvent.getMemberId(),
+
instanceActivatedEvent.getNetworkPartitionId(),
+
instanceActivatedEvent.getPartitionId());
+
+ // grouping - set grouid
+ //TODO
+ memberActivatedEvent.setApplicationId(null);
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if
(!member.isStateTransitionValid(MemberStatus.Active)) {
+ log.error(
+ "Invalid state transition from
[" + member.getStatus() + "] to [" + MemberStatus.Active + "]");
+ return;
+ } else {
+ member.setStatus(MemberStatus.Active);
+
+ // Set member ports
+ try {
+ Cartridge cartridge =
CloudControllerContext.getInstance().getCartridge(service.getServiceName());
+ if (cartridge == null) {
+ throw new RuntimeException(
+
String.format("Cartridge not found: [cartridge-type] %s",
service.getServiceName()));
+ }
+
+ Port port;
+ int portValue;
+ List<PortMapping> portMappings =
Arrays.asList(cartridge.getPortMappings());
+ String clusterId =
cluster.getClusterId();
+ ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
+ List<KubernetesService>
kubernetesServices = clusterContext.getKubernetesServices();
+
+ for (PortMapping portMapping :
portMappings) {
+ if (kubernetesServices != null)
{
+ portValue =
findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
+ } else {
+ portValue =
portMapping.getPort();
+ }
+ port = new
Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
+ member.addPort(port);
+
memberActivatedEvent.addPort(port);
+ }
+ } catch (Exception e) {
+ String message = String.format("Could
not add member ports: [service-name] %s [member-id] %s",
+
memberActivatedEvent.getServiceName(),
+
memberActivatedEvent.getMemberId());
+ log.error(message, e);
+ }
+
+ // Set member ip addresses
+
memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
+
memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
+
memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
+
memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
+ TopologyManager.updateTopology(topology);
+
+ //member activated time
+ Long timeStamp = System.currentTimeMillis();
+
+ // Publish member activated event
+
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
+
+ // Publish statistics data
+
BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
memberActivatedEvent.getPartitionId(),
+
memberActivatedEvent.getNetworkPartitionId(),
+
memberActivatedEvent.getClusterId(),
+
memberActivatedEvent.getServiceName(), MemberStatus.Active.toString(),
+ null, null, null);
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ }
+
+ public static void
handleMemberReadyToShutdown(InstanceReadyToShutdownEvent
instanceReadyToShutdownEvent)
+ throws InvalidMemberException,
InvalidCartridgeTypeException {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(instanceReadyToShutdownEvent.getServiceName());
+ //update the status of the member
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
instanceReadyToShutdownEvent.getServiceName()));
+ return;
+ }
+
+ Cluster cluster =
service.getCluster(instanceReadyToShutdownEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
instanceReadyToShutdownEvent.getClusterId()));
+ return;
+ }
+
+ Member member =
cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
instanceReadyToShutdownEvent.getMemberId()));
+ return;
+ }
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent =
+ new
MemberReadyToShutdownEvent(instanceReadyToShutdownEvent.getServiceName(),
+
instanceReadyToShutdownEvent.getClusterId(),
+
instanceReadyToShutdownEvent.getClusterInstanceId(),
+
instanceReadyToShutdownEvent.getMemberId(),
+
instanceReadyToShutdownEvent.getNetworkPartitionId(),
+
instanceReadyToShutdownEvent.getPartitionId());
+ //get the time of ReadyToShutdown state change
+ Long timeStamp = null;
+ try {
+ TopologyManager.acquireWriteLock();
+
+ if
(!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
+ log.error("Invalid State Transition from " +
member.getStatus() + " to " +
+ MemberStatus.ReadyToShutDown);
+ return;
+ }
+ member.setStatus(MemberStatus.ReadyToShutDown);
+ log.info("Member Ready to shut down event adding status
started");
+
+ TopologyManager.updateTopology(topology);
+
+ timeStamp = System.currentTimeMillis();
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ //publishing data
+ BAMUsageDataPublisher
+
.publish(instanceReadyToShutdownEvent.getMemberId(),
instanceReadyToShutdownEvent.getPartitionId(),
+
instanceReadyToShutdownEvent.getNetworkPartitionId(),
+
instanceReadyToShutdownEvent.getClusterId(),
instanceReadyToShutdownEvent.getServiceName(),
+
MemberStatus.ReadyToShutDown.toString(), null, null, null);
+ //termination of particular instance will be handled by
autoscaler
+ }
+
+ public static void handleMemberMaintenance(InstanceMaintenanceModeEvent
instanceMaintenanceModeEvent)
+ throws InvalidMemberException,
InvalidCartridgeTypeException {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(instanceMaintenanceModeEvent.getServiceName());
+ //update the status of the member
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
instanceMaintenanceModeEvent.getServiceName()));
+ return;
+ }
+
+ Cluster cluster =
service.getCluster(instanceMaintenanceModeEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
instanceMaintenanceModeEvent.getClusterId()));
+ return;
+ }
+
+ Member member =
cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
instanceMaintenanceModeEvent.getMemberId()));
+ return;
+ }
+
+ MemberMaintenanceModeEvent memberMaintenanceModeEvent =
+ new
MemberMaintenanceModeEvent(instanceMaintenanceModeEvent.getServiceName(),
+
instanceMaintenanceModeEvent.getClusterId(),
+
instanceMaintenanceModeEvent.getClusterInstanceId(),
+
instanceMaintenanceModeEvent.getMemberId(),
+
instanceMaintenanceModeEvent.getNetworkPartitionId(),
+
instanceMaintenanceModeEvent.getPartitionId());
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if
(!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
+ log.error("Invalid State Transition from " +
member.getStatus() + " to " + MemberStatus.In_Maintenance);
+ return;
+ }
+ member.setStatus(MemberStatus.In_Maintenance);
+ log.info("member maintenance mode event adding status
started");
+
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ //publishing data
+
TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
+
+ }
+
+ /**
+ * Remove member from topology and send member terminated event.
+ *
+ * @param serviceName
+ * @param clusterId
+ * @param networkPartitionId
+ * @param partitionId
+ * @param memberId
+ */
+ public static void handleMemberTerminated(String serviceName, String
clusterId, String networkPartitionId,
+ String partitionId, String
memberId) {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(serviceName);
+ Properties properties;
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
serviceName));
+ return;
+ }
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
clusterId));
+ return;
+ }
+
+ Member member = cluster.getMember(memberId);
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
memberId));
+ return;
+ }
+
+ String clusterInstanceId = member.getClusterInstanceId();
+
+ try {
+ TopologyManager.acquireWriteLock();
+ properties = member.getProperties();
+ cluster.removeMember(member);
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ /* @TODO leftover from grouping_poc*/
+ String groupAlias = null;
+ TopologyEventPublisher
+ .sendMemberTerminatedEvent(serviceName,
clusterId, memberId, clusterInstanceId, networkPartitionId,
+ partitionId,
properties, groupAlias);
+ }
+
+ public static void handleMemberSuspended() {
+ //TODO
+ try {
+ TopologyManager.acquireWriteLock();
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ }
+
+ public static void handleClusterActivatedEvent(
+ ClusterStatusClusterActivatedEvent
clusterStatusClusterActivatedEvent) {
+
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
clusterStatusClusterActivatedEvent.getServiceName()));
+ return;
+ }
+
+ Cluster cluster =
service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
clusterStatusClusterActivatedEvent.getClusterId()));
+ return;
+ }
+
+ String clusterId = cluster.getClusterId();
+ ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
+ if (clusterContext == null) {
+ log.warn("Cluster context not found: [cluster-id] " +
clusterId);
+ return;
+ }
+
+ ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
+ new
ClusterInstanceActivatedEvent(clusterStatusClusterActivatedEvent.getAppId(),
+
clusterStatusClusterActivatedEvent.getServiceName(),
+
clusterStatusClusterActivatedEvent.getClusterId(),
+
clusterStatusClusterActivatedEvent.getInstanceId());
+ try {
+ TopologyManager.acquireWriteLock();
+ List<KubernetesService> kubernetesServices =
clusterContext.getKubernetesServices();
+ cluster.setKubernetesServices(kubernetesServices);
+
clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
+
+ ClusterInstance context =
cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster instance context is not found
for [cluster] " +
+
clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " +
+
clusterStatusClusterActivatedEvent.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Active;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster activated adding status
started for " + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ // publish event
+
TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
+ } else {
+ log.error(String.format("Cluster state
transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s
[current-status] %s [status-requested] %s",
+
clusterStatusClusterActivatedEvent.getClusterId(),
+
clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(),
+ status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ }
+
+ public static void
handleClusterInactivateEvent(ClusterStatusClusterInactivateEvent
clusterInactivateEvent) {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(clusterInactivateEvent.getServiceName());
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
clusterInactivateEvent.getServiceName()));
+ return;
+ }
+
+ Cluster cluster =
service.getCluster(clusterInactivateEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
clusterInactivateEvent.getClusterId()));
+ return;
+ }
+
+ ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
+ new
ClusterInstanceInactivateEvent(clusterInactivateEvent.getAppId(),
+
clusterInactivateEvent.getServiceName(),
+
clusterInactivateEvent.getClusterId(),
+
clusterInactivateEvent.getInstanceId());
+ try {
+ TopologyManager.acquireWriteLock();
+ ClusterInstance context =
cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found
for [cluster] " +
+ clusterInactivateEvent.getClusterId()
+ " [instance-id] " +
+
clusterInactivateEvent.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Inactive;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Inactive adding status
started for" + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+
TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
+ } else {
+ log.error(String.format("Cluster state
transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s
[current-status] %s [status-requested] %s",
+
clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(),
+ context.getStatus(),
status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ }
+
+ private static void
deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event)
{
+ try {
+ MetaDataServiceClient metadataClient = new
DefaultMetaDataServiceClient();
+
metadataClient.deleteApplicationProperties(event.getAppId());
+ } catch (Exception e) {
+ log.error("Error occurred while deleting the
application resources frm metadata service ", e);
+ }
+ }
+
+ public static void
handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
+
+ TopologyManager.acquireWriteLock();
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(event.getServiceName());
+
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not
exist", event.getServiceName()));
+ return;
+ }
+
+ Cluster cluster =
service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not
exist", event.getClusterId()));
+ return;
+ }
+
+ ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found
for [cluster] " +
+ event.getClusterId() + " [instance-id]
" +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Terminated;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Terminated adding status
started for and removing the cluster instance" +
+ cluster.getClusterId());
+
cluster.removeInstanceContext(event.getInstanceId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ ClusterInstanceTerminatedEvent
clusterTerminatedEvent =
+ new
ClusterInstanceTerminatedEvent(event.getAppId(), event.getServiceName(),
+
event.getClusterId(), event.getInstanceId());
+
+
TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
+ } else {
+ log.error(String.format("Cluster state
transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s
[current-status] %s [status-requested] %s",
+ event.getClusterId(),
event.getInstanceId(), context.getStatus(), status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ }
+
+ public static void
handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
+
+ TopologyManager.acquireWriteLock();
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Cluster cluster =
topology.getService(event.getServiceName()).
+ getCluster(event.getClusterId());
+
+ if
(!cluster.isStateTransitionValid(ClusterStatus.Terminating,
event.getInstanceId())) {
+ log.error("Invalid state transfer from " +
cluster.getStatus(event.getInstanceId()) + " to " +
+ ClusterStatus.Terminating);
+ }
+ ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found
for [cluster] " +
+ event.getClusterId() + " [instance-id]
" +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Terminating;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Terminating started for " +
cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ ClusterInstanceTerminatingEvent
clusterTerminaingEvent =
+ new
ClusterInstanceTerminatingEvent(event.getAppId(), event.getServiceName(),
+
event.getClusterId(), event.getInstanceId());
+
+
TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
+
+ // Remove kubernetes services if available
+ ClusterContext clusterContext =
+
CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
+ if
(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
+
KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
+ }
+ } else {
+ log.error(String.format("Cluster state
transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s
[current-status] %s [status-requested] %s",
+ event.getClusterId(),
event.getInstanceId(), context.getStatus(), status));
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ }
}