updating the monitors with the status changes

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b9467d60
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b9467d60
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b9467d60

Branch: refs/heads/docker-grouping-merge
Commit: b9467d60623fa85c1fa4087f4dded3d5c5c99b61
Parents: 744ac99
Author: reka <[email protected]>
Authored: Mon Nov 3 12:55:48 2014 +0530
Committer: reka <[email protected]>
Committed: Tue Nov 4 11:13:13 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 311 +++++++++++++------
 1 file changed, 214 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/b9467d60/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index f47142d..0e2a4f9 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -19,22 +19,13 @@
 
 package org.apache.stratos.autoscaler.message.receiver.topology;
 
-import java.util.List;
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
 import org.apache.stratos.autoscaler.applications.ApplicationHolder;
 import 
org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
-import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
 import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
 import 
org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher;
 import 
org.apache.stratos.autoscaler.grouping.topic.InstanceNotificationPublisher;
@@ -42,15 +33,11 @@ import 
org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
 import 
org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.status.checker.StatusChecker;
 import org.apache.stratos.messaging.domain.applications.Application;
 import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
 import org.apache.stratos.messaging.domain.applications.Applications;
 import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
@@ -76,6 +63,8 @@ import 
org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.drools.runtime.StatefulKnowledgeSession;
 import org.drools.runtime.rule.FactHandle;
 
+import java.util.Set;
+
 /**
  * Autoscaler topology receiver.
  */
@@ -325,28 +314,156 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
             }
         });
 
+        topologyEventReceiver.addEventListener(new 
ApplicationUndeployedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ApplicationUndeployedEvent] Received: " + 
event.getClass());
+
+                ApplicationUndeployedEvent applicationUndeployedEvent = 
(ApplicationUndeployedEvent) event;
+
+                ApplicationMonitor appMonitor = 
AutoscalerContext.getInstance().
+                        
getAppMonitor(applicationUndeployedEvent.getApplicationId());
+
+                // if any of Cluster Monitors are not added yet, should send 
the
+                // Cluster Terminated event for those clusters
+                Set<ClusterDataHolder> clusterDataHolders = 
applicationUndeployedEvent.getClusterData();
+                if (clusterDataHolders != null) {
+                    for (ClusterDataHolder clusterDataHolder : 
clusterDataHolders) {
+                        VMClusterMonitor clusterMonitor =
+                                ((VMClusterMonitor) 
AutoscalerContext.getInstance().getClusterMonitor(clusterDataHolder.getClusterId()));
+                        if (clusterMonitor == null) {
+                            // Cluster Monitor not found; send Cluster 
Terminated event to cleanup
+                            
ClusterStatusEventPublisher.sendClusterTerminatedEvent(
+                                    
applicationUndeployedEvent.getApplicationId(),
+                                    clusterDataHolder.getServiceType(),
+                                    clusterDataHolder.getClusterId());
+                        } else {
+                            // if the Cluster Monitor exists, mark it as 
destroyed to stop it from spawning
+                            // more instances
+                            clusterMonitor.setDestroyed(true);
+                        }
+                    }
+                }
+
+                if (appMonitor != null) {
+                    // set Application Monitor state to 'Terminating'
+                    appMonitor.setStatus(ApplicationStatus.Terminating);
+
+                } else {
+                    // ApplicationMonitor is not found, send Terminating event 
to clean up
+                    ApplicationsEventPublisher.sendApplicationTerminatedEvent(
+                            applicationUndeployedEvent.getApplicationId(), 
applicationUndeployedEvent.getClusterData());
+                }
+
+//                ApplicationUndeployedEvent applicationUndeployedEvent = 
(ApplicationUndeployedEvent) event;
+//
+//                // acquire reead locks for application and relevant clusters
+//                
TopologyManager.acquireReadLockForApplication(applicationUndeployedEvent.getApplicationId());
+//                Set<ClusterDataHolder> clusterDataHolders = 
applicationUndeployedEvent.getClusterData();
+//                if (clusterDataHolders != null) {
+//                    for (ClusterDataHolder clusterData : clusterDataHolders) 
{
+//                        
TopologyManager.acquireReadLockForCluster(clusterData.getServiceType(),
+//                                clusterData.getClusterId());
+//                    }
+//                }
+//
+//                try {
+//                    ApplicationMonitor appMonitor = 
AutoscalerContext.getInstance().
+//                            
getAppMonitor(applicationUndeployedEvent.getApplicationId());
+//
+//                    if (appMonitor != null) {
+//                        // update the status as Terminating
+//                        appMonitor.setStatus(ApplicationStatus.Terminating);
+//
+////                        List<String> clusters = appMonitor.
+////                                
findClustersOfApplication(applicationUndeployedEvent.getApplicationId());
+//
+//                        boolean clusterMonitorsFound = false;
+//                        for (ClusterDataHolder clusterData : 
clusterDataHolders) {
+//                            //stopping the cluster monitor and remove it 
from the AS
+//                            ClusterMonitor clusterMonitor =
+//                                    ((ClusterMonitor) 
AutoscalerContext.getInstance().getMonitor(clusterData.getClusterId()));
+//                            if (clusterMonitor != null) {
+//                                clusterMonitorsFound = true;
+//                                clusterMonitor.setDestroyed(true);
+//                                //clusterMonitor.terminateAllMembers();
+//                                if (clusterMonitor.getStatus() == 
ClusterStatus.Active) {
+//                                    // terminated gracefully
+//                                    
clusterMonitor.setStatus(ClusterStatus.Terminating);
+//                                    
InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterData.getClusterId());
+//                                } else {
+//                                    // if not active, forcefully terminate
+//                                    
clusterMonitor.setStatus(ClusterStatus.Terminating);
+//                                    clusterMonitor.terminateAllMembers();
+////                                    try {
+////                                        // TODO: introduce a task to do 
this cleanup
+////                                        
CloudControllerClient.getInstance().terminateAllInstances(clusterData.getClusterId());
+////                                    } catch (TerminationException e) {
+////                                        log.error("Unable to terminate 
instances for [ cluster id ] " +
+////                                                
clusterData.getClusterId(), e);
+////                                    }
+//                                }
+//                            } else {
+//                                log.warn("No Cluster Monitor found for 
cluster id " + clusterData.getClusterId());
+//                                // if Cluster Monitor is not found, still 
the Cluster Terminated
+//                                // should be sent to update the parent 
Monitor
+//                                
StatusEventPublisher.sendClusterTerminatedEvent(
+//                                        
applicationUndeployedEvent.getApplicationId(),
+//                                        clusterData.getServiceType(), 
clusterData.getClusterId());
+//                            }
+//                        }
+//
+//                        // if by any chance, the cluster monitors have 
failed, we still need to undeploy this application
+//                        // hence, check if the Cluster Monitors are not 
found and send the Application Terminated event
+//                        if (!clusterMonitorsFound) {
+//                            
StatusEventPublisher.sendApplicationTerminatedEvent(
+//                                    
applicationUndeployedEvent.getApplicationId(), clusterDataHolders);
+//                        }
+//
+//                    } else {
+//                        log.warn("Application Monitor cannot be found for 
the undeployed [application] "
+//                                + 
applicationUndeployedEvent.getApplicationId());
+//                        // send the App Terminated event to cleanup
+//                        StatusEventPublisher.sendApplicationTerminatedEvent(
+//                                
applicationUndeployedEvent.getApplicationId(), clusterDataHolders);
+//                    }
+//
+//                } finally {
+//                    if (clusterDataHolders != null) {
+//                        for (ClusterDataHolder clusterData : 
clusterDataHolders) {
+//                            
TopologyManager.releaseReadLockForCluster(clusterData.getServiceType(),
+//                                    clusterData.getClusterId());
+//                        }
+//                    }
+//                    TopologyManager.
+//                            
releaseReadLockForApplication(applicationUndeployedEvent.getApplicationId());
+//                }
+            }
+        });
+
         topologyEventReceiver.addEventListener(new 
MemberReadyToShutdownEventListener() {
-               @Override
-               protected void onEvent(Event event) {
-                       try {
-                               MemberReadyToShutdownEvent 
memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
-                               String clusterId = 
memberReadyToShutdownEvent.getClusterId();
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               AbstractClusterMonitor monitor;
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                               + "[cluster] 
%s", clusterId));
-                                       }
-                                       return;
-                               }
-                               
monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
-                       } catch (Exception e) {
-                               String msg = "Error processing event " + 
e.getLocalizedMessage();
-                               log.error(msg, e);
-                       }
-               }
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = 
(MemberReadyToShutdownEvent) event;
+                    String clusterId = 
memberReadyToShutdownEvent.getClusterId();
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    AbstractClusterMonitor monitor;
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
+                                    + "[cluster] %s", clusterId));
+                        }
+                        return;
+                    }
+                    
monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
+                }
+            }
         });
 //TODO delete this if we don't want this
 //        topologyEventReceiver.addEventListener(new 
ClusterRemovedEventListener() {
@@ -423,75 +540,75 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         });
 
         topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
-               @Override
-               protected void onEvent(Event event) {
-                       try {
-                               MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
-                               String clusterId = 
memberTerminatedEvent.getClusterId();
-                               AbstractClusterMonitor monitor;
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                               + "[cluster] 
%s", clusterId));
-                                       }
-                                       return;
-                               }
-                               
monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
-                       } catch (Exception e) {
-                               String msg = "Error processing event " + 
e.getLocalizedMessage();
-                               log.error(msg, e);
-                       }
-               }
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
+                    String clusterId = memberTerminatedEvent.getClusterId();
+                    AbstractClusterMonitor monitor;
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
+                                    + "[cluster] %s", clusterId));
+                        }
+                        return;
+                    }
+                    monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
+                }
+            }
         });
 
         topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
-               @Override
-               protected void onEvent(Event event) {
-                       try {
-                               MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
-                               String clusterId = 
memberActivatedEvent.getClusterId();
-                               AbstractClusterMonitor monitor;
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                               + "[cluster] 
%s", clusterId));
-                                       }
-                                       return;
-                               }
-                               
monitor.handleMemberActivatedEvent(memberActivatedEvent);
-                       } catch (Exception e) {
-                               String msg = "Error processing event " + 
e.getLocalizedMessage();
-                               log.error(msg, e);
-                       }
-               }
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
+                    String clusterId = memberActivatedEvent.getClusterId();
+                    AbstractClusterMonitor monitor;
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
+                                    + "[cluster] %s", clusterId));
+                        }
+                        return;
+                    }
+                    monitor.handleMemberActivatedEvent(memberActivatedEvent);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
+                }
+            }
         });
 
         topologyEventReceiver.addEventListener(new MemberMaintenanceListener() 
{
-               @Override
-               protected void onEvent(Event event) {
-                       try {
-                               MemberMaintenanceModeEvent maintenanceModeEvent 
= (MemberMaintenanceModeEvent) event;
-                               String clusterId = 
maintenanceModeEvent.getClusterId();
-                               AbstractClusterMonitor monitor;
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                               + "[cluster] 
%s", clusterId));
-                                       }
-                                       return;
-                               }
-                               
monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
-                       } catch (Exception e) {
-                               String msg = "Error processing event " + 
e.getLocalizedMessage();
-                               log.error(msg, e);
-                       }
-               }
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    MemberMaintenanceModeEvent maintenanceModeEvent = 
(MemberMaintenanceModeEvent) event;
+                    String clusterId = maintenanceModeEvent.getClusterId();
+                    AbstractClusterMonitor monitor;
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
+                                    + "[cluster] %s", clusterId));
+                        }
+                        return;
+                    }
+                    
monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
+                }
+            }
         });
     }
 

Reply via email to