http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 1603aef..e857eaf 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -19,29 +19,16 @@
 
 package org.apache.stratos.autoscaler.message.receiver.topology;
 
-import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.PartitionValidationException;
 import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
 import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.Event;
@@ -112,7 +99,6 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
                     TopologyManager.acquireReadLock();
                     for (Service service : 
TopologyManager.getTopology().getServices()) {
@@ -121,167 +107,108 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                         }
                     }
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
-
         });
 
         topologyEventReceiver.addEventListener(new 
MemberReadyToShutdownEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = 
(MemberReadyToShutdownEvent)event;
+                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = 
(MemberReadyToShutdownEvent) event;
+                    String clusterId = 
memberReadyToShutdownEvent.getClusterId();
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
                     AbstractClusterMonitor monitor;
-                    String clusterId = 
memberReadyToShutdownEvent.getClusterId();
-                    String memberId = memberReadyToShutdownEvent.getMemberId();
-
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                       + "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", 
clusterId));
                         }
                         return;
                     }
-                    
-                    TopologyManager.acquireReadLock();
-                    
-                    if(monitor.getClusterType() == 
ClusterType.VMServiceCluster 
-                               || monitor.getClusterType() == 
ClusterType.VMLbCluster) {
-                       
-                        NetworkPartitionContext nwPartitionCtxt;
-                        String networkPartitionId = 
memberReadyToShutdownEvent.getNetworkPartitionId();
-                                               nwPartitionCtxt = 
((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                        // start a new member in the same Partition
-                        String partitionId = ((VMClusterMonitor) 
monitor).getPartitionOfMember(memberId);
-                        PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
-                        // terminate the shutdown ready member
-                        CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
-                        ccClient.terminate(memberId);
-
-                        // remove from active member list
-                        partitionCtxt.removeActiveMemberById(memberId);
-                        
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member is terminated and 
removed from the active members list: "
-                                       + "[member] %s [partition] %s [cluster] 
%s ", memberId, partitionId, clusterId));
-                        }
-                    } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
-                       // no need to do anything
-                    }
+                    
monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
+                }
+            }
+        });
 
-                } catch (TerminationException e) {
-                    log.error(e);
+        topologyEventReceiver.addEventListener(new 
ClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    log.info("Event received: " + event);
+                    ClusterCreatedEvent clusterCreatedEvent = 
(ClusterCreatedEvent) event;
+                    TopologyManager.acquireReadLock();
+                    Service service = 
TopologyManager.getTopology().getService(clusterCreatedEvent.getServiceName());
+                    Cluster cluster = 
service.getCluster(clusterCreatedEvent.getClusterId());
+                    startClusterMonitor(cluster);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
         });
 
-        topologyEventReceiver.addEventListener(new 
ClusterCreatedEventListener() {
-                    @Override
-                    protected void onEvent(Event event) {
-                        try {
-                            log.info("Event received: " + event);
-                            ClusterCreatedEvent e = (ClusterCreatedEvent) 
event;
-                            TopologyManager.acquireReadLock();
-                            Service service = 
TopologyManager.getTopology().getService(e.getServiceName());
-                            Cluster cluster = 
service.getCluster(e.getClusterId());
-                            startClusterMonitor(cluster);
-                        } catch (Exception e) {
-                            log.error("Error processing event", e);
-                        } finally {
-                            TopologyManager.releaseReadLock();
-                        }
-                    }
-
-                });
-
         topologyEventReceiver.addEventListener(new 
ClusterMaintenanceModeEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
                     log.info("Event received: " + event);
-                    ClusterMaintenanceModeEvent e = 
(ClusterMaintenanceModeEvent) event;
+                    ClusterMaintenanceModeEvent clusterMaintenanceModeEvent = 
(ClusterMaintenanceModeEvent) event;
                     TopologyManager.acquireReadLock();
-                    Service service = 
TopologyManager.getTopology().getService(e.getServiceName());
-                    Cluster cluster = service.getCluster(e.getClusterId());
-                    
if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) 
{
-                       
AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
-                    } else {
+                    Service service = 
TopologyManager.getTopology().getService(clusterMaintenanceModeEvent.getServiceName());
+                    Cluster cluster = 
service.getCluster(clusterMaintenanceModeEvent.getClusterId());
+                    AbstractClusterMonitor monitor;
+                    monitor = 
AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+                    if (null == monitor) {
                         log.error("cluster monitor not exists for the cluster: 
" + cluster.toString());
+                        return;
                     }
+                    monitor.setStatus(clusterMaintenanceModeEvent.getStatus());
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
-                });
+        });
 
         topologyEventReceiver.addEventListener(new 
ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    ClusterRemovedEvent e = (ClusterRemovedEvent) event;
-                    TopologyManager.acquireReadLock();
-
-                    String clusterId = e.getClusterId();
-                    String deploymentPolicy = e.getDeploymentPolicy();
-
-                    AbstractClusterMonitor monitor = null;
-
-                    if (e.isLbCluster()) {
-                        DeploymentPolicy depPolicy = 
PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
-                        if (depPolicy != null) {
-                            List<NetworkPartitionLbHolder> lbHolders = 
PartitionManager.getInstance()
-                                    .getNetworkPartitionLbHolders(depPolicy);
-
-                            for (NetworkPartitionLbHolder 
networkPartitionLbHolder : lbHolders) {
-                                // removes lb cluster ids
-                                boolean isRemoved = 
networkPartitionLbHolder.removeLbClusterId(clusterId);
-                                if (isRemoved) {
-                                    log.info("Removed the lb cluster [id]:"
-                                            + clusterId
-                                            + " reference from Network 
Partition [id]: "
-                                            + networkPartitionLbHolder
-                                            .getNetworkPartitionId());
-
-                                }
-                                if (log.isDebugEnabled()) {
-                                    log.debug(networkPartitionLbHolder);
-                                }
-
-                            }
+                    ClusterRemovedEvent clusterRemovedEvent = 
(ClusterRemovedEvent) event;
+                    String clusterId = clusterRemovedEvent.getClusterId();
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    AbstractClusterMonitor monitor;
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
+                                                    + "[cluster] %s", 
clusterId));
                         }
+                        return;
                     }
-                    
-                    monitor = 
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);                
               
-
-                    // runTerminateAllRule(monitor);
-                    if (monitor != null) {
-                        monitor.destroy();
-                        log.info(String.format("Cluster monitor has been 
removed successfully: [cluster] %s ",
-                                clusterId));
-                    }
+                    monitor.handleClusterRemovedEvent(clusterRemovedEvent);
+                    asCtx.removeClusterMonitor(clusterId);
+                    monitor.destroy();
+                    log.info(String.format("Cluster monitor has been removed 
successfully: [cluster] %s ",
+                                           clusterId));
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
-
         });
 
         topologyEventReceiver.addEventListener(new 
MemberStartedEventListener() {
@@ -295,70 +222,23 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberTerminatedEvent e = (MemberTerminatedEvent) event;
-                    String networkPartitionId = e.getNetworkPartitionId();
-                    String clusterId = e.getClusterId();
-                    String partitionId = e.getPartitionId();
-                    String memberId = e.getMemberId();
+                    MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
+                    String clusterId = memberTerminatedEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                       + "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", 
clusterId));
                         }
                         return;
                     }
-                    
-                    if(monitor.getClusterType() == 
ClusterType.VMServiceCluster 
-                               || monitor.getClusterType() == 
ClusterType.VMLbCluster) {
-                       
-                        NetworkPartitionContext networkPartitionContext = 
-                                       ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                        PartitionContext partitionContext = 
networkPartitionContext.getPartitionCtxt(partitionId);
-                        partitionContext.removeMemberStatsContext(memberId);
-
-                        if 
(partitionContext.removeTerminationPendingMember(memberId)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Member is removed 
from termination pending members list: "
-                                               + "[member] %s", memberId));
-                            }
-                        } else if 
(partitionContext.removePendingMember(memberId)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Member is removed 
from pending members list: "
-                                               + "[member] %s", memberId));
-                            }
-                        } else if 
(partitionContext.removeActiveMemberById(memberId)) {
-                            log.warn(String.format("Member is in the wrong 
list and it is removed from "
-                                       + "active members list", memberId));
-                        } else if 
(partitionContext.removeObsoleteMember(memberId)){
-                               log.warn(String.format("Member's obsolated 
timeout has been expired and "
-                                               + "it is removed from obsolated 
members list", memberId));
-                        } else {
-                            log.warn(String.format("Member is not available in 
any of the list active, "
-                                       + "pending and termination pending", 
memberId));
-                        }
-
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has 
been removed successfully: "
-                                       + "[member] %s", memberId));
-                        }
-                    } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
-                       // no need to do anything
-                    }
-                    
+                    monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
 
@@ -367,160 +247,47 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-
-                    MemberActivatedEvent e = (MemberActivatedEvent) event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
-
-                    String clusterId = e.getClusterId();
+                    MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
+                    String clusterId = memberActivatedEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                       + "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", 
clusterId));
                         }
                         return;
                     }
-                    
-                    if (monitor.getClusterType() == 
ClusterType.VMServiceCluster 
-                               || monitor.getClusterType() == 
ClusterType.VMLbCluster) {    
-                       PartitionContext partitionContext;
-                        partitionContext = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                        partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has 
been added successfully: "
-                                       + "[member] %s", memberId));
-                        }
-                        
partitionContext.movePendingMemberToActiveMembers(memberId);
-                                       } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
-                                               KubernetesClusterContext 
kubernetesClusterContext;
-                                               kubernetesClusterContext = 
((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
-                                               
kubernetesClusterContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has 
been added successfully: "
-                                       + "[member] %s", memberId));
-                        }
-                                               
kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
-                                       }
-                    
+                    monitor.handleMemberActivatedEvent(memberActivatedEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
         });
 
-        topologyEventReceiver.addEventListener(new 
MemberReadyToShutdownEventListener() {
-           @Override
-           protected void onEvent(Event event) {
-               try {
-                  TopologyManager.acquireReadLock();
-                  
-                   MemberReadyToShutdownEvent memberReadyToShutdownEvent = 
(MemberReadyToShutdownEvent)event;
-                   AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                   AbstractClusterMonitor monitor;
-                   String clusterId = 
memberReadyToShutdownEvent.getClusterId();
-                   String memberId = memberReadyToShutdownEvent.getMemberId();
-
-                   if(asCtx.clusterMonitorExist(clusterId)) {
-                       monitor = asCtx.getClusterMonitor(clusterId);
-                   } else {
-                       if(log.isDebugEnabled()){
-                           log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                       + "[cluster] %s", clusterId));
-                       }
-                       return;
-                   }
-
-                   if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                                  || monitor.getClusterType() == 
ClusterType.VMLbCluster) {
-                          
-                       NetworkPartitionContext nwPartitionCtxt;
-                       String networkPartitionId = 
memberReadyToShutdownEvent.getNetworkPartitionId();
-                       nwPartitionCtxt = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                       // start a new member in the same Partition
-                       String partitionId = ((VMClusterMonitor) 
monitor).getPartitionOfMember(memberId);
-                       PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
-                       // terminate the shutdown ready member
-                       CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
-                       ccClient.terminate(memberId);
-
-                       // remove from active member list
-                       partitionCtxt.removeActiveMemberById(memberId);
-
-                       if (log.isInfoEnabled()) {
-                           log.info(String.format("Member is terminated and 
removed from the active members list: "
-                                       + "[member] %s [partition] %s [cluster] 
%s ", memberId, partitionId, clusterId));
-                       }
-                   } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
-                          // no need to do anything
-                   }
-
-               } catch (TerminationException e) {
-                   log.error(e);
-               }
-           }
-
-       });
-
-
         topologyEventReceiver.addEventListener(new MemberMaintenanceListener() 
{
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-
-                    MemberMaintenanceModeEvent e = 
(MemberMaintenanceModeEvent) event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
-
-                    String clusterId = e.getClusterId();
+                    MemberMaintenanceModeEvent maintenanceModeEvent = 
(MemberMaintenanceModeEvent) event;
+                    String clusterId = maintenanceModeEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    if (asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = 
AutoscalerContext.getInstance().getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                       + "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", 
clusterId));
                         }
                         return;
                     }
-                    
-                    if(monitor.getClusterType() == 
ClusterType.VMServiceCluster 
-                                  || monitor.getClusterType() == 
ClusterType.VMLbCluster) {
-                       
-                       PartitionContext partitionContext;
-                       partitionContext = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                        partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Member has been moved as 
pending termination: "
-                                       + "[member] %s", memberId));
-                        }
-                        
partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
-                    } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
-                       // no need to do anything
-                    }
-
+                    
monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
         });
@@ -529,27 +296,14 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         topologyEventReceiver.addEventListener(new 
ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
-//                try {
-//                    TopologyManager.acquireReadLock();
-//
-//                    // Remove all clusters of given service from context
-//                    ServiceRemovedEvent serviceRemovedEvent = 
(ServiceRemovedEvent)event;
-//                    for(Service service : 
TopologyManager.getTopology().getServices()) {
-//                        for(Cluster cluster : service.getClusters()) {
-//                            removeMonitor(cluster.getHostName());
-//                        }
-//                    }
-//                }
-//                finally {
-//                    TopologyManager.releaseReadLock();
-//                }
+
             }
         });
     }
 
     private class ClusterMonitorAdder implements Runnable {
         private Cluster cluster;
-        private String clusterMonitorType;
+
         public ClusterMonitorAdder(Cluster cluster) {
             this.cluster = cluster;
         }
@@ -567,38 +321,41 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 try {
                     monitor = ClusterMonitorFactory.getMonitor(cluster);
                     success = true;
-                    clusterMonitorType = monitor.getClusterType().name();
                 } catch (PolicyValidationException e) {
-                    String msg = "Cluster monitor creation failed for cluster: 
" + cluster.getClusterId();
-                    log.debug(msg, e);
+                    if (log.isDebugEnabled()) {
+                        String msg = "Cluster monitor creation failed for 
cluster: " + cluster.getClusterId();
+                        log.debug(msg, e);
+                    }
                     retries--;
-
                 } catch (PartitionValidationException e) {
-                    String msg = "Cluster monitor creation failed for cluster: 
" + cluster.getClusterId();
-                    log.debug(msg, e);
+                    if (log.isDebugEnabled()) {
+                        String msg = "Cluster monitor creation failed for 
cluster: " + cluster.getClusterId();
+                        log.debug(msg, e);
+                    }
                     retries--;
                 }
             } while (!success && retries != 0);
 
             if (monitor == null) {
                 String msg = "Cluster monitor creation failed, even after 
retrying for 5 times, "
-                        + "for cluster: " + cluster.getClusterId();
+                             + "for cluster: " + cluster.getClusterId();
                 log.error(msg);
                 throw new RuntimeException(msg);
             }
-
+            //TODO  private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
+            //         scheduler.scheduleAtFixedRate(monitor, 0, 
getMonitorInterval(), TimeUnit.MILLISECONDS);
             Thread th = new Thread(monitor);
             th.start();
             AutoscalerContext.getInstance().addClusterMonitor(monitor);
             if (log.isInfoEnabled()) {
-                log.info(String.format("%s monitor has been added 
successfully: [cluster] %s",
-                        clusterMonitorType, cluster.getClusterId()));
+                log.info(String.format("Cluster monitor has been added 
successfully: [cluster] %s",
+                                       cluster.getClusterId()));
             }
         }
     }
- 
+
     @SuppressWarnings("unused")
-       private void runTerminateAllRule(VMClusterMonitor monitor) {
+    private void runTerminateAllRule(VMClusterMonitor monitor) {
 
         FactHandle terminateAllFactHandle = null;
 
@@ -621,9 +378,13 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
 
     protected synchronized void startClusterMonitor(Cluster cluster) {
         Thread th = null;
-        if 
(!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
-               th = new Thread(new ClusterMonitorAdder(cluster));
-        } 
+
+        AbstractClusterMonitor monitor;
+        monitor = 
AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+
+        if (null == monitor) {
+            th = new Thread(new ClusterMonitorAdder(cluster));
+        }
         if (th != null) {
             th.start();
             try {
@@ -632,9 +393,8 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
             }
 
             if (log.isDebugEnabled()) {
-                log.debug(String
-                        .format("Cluster monitor thread has been started 
successfully: [cluster] %s ",
-                                cluster.getClusterId()));
+                log.debug(String.format("Cluster monitor thread has been 
started successfully: "
+                                        + "[cluster] %s ", 
cluster.getClusterId()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index cb60027..6061c3b 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -19,130 +19,211 @@
 package org.apache.stratos.autoscaler.monitor;
 
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import 
org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import 
org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import 
org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import 
org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import 
org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import 
org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import 
org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import 
org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import 
org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import 
org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import 
org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import 
org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import 
org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import 
org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
 import org.drools.runtime.StatefulKnowledgeSession;
 import org.drools.runtime.rule.FactHandle;
 
 /*
  * Every cluster monitor, which are monitoring a cluster, should extend this 
class.
  */
-public abstract class AbstractClusterMonitor implements Runnable{
-       
+public abstract class AbstractClusterMonitor implements Runnable {
+
     private String clusterId;
     private String serviceId;
-    private ClusterType clusterType;
-       private ClusterStatus status;
-       private int monitorInterval;
-       
-       protected FactHandle minCheckFactHandle;
-       protected FactHandle scaleCheckFactHandle;
-       private StatefulKnowledgeSession minCheckKnowledgeSession;
-       private StatefulKnowledgeSession scaleCheckKnowledgeSession;
-       private boolean isDestroyed;
-       
-       private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-       
-       protected AbstractClusterMonitor(String clusterId, String serviceId, 
ClusterType clusterType, 
-                       AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-               
-               super();
-               this.clusterId = clusterId;
-               this.serviceId = serviceId;
-               this.clusterType = clusterType;
-               this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+    private ClusterStatus status;
+    private int monitoringIntervalMilliseconds;
+
+    protected FactHandle minCheckFactHandle;
+    protected FactHandle scaleCheckFactHandle;
+    private StatefulKnowledgeSession minCheckKnowledgeSession;
+    private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+    private boolean isDestroyed;
+
+    private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+
+    protected AbstractClusterMonitor(String clusterId, String serviceId,
+                                     AutoscalerRuleEvaluator 
autoscalerRuleEvaluator) {
+
+        super();
+        this.clusterId = clusterId;
+        this.serviceId = serviceId;
+        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
         this.scaleCheckKnowledgeSession = 
autoscalerRuleEvaluator.getScaleCheckStatefulSession();
         this.minCheckKnowledgeSession = 
autoscalerRuleEvaluator.getMinCheckStatefulSession();
-       }
+    }
+
+    protected abstract void readConfigurations();
+
+    protected abstract void monitor();
 
-       protected abstract void readConfigurations();
-       protected abstract void monitor();
     public abstract void destroy();
-    
-       public String getClusterId() {
-               return clusterId;
-       }
-       
-       public void setClusterId(String clusterId) {
-               this.clusterId = clusterId;
-       }
-       
-       public void setStatus(ClusterStatus status) {
-               this.status = status;
-       }
-
-       public ClusterType getClusterType() {
-               return clusterType;
-       }
-
-       public ClusterStatus getStatus() {
-               return status;
-       }
-       
-       public String getServiceId() {
-               return serviceId;
-       }
-       
-       public void setServiceId(String serviceId) {
-               this.serviceId = serviceId;
-       }
-       
-       public int getMonitorInterval() {
-               return monitorInterval;
-       }
-       
-       public void setMonitorInterval(int monitorInterval) {
-               this.monitorInterval = monitorInterval;
-       }
-
-       public FactHandle getMinCheckFactHandle() {
-               return minCheckFactHandle;
-       }
-       
-       public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
-               this.minCheckFactHandle = minCheckFactHandle;
-       }
-       
-       public FactHandle getScaleCheckFactHandle() {
-               return scaleCheckFactHandle;
-       }
-       
-       public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
-               this.scaleCheckFactHandle = scaleCheckFactHandle;
-       }
-       
-       public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
-               return minCheckKnowledgeSession;
-       }
-       
-       public void setMinCheckKnowledgeSession(
-                       StatefulKnowledgeSession minCheckKnowledgeSession) {
-               this.minCheckKnowledgeSession = minCheckKnowledgeSession;
-       }
-       
-       public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
-               return scaleCheckKnowledgeSession;
-       }
-       
-       public void setScaleCheckKnowledgeSession(
-                       StatefulKnowledgeSession scaleCheckKnowledgeSession) {
-               this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
-       }
-       
-       public boolean isDestroyed() {
-               return isDestroyed;
-       }
-       
-       public void setDestroyed(boolean isDestroyed) {
-               this.isDestroyed = isDestroyed;
-       }
-
-       public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
-               return autoscalerRuleEvaluator;
-       }
-
-       public void setAutoscalerRuleEvaluator(
-                       AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-               this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
-       }
+
+    //handle health events
+    public abstract void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent);
+
+    public abstract void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
+
+    public abstract void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent 
secondDerivativeOfLoadAverageEvent);
+
+    public abstract void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
+
+    public abstract void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
+
+    public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent 
secondDerivativeOfMemoryConsumptionEvent);
+
+    public abstract void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent);
+
+    public abstract void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
+
+    public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent 
secondDerivativeOfRequestsInFlightEvent);
+
+    public abstract void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent 
memberAverageMemoryConsumptionEvent);
+
+    public abstract void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent 
memberGradientOfMemoryConsumptionEvent);
+
+    public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent 
memberSecondDerivativeOfMemoryConsumptionEvent);
+
+
+    public abstract void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
+
+    public abstract void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
+
+    public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent 
memberSecondDerivativeOfLoadAverageEvent);
+
+    public abstract void handleMemberFaultEvent(MemberFaultEvent 
memberFaultEvent);
+
+    //handle topology events
+    public abstract void handleMemberStartedEvent(MemberStartedEvent 
memberStartedEvent);
+
+    public abstract void handleMemberActivatedEvent(MemberActivatedEvent 
memberActivatedEvent);
+
+    public abstract void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent);
+
+    public abstract void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent);
+
+    public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent 
memberTerminatedEvent);
+
+    public abstract void handleClusterRemovedEvent(ClusterRemovedEvent 
clusterRemovedEvent);
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public void setStatus(ClusterStatus status) {
+        this.status = status;
+    }
+
+    public ClusterStatus getStatus() {
+        return status;
+    }
+
+    public String getServiceId() {
+        return serviceId;
+    }
+
+    public void setServiceId(String serviceId) {
+        this.serviceId = serviceId;
+    }
+
+    public int getMonitorIntervalMilliseconds() {
+        return monitoringIntervalMilliseconds;
+    }
+
+    public void setMonitorIntervalMilliseconds(int 
monitorIntervalMilliseconds) {
+        this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
+    }
+
+    public FactHandle getMinCheckFactHandle() {
+        return minCheckFactHandle;
+    }
+
+    public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+        this.minCheckFactHandle = minCheckFactHandle;
+    }
+
+    public FactHandle getScaleCheckFactHandle() {
+        return scaleCheckFactHandle;
+    }
+
+    public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+        this.scaleCheckFactHandle = scaleCheckFactHandle;
+    }
+
+    public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+        return minCheckKnowledgeSession;
+    }
+
+    public void setMinCheckKnowledgeSession(
+            StatefulKnowledgeSession minCheckKnowledgeSession) {
+        this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+    }
+
+    public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+        return scaleCheckKnowledgeSession;
+    }
+
+    public void setScaleCheckKnowledgeSession(
+            StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+        this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+    }
+
+    public boolean isDestroyed() {
+        return isDestroyed;
+    }
+
+    public void setDestroyed(boolean isDestroyed) {
+        this.isDestroyed = isDestroyed;
+    }
+
+    public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+        return autoscalerRuleEvaluator;
+    }
+
+    public void setAutoscalerRuleEvaluator(
+            AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
index bd01dc6..208e4ce 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
@@ -52,30 +52,32 @@ import org.apache.stratos.messaging.util.Constants;
  * Factory class for creating cluster monitors.
  */
 public class ClusterMonitorFactory {
-       
-       private static final Log log = 
LogFactory.getLog(ClusterMonitorFactory.class);
-
-       /**
-        * @param cluster the cluster to be monitored
-        * @return the created cluster monitor
-        * @throws PolicyValidationException when deployment policy is not valid
-        * @throws PartitionValidationException when partition is not valid
-        */
-       public static AbstractClusterMonitor getMonitor(Cluster cluster) throws 
PolicyValidationException, PartitionValidationException {
-               
-               AbstractClusterMonitor clusterMonitor;
-               if(cluster.isKubernetesCluster()){
-                       clusterMonitor = 
getDockerServiceClusterMonitor(cluster);
-               } else if (cluster.isLbCluster()){
-                       clusterMonitor = getVMLbClusterMonitor(cluster);
-               } else {
-                       clusterMonitor = getVMServiceClusterMonitor(cluster);
-               }
-               
-               return clusterMonitor;
-       }
-       
-    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster 
cluster) throws PolicyValidationException, PartitionValidationException {
+
+    private static final Log log = 
LogFactory.getLog(ClusterMonitorFactory.class);
+
+    /**
+     * @param cluster the cluster to be monitored
+     * @return the created cluster monitor
+     * @throws PolicyValidationException    when deployment policy is not valid
+     * @throws PartitionValidationException when partition is not valid
+     */
+    public static AbstractClusterMonitor getMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+
+        AbstractClusterMonitor clusterMonitor;
+        if (cluster.isKubernetesCluster()) {
+            clusterMonitor = getDockerServiceClusterMonitor(cluster);
+        } else if (cluster.isLbCluster()) {
+            clusterMonitor = getVMLbClusterMonitor(cluster);
+        } else {
+            clusterMonitor = getVMServiceClusterMonitor(cluster);
+        }
+
+        return clusterMonitor;
+    }
+
+    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster 
cluster)
+            throws PolicyValidationException, PartitionValidationException {
         // FIXME fix the following code to correctly update
         // AutoscalerContext context = AutoscalerContext.getInstance();
         if (null == cluster) {
@@ -91,11 +93,11 @@ public class ClusterMonitorFactory {
         }
 
         AutoscalePolicy policy =
-                                 PolicyManager.getInstance()
-                                              
.getAutoscalePolicy(autoscalePolicyName);
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
         DeploymentPolicy deploymentPolicy =
-                                            PolicyManager.getInstance()
-                                                         
.getDeploymentPolicy(deploymentPolicyName);
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
 
         if (deploymentPolicy == null) {
             String msg = "Deployment Policy is null. Policy name: " + 
deploymentPolicyName;
@@ -106,8 +108,8 @@ public class ClusterMonitorFactory {
         Partition[] allPartitions = deploymentPolicy.getAllPartitions();
         if (allPartitions == null) {
             String msg =
-                         "Deployment Policy's Partitions are null. Policy 
name: " +
-                                 deploymentPolicyName;
+                    "Deployment Policy's Partitions are null. Policy name: " +
+                    deploymentPolicyName;
             log.error(msg);
             throw new PolicyValidationException(msg);
         }
@@ -115,98 +117,100 @@ public class ClusterMonitorFactory {
         
CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(),
 deploymentPolicy);
 
         VMServiceClusterMonitor clusterMonitor =
-                                        new 
VMServiceClusterMonitor(cluster.getClusterId(),
-                                                           
cluster.getServiceName(),
-                                                           deploymentPolicy, 
policy);
+                new VMServiceClusterMonitor(cluster.getClusterId(),
+                                            cluster.getServiceName(),
+                                            deploymentPolicy, policy);
         clusterMonitor.setStatus(ClusterStatus.Created);
-        
-        for (PartitionGroup partitionGroup: 
deploymentPolicy.getPartitionGroups()){
+
+        for (PartitionGroup partitionGroup : 
deploymentPolicy.getPartitionGroups()) {
 
             NetworkPartitionContext networkPartitionContext = new 
NetworkPartitionContext(partitionGroup.getId(),
-                    partitionGroup.getPartitionAlgo(), 
partitionGroup.getPartitions());
+                                                                               
           partitionGroup.getPartitionAlgo(),
+                                                                               
           partitionGroup.getPartitions());
 
-            for(Partition partition: partitionGroup.getPartitions()){
+            for (Partition partition : partitionGroup.getPartitions()) {
                 PartitionContext partitionContext = new 
PartitionContext(partition);
                 partitionContext.setServiceName(cluster.getServiceName());
                 partitionContext.setProperties(cluster.getProperties());
                 partitionContext.setNetworkPartitionId(partitionGroup.getId());
-                
-                for (Member member: cluster.getMembers()){
+
+                for (Member member : cluster.getMembers()) {
                     String memberId = member.getMemberId();
-                    
if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
+                    if 
(member.getPartitionId().equalsIgnoreCase(partition.getId())) {
                         MemberContext memberContext = new MemberContext();
                         memberContext.setClusterId(member.getClusterId());
                         memberContext.setMemberId(memberId);
                         memberContext.setPartition(partition);
                         
memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-                        
-                        if(MemberStatus.Activated.equals(member.getStatus())){
+
+                        if (MemberStatus.Activated.equals(member.getStatus())) 
{
                             partitionContext.addActiveMember(memberContext);
 //                            
networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
 1);
 //                            
partitionContext.incrementCurrentActiveMemberCount(1);
 
-                        } else 
if(MemberStatus.Created.equals(member.getStatus()) || 
MemberStatus.Starting.equals(member.getStatus())){
+                        } else if 
(MemberStatus.Created.equals(member.getStatus()) || 
MemberStatus.Starting.equals(member.getStatus())) {
                             partitionContext.addPendingMember(memberContext);
 
 //                            
networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
 1);
-                        } else 
if(MemberStatus.Suspended.equals(member.getStatus())){
+                        } else if 
(MemberStatus.Suspended.equals(member.getStatus())) {
 //                            partitionContext.addFaultyMember(memberId);
                         }
                         partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
-                        if(log.isInfoEnabled()){
+                        if (log.isInfoEnabled()) {
                             log.info(String.format("Member stat context has 
been added: [member] %s", memberId));
                         }
                     }
 
                 }
                 networkPartitionContext.addPartitionContext(partitionContext);
-                if(log.isInfoEnabled()){
+                if (log.isInfoEnabled()) {
                     log.info(String.format("Partition context has been added: 
[partition] %s",
-                            partitionContext.getPartitionId()));
+                                           partitionContext.getPartitionId()));
                 }
             }
 
             clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-            if(log.isInfoEnabled()){
+            if (log.isInfoEnabled()) {
                 log.info(String.format("Network partition context has been 
added: [network partition] %s",
-                            networkPartitionContext.getId()));
+                                       networkPartitionContext.getId()));
             }
         }
-        
-        
+
+
         // find lb reference type
         java.util.Properties props = cluster.getProperties();
-        
-        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
             String value = props.getProperty(Constants.LOAD_BALANCER_REF);
             clusterMonitor.setLbReferenceType(value);
-            if(log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: "+value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
             }
         }
-        
+
         // set hasPrimary property
         // hasPrimary is true if there are primary members available in that 
cluster
         
clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
 
-        log.info("VMServiceClusterMonitor created: 
"+clusterMonitor.toString());
+        log.info("VMServiceClusterMonitor created: " + 
clusterMonitor.toString());
         return clusterMonitor;
     }
-    
+
     private static Properties convertMemberPropsToMemberContextProps(
-                       java.util.Properties properties) {
-       Properties props = new Properties();
-       for (Map.Entry<Object, Object> e : properties.entrySet()        ) {
-                       Property prop = new Property();
-                       prop.setName((String)e.getKey());
-                       prop.setValue((String)e.getValue());
-                       props.addProperties(prop);
-               }       
-               return props;
-       }
-
-
-       private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster 
cluster) throws PolicyValidationException, PartitionValidationException {
+            java.util.Properties properties) {
+        Properties props = new Properties();
+        for (Map.Entry<Object, Object> e : properties.entrySet()) {
+            Property prop = new Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            props.addProperties(prop);
+        }
+        return props;
+    }
+
+
+    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
         // FIXME fix the following code to correctly update
         // AutoscalerContext context = AutoscalerContext.getInstance();
         if (null == cluster) {
@@ -222,11 +226,11 @@ public class ClusterMonitorFactory {
         }
 
         AutoscalePolicy policy =
-                                 PolicyManager.getInstance()
-                                              
.getAutoscalePolicy(autoscalePolicyName);
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
         DeploymentPolicy deploymentPolicy =
-                                            PolicyManager.getInstance()
-                                                         
.getDeploymentPolicy(deploymentPolicyName);
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
 
         if (deploymentPolicy == null) {
             String msg = "Deployment Policy is null. Policy name: " + 
deploymentPolicyName;
@@ -236,21 +240,21 @@ public class ClusterMonitorFactory {
 
         String clusterId = cluster.getClusterId();
         VMLbClusterMonitor clusterMonitor =
-                                        new VMLbClusterMonitor(clusterId,
-                                                           
cluster.getServiceName(),
-                                                           deploymentPolicy, 
policy);
+                new VMLbClusterMonitor(clusterId,
+                                       cluster.getServiceName(),
+                                       deploymentPolicy, policy);
         clusterMonitor.setStatus(ClusterStatus.Created);
         // partition group = network partition context
         for (PartitionGroup partitionGroup : 
deploymentPolicy.getPartitionGroups()) {
 
             NetworkPartitionLbHolder networkPartitionLbHolder =
-                                                              
PartitionManager.getInstance()
-                                                                              
.getNetworkPartitionLbHolder(partitionGroup.getId());
+                    PartitionManager.getInstance()
+                            
.getNetworkPartitionLbHolder(partitionGroup.getId());
 //                                                              
PartitionManager.getInstance()
 //                                                                             
 .getNetworkPartitionLbHolder(partitionGroup.getId());
             // FIXME pick a random partition
             Partition partition =
-                                  partitionGroup.getPartitions()[new 
Random().nextInt(partitionGroup.getPartitions().length)];
+                    partitionGroup.getPartitions()[new 
Random().nextInt(partitionGroup.getPartitions().length)];
             PartitionContext partitionContext = new 
PartitionContext(partition);
             partitionContext.setServiceName(cluster.getServiceName());
             partitionContext.setProperties(cluster.getProperties());
@@ -258,7 +262,8 @@ public class ClusterMonitorFactory {
             partitionContext.setMinimumMemberCount(1);//Here it hard codes the 
minimum value as one for LB cartridge partitions
 
             NetworkPartitionContext networkPartitionContext = new 
NetworkPartitionContext(partitionGroup.getId(),
-                    partitionGroup.getPartitionAlgo(), 
partitionGroup.getPartitions()) ;
+                                                                               
           partitionGroup.getPartitionAlgo(),
+                                                                               
           partitionGroup.getPartitions());
             for (Member member : cluster.getMembers()) {
                 String memberId = member.getMemberId();
                 if 
(member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId()))
 {
@@ -280,23 +285,23 @@ public class ClusterMonitorFactory {
                     }
 
                     partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
-                    if(log.isInfoEnabled()){
+                    if (log.isInfoEnabled()) {
                         log.info(String.format("Member stat context has been 
added: [member] %s", memberId));
                     }
                 }
 
             }
             networkPartitionContext.addPartitionContext(partitionContext);
-            
+
             // populate lb cluster id in network partition context.
             java.util.Properties props = cluster.getProperties();
 
             // get service type of load balanced cluster
             String loadBalancedServiceType = 
props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-            
-            if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+            if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
                 String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-                
+
                 if 
(value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER))
 {
                     networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
 
@@ -317,13 +322,17 @@ public class ClusterMonitorFactory {
             clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
         }
 
-        log.info("VMLbClusterMonitor created: "+clusterMonitor.toString());
+        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
         return clusterMonitor;
     }
-       
-    private static DockerServiceClusterMonitor 
getDockerServiceClusterMonitor(Cluster cluster) {
 
-       if (null == cluster) {
+    /**
+     * @param cluster - the cluster which needs to be monitored
+     * @return - the cluster monitor
+     */
+    private static KubernetesServiceClusterMonitor 
getDockerServiceClusterMonitor(Cluster cluster) {
+
+        if (null == cluster) {
             return null;
         }
 
@@ -335,42 +344,43 @@ public class ClusterMonitorFactory {
         AutoscalePolicy policy = 
PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
         java.util.Properties props = cluster.getProperties();
         String kubernetesHostClusterID = 
props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-               KubernetesClusterContext kubernetesClusterCtxt = new 
KubernetesClusterContext(kubernetesHostClusterID, 
-                               cluster.getClusterId());
-
-        DockerServiceClusterMonitor dockerClusterMonitor = new 
DockerServiceClusterMonitor(
-                       kubernetesClusterCtxt, 
-                       cluster.getClusterId(), 
-                       cluster.getServiceName(), 
-                       policy);
-                                        
+        KubernetesClusterContext kubernetesClusterCtxt = new 
KubernetesClusterContext(kubernetesHostClusterID,
+                                                                               
       cluster.getClusterId());
+
+        KubernetesServiceClusterMonitor dockerClusterMonitor = new 
KubernetesServiceClusterMonitor(
+                kubernetesClusterCtxt,
+                cluster.getClusterId(),
+                cluster.getServiceName(),
+                policy);
+
         dockerClusterMonitor.setStatus(ClusterStatus.Created);
-        
-               for (Member member : cluster.getMembers()) {
-                       String memberId = member.getMemberId();
-                       String clusterId = member.getClusterId();
-                       MemberContext memberContext = new MemberContext();
-                       memberContext.setMemberId(memberId);
-                       memberContext.setClusterId(clusterId);
-
-                       if (MemberStatus.Activated.equals(member.getStatus())) {
-                               
dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
-                       } else if 
(MemberStatus.Created.equals(member.getStatus())
-                                       || 
MemberStatus.Starting.equals(member.getStatus())) {
-                               
dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
-                       }
-               }
+
+        //populate the members after restarting        
+        for (Member member : cluster.getMembers()) {
+            String memberId = member.getMemberId();
+            String clusterId = member.getClusterId();
+            MemberContext memberContext = new MemberContext();
+            memberContext.setMemberId(memberId);
+            memberContext.setClusterId(clusterId);
+
+            if (MemberStatus.Activated.equals(member.getStatus())) {
+                
dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+            } else if (MemberStatus.Created.equals(member.getStatus())
+                       || MemberStatus.Starting.equals(member.getStatus())) {
+                
dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+            }
+        }
 
         // find lb reference type
-        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
             String value = props.getProperty(Constants.LOAD_BALANCER_REF);
             dockerClusterMonitor.setLbReferenceType(value);
-            if(log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: "+value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
             }
         }
-        
-        log.info("KubernetesServiceClusterMonitor created: "+ 
dockerClusterMonitor.toString());
+
+        log.info("KubernetesServiceClusterMonitor created: " + 
dockerClusterMonitor.toString());
         return dockerClusterMonitor;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
deleted file mode 100644
index 2621690..0000000
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
-
-/*
- * Every container cluster monitor should extend this class
- */
-public abstract class ContainerClusterMonitor extends AbstractClusterMonitor {
-
-       private KubernetesClusterContext kubernetesClusterCtxt;
-       protected AutoscalePolicy autoscalePolicy;
-       
-       protected ContainerClusterMonitor(String clusterId, String serviceId, 
ClusterType clusterType, 
-                       KubernetesClusterContext kubernetesClusterContext,
-                       AutoscalerRuleEvaluator autoscalerRuleEvaluator, 
AutoscalePolicy autoscalePolicy){
-               
-               super(clusterId, serviceId, clusterType, 
autoscalerRuleEvaluator);
-               this.kubernetesClusterCtxt = kubernetesClusterContext;
-               this.autoscalePolicy = autoscalePolicy;
-       }
-    
-       public KubernetesClusterContext getKubernetesClusterCtxt() {
-               return kubernetesClusterCtxt;
-       }
-
-       public void setKubernetesClusterCtxt(
-                       KubernetesClusterContext kubernetesClusterCtxt) {
-               this.kubernetesClusterCtxt = kubernetesClusterCtxt;
-       }
-       
-       public AutoscalePolicy getAutoscalePolicy() {
-               return autoscalePolicy;
-       }
-
-       public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
-               this.autoscalePolicy = autoscalePolicy;
-       }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
deleted file mode 100644
index 850a295..0000000
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Properties;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.common.enums.ClusterType;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/*
- * It is monitoring a kubernetes service cluster periodically.
- */
-public final class DockerServiceClusterMonitor extends ContainerClusterMonitor{
-       
-       private static final Log log = 
LogFactory.getLog(DockerServiceClusterMonitor.class);
-
-       private String lbReferenceType;
-    private int numberOfReplicasInServiceCluster = 0;
-       int retryInterval = 60000;
-       
-    public DockerServiceClusterMonitor(KubernetesClusterContext 
kubernetesClusterCtxt, 
-               String serviceClusterID, String serviceId, AutoscalePolicy 
autoscalePolicy) {
-       super(serviceClusterID, serviceId, ClusterType.DockerServiceCluster, 
kubernetesClusterCtxt,
-                       new AutoscalerRuleEvaluator(), autoscalePolicy);
-        readConfigurations();
-    }
-
-       @Override
-       public void run() {
-               try {
-                       // TODO make this configurable,
-                       // this is the delay the min check of normal cluster 
monitor to wait
-                       // until LB monitor is added
-                       Thread.sleep(60000);
-               } catch (InterruptedException ignore) {
-               }
-
-               while (!isDestroyed()) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("KubernetesServiceClusterMonitor is 
running.. " + this.toString());
-                       }
-                       try {
-                               if 
(!ClusterStatus.In_Maintenance.equals(getStatus())) {
-                                       monitor();
-                               } else {
-                                       if (log.isDebugEnabled()) {
-                                               
log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
-                                                               + 
ClusterStatus.In_Maintenance + " mode......");
-                                       }
-                               }
-                       } catch (Exception e) {
-                               log.error("KubernetesServiceClusterMonitor : 
Monitor failed." + this.toString(),
-                                               e);
-                       }
-                       try {
-                               Thread.sleep(getMonitorInterval());
-                       } catch (InterruptedException ignore) {
-                       }
-               }
-       }
-       
-       @Override
-       protected void monitor() {
-               
-           // is container created successfully?
-               boolean success = false;
-               String kubernetesClusterId = 
getKubernetesClusterCtxt().getKubernetesClusterID();
-               
-               try {
-                       TopologyManager.acquireReadLock();
-                       Properties props = 
TopologyManager.getTopology().getService(getServiceId()).getCluster(getClusterId()).getProperties();
-                       int minReplicas = 
Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
-                       
-                       int nonTerminatedMembers = 
getKubernetesClusterCtxt().getActiveMembers().size() + 
getKubernetesClusterCtxt().getPendingMembers().size();
-
-                       if (nonTerminatedMembers == 0) {
-                               
-                               while (!success) {
-                                       try {
-
-                                               MemberContext memberContext = 
CloudControllerClient.getInstance().createContainer(kubernetesClusterId, 
getClusterId());
-                                               if(null != memberContext) {
-                                                       
getKubernetesClusterCtxt().addPendingMember(memberContext);
-                                                       success = true;
-                                                       
numberOfReplicasInServiceCluster = minReplicas;
-                                                       
if(log.isDebugEnabled()){
-                                                               
log.debug(String.format("Pending member added, [member] %s [kub cluster] %s", 
-                                                                               
memberContext.getMemberId(), 
getKubernetesClusterCtxt().getKubernetesClusterID()));
-                                                       }
-                                               } else {
-                                                       if 
(log.isDebugEnabled()) {
-                                                               
log.debug("Returned member context is null, did not add to pending members");
-                                                       }
-                                               }
-                                       } catch (Throwable e) {
-                                               if (log.isDebugEnabled()) {
-                                                       String message = 
"Cannot create a container, will retry in "+(retryInterval/1000)+"s";
-                                                       log.debug(message, e);
-                                               }
-                                       }
-                                       
-                       try {
-                           Thread.sleep(retryInterval);
-                       } catch (InterruptedException e1) {
-                       }
-                               }
-                       }
-               } finally {
-                       TopologyManager.releaseReadLock();
-               }
-       }
-       
-       @Override
-       public void destroy() {
-        getMinCheckKnowledgeSession().dispose();
-        getScaleCheckKnowledgeSession().dispose();
-        setDestroyed(true);
-        if(log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor Drools session has been 
disposed. "+this.toString());
-        }              
-       }
-       
-    @Override
-    protected void readConfigurations () {
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        int monitorInterval = 
conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
-        setMonitorInterval(monitorInterval);
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor task interval: " + 
getMonitorInterval());
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "KubernetesServiceClusterMonitor "
-                       + "[ kubernetesHostClusterId=" + 
getKubernetesClusterCtxt().getKubernetesClusterID()
-                       + ", clusterId=" + getClusterId() 
-                       + ", serviceId=" + getServiceId() + "]";
-    }
-    
-       public String getLbReferenceType() {
-               return lbReferenceType;
-       }
-
-       public void setLbReferenceType(String lbReferenceType) {
-               this.lbReferenceType = lbReferenceType;
-       }
-}
\ No newline at end of file

Reply via email to