updating the monitors with the status changes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b9467d60 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b9467d60 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b9467d60 Branch: refs/heads/docker-grouping-merge Commit: b9467d60623fa85c1fa4087f4dded3d5c5c99b61 Parents: 744ac99 Author: reka <[email protected]> Authored: Mon Nov 3 12:55:48 2014 +0530 Committer: reka <[email protected]> Committed: Tue Nov 4 11:13:13 2014 +0530 ---------------------------------------------------------------------- .../AutoscalerTopologyEventReceiver.java | 311 +++++++++++++------ 1 file changed, 214 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/b9467d60/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index f47142d..0e2a4f9 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -19,22 +19,13 @@ package org.apache.stratos.autoscaler.message.receiver.topology; -import java.util.List; -import java.util.Set; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.AutoscalerContext; -import org.apache.stratos.autoscaler.MemberStatsContext; import org.apache.stratos.autoscaler.NetworkPartitionContext; -import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; -import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; -import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; -import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.exception.DependencyBuilderException; -import org.apache.stratos.autoscaler.exception.TerminationException; import org.apache.stratos.autoscaler.exception.TopologyInConsistentException; import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.grouping.topic.InstanceNotificationPublisher; @@ -42,15 +33,11 @@ import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; -import org.apache.stratos.autoscaler.partition.PartitionManager; -import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.autoscaler.status.checker.StatusChecker; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.ApplicationStatus; import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; @@ -76,6 +63,8 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.drools.runtime.StatefulKnowledgeSession; import org.drools.runtime.rule.FactHandle; +import java.util.Set; + /** * Autoscaler topology receiver. */ @@ -325,28 +314,156 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } }); + topologyEventReceiver.addEventListener(new ApplicationUndeployedEventListener() { + @Override + protected void onEvent(Event event) { + + log.info("[ApplicationUndeployedEvent] Received: " + event.getClass()); + + ApplicationUndeployedEvent applicationUndeployedEvent = (ApplicationUndeployedEvent) event; + + ApplicationMonitor appMonitor = AutoscalerContext.getInstance(). + getAppMonitor(applicationUndeployedEvent.getApplicationId()); + + // if any of Cluster Monitors are not added yet, should send the + // Cluster Terminated event for those clusters + Set<ClusterDataHolder> clusterDataHolders = applicationUndeployedEvent.getClusterData(); + if (clusterDataHolders != null) { + for (ClusterDataHolder clusterDataHolder : clusterDataHolders) { + VMClusterMonitor clusterMonitor = + ((VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterDataHolder.getClusterId())); + if (clusterMonitor == null) { + // Cluster Monitor not found; send Cluster Terminated event to cleanup + ClusterStatusEventPublisher.sendClusterTerminatedEvent( + applicationUndeployedEvent.getApplicationId(), + clusterDataHolder.getServiceType(), + clusterDataHolder.getClusterId()); + } else { + // if the Cluster Monitor exists, mark it as destroyed to stop it from spawning + // more instances + clusterMonitor.setDestroyed(true); + } + } + } + + if (appMonitor != null) { + // set Application Monitor state to 'Terminating' + appMonitor.setStatus(ApplicationStatus.Terminating); + + } else { + // ApplicationMonitor is not found, send Terminating event to clean up + ApplicationsEventPublisher.sendApplicationTerminatedEvent( + applicationUndeployedEvent.getApplicationId(), applicationUndeployedEvent.getClusterData()); + } + +// ApplicationUndeployedEvent applicationUndeployedEvent = (ApplicationUndeployedEvent) event; +// +// // acquire reead locks for application and relevant clusters +// TopologyManager.acquireReadLockForApplication(applicationUndeployedEvent.getApplicationId()); +// Set<ClusterDataHolder> clusterDataHolders = applicationUndeployedEvent.getClusterData(); +// if (clusterDataHolders != null) { +// for (ClusterDataHolder clusterData : clusterDataHolders) { +// TopologyManager.acquireReadLockForCluster(clusterData.getServiceType(), +// clusterData.getClusterId()); +// } +// } +// +// try { +// ApplicationMonitor appMonitor = AutoscalerContext.getInstance(). +// getAppMonitor(applicationUndeployedEvent.getApplicationId()); +// +// if (appMonitor != null) { +// // update the status as Terminating +// appMonitor.setStatus(ApplicationStatus.Terminating); +// +//// List<String> clusters = appMonitor. +//// findClustersOfApplication(applicationUndeployedEvent.getApplicationId()); +// +// boolean clusterMonitorsFound = false; +// for (ClusterDataHolder clusterData : clusterDataHolders) { +// //stopping the cluster monitor and remove it from the AS +// ClusterMonitor clusterMonitor = +// ((ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterData.getClusterId())); +// if (clusterMonitor != null) { +// clusterMonitorsFound = true; +// clusterMonitor.setDestroyed(true); +// //clusterMonitor.terminateAllMembers(); +// if (clusterMonitor.getStatus() == ClusterStatus.Active) { +// // terminated gracefully +// clusterMonitor.setStatus(ClusterStatus.Terminating); +// InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterData.getClusterId()); +// } else { +// // if not active, forcefully terminate +// clusterMonitor.setStatus(ClusterStatus.Terminating); +// clusterMonitor.terminateAllMembers(); +//// try { +//// // TODO: introduce a task to do this cleanup +//// CloudControllerClient.getInstance().terminateAllInstances(clusterData.getClusterId()); +//// } catch (TerminationException e) { +//// log.error("Unable to terminate instances for [ cluster id ] " + +//// clusterData.getClusterId(), e); +//// } +// } +// } else { +// log.warn("No Cluster Monitor found for cluster id " + clusterData.getClusterId()); +// // if Cluster Monitor is not found, still the Cluster Terminated +// // should be sent to update the parent Monitor +// StatusEventPublisher.sendClusterTerminatedEvent( +// applicationUndeployedEvent.getApplicationId(), +// clusterData.getServiceType(), clusterData.getClusterId()); +// } +// } +// +// // if by any chance, the cluster monitors have failed, we still need to undeploy this application +// // hence, check if the Cluster Monitors are not found and send the Application Terminated event +// if (!clusterMonitorsFound) { +// StatusEventPublisher.sendApplicationTerminatedEvent( +// applicationUndeployedEvent.getApplicationId(), clusterDataHolders); +// } +// +// } else { +// log.warn("Application Monitor cannot be found for the undeployed [application] " +// + applicationUndeployedEvent.getApplicationId()); +// // send the App Terminated event to cleanup +// StatusEventPublisher.sendApplicationTerminatedEvent( +// applicationUndeployedEvent.getApplicationId(), clusterDataHolders); +// } +// +// } finally { +// if (clusterDataHolders != null) { +// for (ClusterDataHolder clusterData : clusterDataHolders) { +// TopologyManager.releaseReadLockForCluster(clusterData.getServiceType(), +// clusterData.getClusterId()); +// } +// } +// TopologyManager. +// releaseReadLockForApplication(applicationUndeployedEvent.getApplicationId()); +// } + } + }); + topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event; - String clusterId = memberReadyToShutdownEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } + @Override + protected void onEvent(Event event) { + try { + MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event; + String clusterId = memberReadyToShutdownEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } }); //TODO delete this if we don't want this // topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() { @@ -423,75 +540,75 @@ public class AutoscalerTopologyEventReceiver implements Runnable { }); topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; - String clusterId = memberTerminatedEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberTerminatedEvent(memberTerminatedEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } + @Override + protected void onEvent(Event event) { + try { + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + String clusterId = memberTerminatedEvent.getClusterId(); + AbstractClusterMonitor monitor; + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberTerminatedEvent(memberTerminatedEvent); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } }); topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; - String clusterId = memberActivatedEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberActivatedEvent(memberActivatedEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } + @Override + protected void onEvent(Event event) { + try { + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + String clusterId = memberActivatedEvent.getClusterId(); + AbstractClusterMonitor monitor; + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberActivatedEvent(memberActivatedEvent); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } }); topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { - @Override - protected void onEvent(Event event) { - try { - MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event; - String clusterId = maintenanceModeEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } + @Override + protected void onEvent(Event event) { + try { + MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event; + String clusterId = maintenanceModeEvent.getClusterId(); + AbstractClusterMonitor monitor; + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } }); }
