http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index b8dcd73..6525eba 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -19,34 +19,54 @@
 
 package org.apache.stratos.autoscaler.message.receiver.topology;
 
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.*;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.PartitionContext;
 import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
 import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.PartitionValidationException;
 import org.apache.stratos.autoscaler.exception.PolicyValidationException;
 import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
-import org.apache.stratos.autoscaler.monitor.ClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.KubernetesClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.LbClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
 import org.apache.stratos.autoscaler.partition.PartitionManager;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import 
org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.ClusterMaintenanceModeEventListener;
+import 
org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberMaintenanceListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberReadyToShutdownEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberStartedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
 import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.drools.runtime.StatefulKnowledgeSession;
 import org.drools.runtime.rule.FactHandle;
 
-import java.util.List;
-
 /**
  * Autoscaler topology receiver.
  */
@@ -116,42 +136,60 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 try {
                     MemberReadyToShutdownEvent memberReadyToShutdownEvent = 
(MemberReadyToShutdownEvent)event;
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    AbstractMonitor monitor;
+                    AbstractClusterMonitor monitor;
                     String clusterId = 
memberReadyToShutdownEvent.getClusterId();
                     String memberId = memberReadyToShutdownEvent.getMemberId();
 
-                    if(asCtx.monitorExist(clusterId)){
-                        monitor = asCtx.getMonitor(clusterId);
-                    }else if(asCtx.lbMonitorExist(clusterId)){
-                        monitor = asCtx.getLBMonitor(clusterId);
-                    }else{
+                    if(asCtx.clusterMonitorExist(clusterId)) {
+                        monitor = asCtx.getClusterMonitor(clusterId);
+                    } else {
                         if(log.isDebugEnabled()){
                             log.debug(String.format("A cluster monitor is not 
found in autoscaler context [cluster] %s", clusterId));
                         }
                         return;
                     }
-
-                    NetworkPartitionContext nwPartitionCtxt;
-                    nwPartitionCtxt = 
monitor.getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId());
-
-                    // start a new member in the same Partition
-                    String partitionId = 
monitor.getPartitionOfMember(memberId);
-                    PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
-                    // terminate the shutdown ready member
-                    CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
-                    ccClient.terminate(memberId);
-
-                    // remove from active member list
-                    partitionCtxt.removeActiveMemberById(memberId);
-
-                    if (log.isInfoEnabled()) {
-                        log.info(String.format("Member is terminated and 
removed from the active members list: [member] %s [partition] %s [cluster] %s ",
-                                               memberId, partitionId, 
clusterId));
+                    
+                    TopologyManager.acquireReadLock();
+                    
+                    if(monitor.getClusterType() == 
ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == 
ClusterType.VMLbCluster) {
+                       
+                        NetworkPartitionContext nwPartitionCtxt;
+                        nwPartitionCtxt = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId());
+
+                        // start a new member in the same Partition
+                        String partitionId = ((VMClusterMonitor) 
monitor).getPartitionOfMember(memberId);
+                        PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+
+                        // terminate the shutdown ready member
+                        CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
+                        ccClient.terminate(memberId);
+
+                        // remove from active member list
+                        partitionCtxt.removeActiveMemberById(memberId);
+                        
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member is terminated and 
removed from the active members list: [member] %s [partition] %s [cluster] %s ",
+                                                   memberId, partitionId, 
clusterId));
+                        }
+                    } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = 
((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       //terminate the shutdown ready container
+                       
CloudControllerClient.getInstance().terminateContainer(memberId);
+                       //remove from active member list
+                       
kubernetesClusterContext.removeActiveMemberById(memberId);
+                       
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member is terminated and 
removed from the active members list: [member] %s [kub cluster] %s [cluster] %s 
",
+                                                   memberId, 
kubernetesClusterContext.getKubernetesClusterID(), clusterId));
+                        }
                     }
+
                 } catch (TerminationException e) {
                     log.error(e);
+                } finally {
+                    TopologyManager.releaseReadLock();
                 }
             }
 
@@ -185,12 +223,8 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                     TopologyManager.acquireReadLock();
                     Service service = 
TopologyManager.getTopology().getService(e.getServiceName());
                     Cluster cluster = service.getCluster(e.getClusterId());
-                    
if(AutoscalerContext.getInstance().kubernetesClusterMonitorExist(cluster.getClusterId()))
 {
-                       
AutoscalerContext.getInstance().getKubernetesClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
-                    } else 
if(AutoscalerContext.getInstance().monitorExist((cluster.getClusterId()))) {
-                        
AutoscalerContext.getInstance().getMonitor(e.getClusterId()).setStatus(e.getStatus());
-                    } else if 
(AutoscalerContext.getInstance().lbMonitorExist((cluster.getClusterId()))) {
-                        
AutoscalerContext.getInstance().getLBMonitor(e.getClusterId()).setStatus(e.getStatus());
+                    
if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) 
{
+                       
AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
                     } else {
                         log.error("cluster monitor not exists for the cluster: 
" + cluster.toString());
                     }
@@ -213,8 +247,7 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                     String clusterId = e.getClusterId();
                     String deploymentPolicy = e.getDeploymentPolicy();
 
-                    AbstractMonitor monitor = null;
-                    KubernetesClusterMonitor kubernetesClusterMonitor = null;
+                    AbstractClusterMonitor monitor = null;
 
                     if (e.isLbCluster()) {
                         DeploymentPolicy depPolicy = 
PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
@@ -239,13 +272,9 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
 
                             }
                         }
-                        monitor = AutoscalerContext.getInstance()
-                                .removeLbMonitor(clusterId);
-
-                    } else {
-                        monitor = AutoscalerContext.getInstance()
-                                .removeMonitor(clusterId);
                     }
+                    
+                    monitor = 
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);                
               
 
                     // runTerminateAllRule(monitor);
                     if (monitor != null) {
@@ -280,43 +309,73 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                     String networkPartitionId = e.getNetworkPartitionId();
                     String clusterId = e.getClusterId();
                     String partitionId = e.getPartitionId();
-                    AbstractMonitor monitor;
+                    String memberId = e.getMemberId();
+                    AbstractClusterMonitor monitor;
+                    
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
 
-                    if 
(AutoscalerContext.getInstance().monitorExist(clusterId)) {
-                        monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
+                    if(asCtx.clusterMonitorExist(clusterId)) {
+                        monitor = asCtx.getClusterMonitor(clusterId);
                     } else {
-                        //This is LB member
-                        monitor = 
AutoscalerContext.getInstance().getLBMonitor(clusterId);
+                        if(log.isDebugEnabled()){
+                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context [cluster] %s", clusterId));
+                        }
+                        return;
                     }
+                    
+                    if(monitor.getClusterType() == 
ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == 
ClusterType.VMLbCluster) {
+                       
+                        NetworkPartitionContext networkPartitionContext = 
((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+
+                        PartitionContext partitionContext = 
networkPartitionContext.getPartitionCtxt(partitionId);
+                        partitionContext.removeMemberStatsContext(memberId);
+
+                        if 
(partitionContext.removeTerminationPendingMember(memberId)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Member is removed 
from termination pending members list: [member] %s", memberId));
+                            }
+                        } else if 
(partitionContext.removePendingMember(memberId)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Member is removed 
from pending members list: [member] %s", memberId));
+                            }
+                        } else if 
(partitionContext.removeActiveMemberById(memberId)) {
+                            log.warn(String.format("Member is in the wrong 
list and it is removed from active members list", memberId));
+                        } else if 
(partitionContext.removeObsoleteMember(memberId)){
+                               log.warn(String.format("Member's obsolated 
timeout has been expired and it is removed from obsolated members list", 
memberId));
+                        } else {
+                            log.warn(String.format("Member is not available in 
any of the list active, pending and termination pending", memberId));
+                        }
 
-                    NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-
-                    PartitionContext partitionContext = 
networkPartitionContext.getPartitionCtxt(partitionId);
-                    String memberId = e.getMemberId();
-                    partitionContext.removeMemberStatsContext(memberId);
-
-                    if 
(partitionContext.removeTerminationPendingMember(memberId)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Member is removed from 
termination pending members list: [member] %s", memberId));
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member stat context has 
been removed successfully: [member] %s", memberId));
                         }
-                    } else if (partitionContext.removePendingMember(memberId)) 
{
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Member is removed from 
pending members list: [member] %s", memberId));
+                    } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                       
+                       KubernetesClusterContext kubernetesClusterContext = 
((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       
kubernetesClusterContext.removeMemberStatsContext(memberId);
+                       
+                        if 
(kubernetesClusterContext.removeTerminationPendingMember(memberId)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Member is removed 
from termination pending members list: [member] %s", memberId));
+                            }
+                        } else if 
(kubernetesClusterContext.removePendingMember(memberId)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Member is removed 
from pending members list: [member] %s", memberId));
+                            }
+                        } else if 
(kubernetesClusterContext.removeActiveMemberById(memberId)) {
+                            log.warn(String.format("Member is in the wrong 
list and it is removed from active members list", memberId));
+                        } else if 
(kubernetesClusterContext.removeObsoleteMember(memberId)){
+                               log.warn(String.format("Member's obsolated 
timeout has been expired and it is removed from obsolated members list", 
memberId));
+                        } else {
+                            log.warn(String.format("Member is not available in 
any of the list active, pending and termination pending", memberId));
                         }
-                    } else if 
(partitionContext.removeActiveMemberById(memberId)) {
-                        log.warn(String.format("Member is in the wrong list 
and it is removed from active members list", memberId));
-                    } else if 
(partitionContext.removeObsoleteMember(memberId)){
-                       log.warn(String.format("Member's obsolated timeout has 
been expired and it is removed from obsolated members list", memberId));
-                    } else {
-                        log.warn(String.format("Member is not available in any 
of the list active, pending and termination pending", memberId));
-                    }
 
-                    if (log.isInfoEnabled()) {
-                        log.info(String.format("Member stat context has been 
removed successfully: [member] %s", memberId));
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member stat context has 
been removed successfully: [member] %s", memberId));
+                        }
                     }
-//                partitionContext.decrementCurrentActiveMemberCount(1);
-
-
+                    
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
@@ -338,24 +397,37 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                     String partitionId = e.getPartitionId();
                     String networkPartitionId = e.getNetworkPartitionId();
 
-                    PartitionContext partitionContext;
                     String clusterId = e.getClusterId();
-                    AbstractMonitor monitor;
-
-                    if 
(AutoscalerContext.getInstance().monitorExist(clusterId)) {
-                        monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                        partitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                    AbstractClusterMonitor monitor;
+                    
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    if(asCtx.clusterMonitorExist(clusterId)) {
+                        monitor = asCtx.getClusterMonitor(clusterId);
                     } else {
-                        monitor = 
AutoscalerContext.getInstance().getLBMonitor(clusterId);
-                        partitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                    }
-                    partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
-                    if (log.isInfoEnabled()) {
-                        log.info(String.format("Member stat context has been 
added successfully: [member] %s", memberId));
+                        if(log.isDebugEnabled()){
+                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context [cluster] %s", clusterId));
+                        }
+                        return;
                     }
-//                partitionContext.incrementCurrentActiveMemberCount(1);
-                    
partitionContext.movePendingMemberToActiveMembers(memberId);
-
+                    
+                    if (monitor.getClusterType() == 
ClusterType.VMServiceCluster) {    
+                       PartitionContext partitionContext;
+                        partitionContext = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                        partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member stat context has 
been added successfully: [member] %s", memberId));
+                        }
+                        
partitionContext.movePendingMemberToActiveMembers(memberId);
+                                       } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                                               KubernetesClusterContext 
kubernetesClusterContext;
+                                               kubernetesClusterContext = 
((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
+                                               
kubernetesClusterContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member stat context has 
been added successfully: [member] %s", memberId));
+                        }
+                                               
kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
+                                       }
+                    
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
@@ -368,42 +440,59 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
            @Override
            protected void onEvent(Event event) {
                try {
+                  TopologyManager.acquireReadLock();
+                  
                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = 
(MemberReadyToShutdownEvent)event;
                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                   AbstractMonitor monitor;
+                   AbstractClusterMonitor monitor;
                    String clusterId = 
memberReadyToShutdownEvent.getClusterId();
                    String memberId = memberReadyToShutdownEvent.getMemberId();
 
-                   if(asCtx.monitorExist(clusterId)){
-                       monitor = asCtx.getMonitor(clusterId);
-                   }else if(asCtx.lbMonitorExist(clusterId)){
-                       monitor = asCtx.getLBMonitor(clusterId);
-                   }else{
+                   if(asCtx.clusterMonitorExist(clusterId)) {
+                       monitor = asCtx.getClusterMonitor(clusterId);
+                   } else {
                        if(log.isDebugEnabled()){
                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context [cluster] %s", clusterId));
                        }
                        return;
                    }
 
-                   NetworkPartitionContext nwPartitionCtxt;
-                   nwPartitionCtxt = 
monitor.getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId());
+                   if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                                  || monitor.getClusterType() == 
ClusterType.VMLbCluster) {
+                          
+                       NetworkPartitionContext nwPartitionCtxt;
+                       nwPartitionCtxt = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId());
 
-                   // start a new member in the same Partition
-                   String partitionId = monitor.getPartitionOfMember(memberId);
-                   PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
+                       // start a new member in the same Partition
+                       String partitionId = ((VMClusterMonitor) 
monitor).getPartitionOfMember(memberId);
+                       PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
 
 
-                   // terminate the shutdown ready member
-                   CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
-                   ccClient.terminate(memberId);
+                       // terminate the shutdown ready member
+                       CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
+                       ccClient.terminate(memberId);
 
-                   // remove from active member list
-                   partitionCtxt.removeActiveMemberById(memberId);
+                       // remove from active member list
+                       partitionCtxt.removeActiveMemberById(memberId);
 
-                   if (log.isInfoEnabled()) {
-                       log.info(String.format("Member is terminated and 
removed from the active members list: [member] %s [partition] %s [cluster] %s ",
-                                              memberId, partitionId, 
clusterId));
+                       if (log.isInfoEnabled()) {
+                           log.info(String.format("Member is terminated and 
removed from the active members list: [member] %s [partition] %s [cluster] %s ",
+                                                  memberId, partitionId, 
clusterId));
+                       }
+                   } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                          KubernetesClusterContext kubernetesClusterContext;
+                          kubernetesClusterContext = 
((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
+                          // terminate the shutdown ready member
+                          
CloudControllerClient.getInstance().terminateContainer(memberId);
+                          // remove from active member list
+                          
kubernetesClusterContext.removeActiveMemberById(memberId);
+                          
+                       if (log.isInfoEnabled()) {
+                           log.info(String.format("Member is terminated and 
removed from the active members list: [member] %s [kub cluster] %s [cluster] %s 
",
+                                                  memberId, 
kubernetesClusterContext.getKubernetesClusterID(), clusterId));
+                       }
                    }
+
                } catch (TerminationException e) {
                    log.error(e);
                }
@@ -424,22 +513,38 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                     String partitionId = e.getPartitionId();
                     String networkPartitionId = e.getNetworkPartitionId();
 
-                    PartitionContext partitionContext;
                     String clusterId = e.getClusterId();
-                    AbstractMonitor monitor;
-
-                    if 
(AutoscalerContext.getInstance().monitorExist(clusterId)) {
-                        monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                        partitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                    AbstractClusterMonitor monitor;
+                    
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    if (asCtx.clusterMonitorExist(clusterId)) {
+                        monitor = 
AutoscalerContext.getInstance().getClusterMonitor(clusterId);
                     } else {
-                        monitor = 
AutoscalerContext.getInstance().getLBMonitor(clusterId);
-                        partitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                        if(log.isDebugEnabled()){
+                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context [cluster] %s", clusterId));
+                        }
+                        return;
                     }
-                    partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member has been moved as 
pending termination: [member] %s", memberId));
+                    
+                    if(monitor.getClusterType() == 
ClusterType.VMServiceCluster 
+                                  || monitor.getClusterType() == 
ClusterType.VMLbCluster) {
+                       
+                       PartitionContext partitionContext;
+                       partitionContext = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                        partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Member has been moved as 
pending termination: [member] %s", memberId));
+                        }
+                        
partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+                    } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext;
+                       kubernetesClusterContext = ((ContainerClusterMonitor) 
monitor).getKubernetesClusterCtxt();
+                       kubernetesClusterContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Member has been moved as 
pending termination: [member] %s", memberId));
+                        }
+                       
kubernetesClusterContext.moveActiveMemberToTerminationPendingMembers(memberId);
                     }
-                    
partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
 
                 } catch (Exception e) {
                     log.error("Error processing event", e);
@@ -471,64 +576,15 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         });
     }
 
-    private class LBClusterMonitorAdder implements Runnable {
-        private Cluster cluster;
-
-        public LBClusterMonitorAdder(Cluster cluster) {
-            this.cluster = cluster;
-        }
-
-        public void run() {
-            LbClusterMonitor monitor = null;
-            int retries = 5;
-            boolean success = false;
-            do {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e1) {
-                }
-                try {
-                    monitor = AutoscalerUtil.getLBClusterMonitor(cluster);
-                    success = true;
-
-                } catch (PolicyValidationException e) {
-                    String msg = "LB Cluster monitor creation failed for 
cluster: " + cluster.getClusterId();
-                    log.debug(msg, e);
-                    retries--;
-
-                } catch (PartitionValidationException e) {
-                    String msg = "LB Cluster monitor creation failed for 
cluster: " + cluster.getClusterId();
-                    log.debug(msg, e);
-                    retries--;
-                }
-            } while (!success && retries <= 0);
-
-            if (monitor == null) {
-                String msg = "LB Cluster monitor creation failed, even after 
retrying for 5 times, "
-                        + "for cluster: " + cluster.getClusterId();
-                log.error(msg);
-                throw new RuntimeException(msg);
-            }
-
-            Thread th = new Thread(monitor);
-            th.start();
-            AutoscalerContext.getInstance().addLbMonitor(monitor);
-            if (log.isInfoEnabled()) {
-                log.info(String.format("LB Cluster monitor has been added 
successfully: [cluster] %s",
-                        cluster.getClusterId()));
-            }
-        }
-    }
-
     private class ClusterMonitorAdder implements Runnable {
         private Cluster cluster;
-
+        private String clusterMonitorType;
         public ClusterMonitorAdder(Cluster cluster) {
             this.cluster = cluster;
         }
 
         public void run() {
-            ClusterMonitor monitor = null;
+            AbstractClusterMonitor monitor = null;
             int retries = 5;
             boolean success = false;
             do {
@@ -538,68 +594,23 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 }
 
                 try {
-                    monitor = AutoscalerUtil.getClusterMonitor(cluster);
+                    monitor = ClusterMonitorFactory.getMonitor(cluster);
                     success = true;
-
+                    clusterMonitorType = monitor.getClusterType().name();
                 } catch (PolicyValidationException e) {
-                    String msg = "Cluster monitor creation failed for cluster: 
" + cluster.getClusterId();
+                    String msg = clusterMonitorType +" monitor creation failed 
for cluster: " + cluster.getClusterId();
                     log.debug(msg, e);
                     retries--;
 
                 } catch (PartitionValidationException e) {
-                    String msg = "Cluster monitor creation failed for cluster: 
" + cluster.getClusterId();
-                    log.debug(msg, e);
-                    retries--;
-                }
-            } while (!success && retries != 0);
-
-            if (monitor == null) {
-                String msg = "Cluster monitor creation failed, even after 
retrying for 5 times, "
-                        + "for cluster: " + cluster.getClusterId();
-                log.error(msg);
-                throw new RuntimeException(msg);
-            }
-
-            Thread th = new Thread(monitor);
-            th.start();
-            AutoscalerContext.getInstance().addMonitor(monitor);
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Cluster monitor has been added 
successfully: [cluster] %s",
-                        cluster.getClusterId()));
-            }
-        }
-    }
-
-    private class KubernetesClusterMonitorAdder implements Runnable {
-        private Cluster cluster;
-
-        public KubernetesClusterMonitorAdder(Cluster cluster) {
-            this.cluster = cluster;
-        }
-
-        public void run() {
-            KubernetesClusterMonitor monitor = null;
-            int retries = 5;
-            boolean success = false;
-            do {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e1) {
-                }
-
-                try {
-                    monitor = 
AutoscalerUtil.getKubernetesClusterMonitor(cluster);
-                    success = true;
-
-                } catch (Exception e) {
-                    String msg = "Kubernetes cluster monitor creation failed 
for cluster: " + cluster.getClusterId();
+                    String msg = clusterMonitorType +" monitor creation failed 
for cluster: " + cluster.getClusterId();
                     log.debug(msg, e);
                     retries--;
                 }
             } while (!success && retries != 0);
 
             if (monitor == null) {
-                String msg = "Kubernetes cluster monitor creation failed, even 
after retrying for 5 times, "
+                String msg = clusterMonitorType +" monitor creation failed, 
even after retrying for 5 times, "
                         + "for cluster: " + cluster.getClusterId();
                 log.error(msg);
                 throw new RuntimeException(msg);
@@ -607,16 +618,16 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
 
             Thread th = new Thread(monitor);
             th.start();
-            
AutoscalerContext.getInstance().addKubernetesClusterMonitor(monitor);
+            AutoscalerContext.getInstance().addClusterMonitor(monitor);
             if (log.isInfoEnabled()) {
-                log.info(String.format("Kubernetes cluster monitor has been 
added successfully: [cluster] %s",
-                        cluster.getClusterId()));
+                log.info(String.format("%s monitor has been added 
successfully: [cluster] %s",
+                        clusterMonitorType, cluster.getClusterId()));
             }
         }
     }
-    
+ 
     @SuppressWarnings("unused")
-       private void runTerminateAllRule(AbstractMonitor monitor) {
+       private void runTerminateAllRule(VMClusterMonitor monitor) {
 
         FactHandle terminateAllFactHandle = null;
 
@@ -639,16 +650,9 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
 
     protected synchronized void startClusterMonitor(Cluster cluster) {
         Thread th = null;
-        if (cluster.isKubernetesCluster() 
-                       && 
!AutoscalerContext.getInstance().kubernetesClusterMonitorExist(cluster.getClusterId()))
 {
-               th = new Thread(new KubernetesClusterMonitorAdder(cluster));
-        } else if (cluster.isLbCluster() 
-                       && 
!AutoscalerContext.getInstance().lbMonitorExist(cluster.getClusterId())) {
-            th = new Thread(new LBClusterMonitorAdder(cluster));
-        } else if (!cluster.isLbCluster() && !cluster.isKubernetesCluster()
-                       && 
!AutoscalerContext.getInstance().monitorExist(cluster.getClusterId())) {
-            th = new Thread(new ClusterMonitorAdder(cluster));
-        }
+        if 
(!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
+               th = new Thread(new ClusterMonitorAdder(cluster));
+        } 
         if (th != null) {
             th.start();
             try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
new file mode 100644
index 0000000..00796f1
--- /dev/null
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -0,0 +1,127 @@
+package org.apache.stratos.autoscaler.monitor;
+
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.common.enums.ClusterType;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+
+public abstract class AbstractClusterMonitor implements Runnable{
+       
+    private String clusterId;
+    private String serviceId;
+    private ClusterType clusterType;
+       private ClusterStatus status;
+       private int monitorInterval;
+       
+       protected FactHandle minCheckFactHandle;
+       protected FactHandle scaleCheckFactHandle;
+       private StatefulKnowledgeSession minCheckKnowledgeSession;
+       private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+       private boolean isDestroyed;
+       
+       private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+       
+       protected AbstractClusterMonitor(String clusterId, String serviceId, 
ClusterType clusterType, 
+                       AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+               
+               super();
+               this.clusterId = clusterId;
+               this.serviceId = serviceId;
+               this.clusterType = clusterType;
+               this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+        this.scaleCheckKnowledgeSession = 
autoscalerRuleEvaluator.getScaleCheckStatefulSession();
+        this.minCheckKnowledgeSession = 
autoscalerRuleEvaluator.getMinCheckStatefulSession();
+       }
+
+       protected abstract void readConfigurations();
+       protected abstract void monitor();
+    public abstract void destroy();
+    
+       public String getClusterId() {
+               return clusterId;
+       }
+       
+       public void setClusterId(String clusterId) {
+               this.clusterId = clusterId;
+       }
+       
+       public void setStatus(ClusterStatus status) {
+               this.status = status;
+       }
+
+       public ClusterType getClusterType() {
+               return clusterType;
+       }
+
+       public ClusterStatus getStatus() {
+               return status;
+       }
+       
+       public String getServiceId() {
+               return serviceId;
+       }
+       
+       public void setServiceId(String serviceId) {
+               this.serviceId = serviceId;
+       }
+       
+       public int getMonitorInterval() {
+               return monitorInterval;
+       }
+       
+       public void setMonitorInterval(int monitorInterval) {
+               this.monitorInterval = monitorInterval;
+       }
+
+       public FactHandle getMinCheckFactHandle() {
+               return minCheckFactHandle;
+       }
+       
+       public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+               this.minCheckFactHandle = minCheckFactHandle;
+       }
+       
+       public FactHandle getScaleCheckFactHandle() {
+               return scaleCheckFactHandle;
+       }
+       
+       public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+               this.scaleCheckFactHandle = scaleCheckFactHandle;
+       }
+       
+       public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+               return minCheckKnowledgeSession;
+       }
+       
+       public void setMinCheckKnowledgeSession(
+                       StatefulKnowledgeSession minCheckKnowledgeSession) {
+               this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+       }
+       
+       public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+               return scaleCheckKnowledgeSession;
+       }
+       
+       public void setScaleCheckKnowledgeSession(
+                       StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+               this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+       }
+       
+       public boolean isDestroyed() {
+               return isDestroyed;
+       }
+       
+       public void setDestroyed(boolean isDestroyed) {
+               this.isDestroyed = isDestroyed;
+       }
+
+       public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+               return autoscalerRuleEvaluator;
+       }
+
+       public void setAutoscalerRuleEvaluator(
+                       AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+               this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+       }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java
deleted file mode 100644
index c1441bb..0000000
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Map;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.drools.runtime.StatefulKnowledgeSession;
-import org.drools.runtime.rule.FactHandle;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- *
- */
-   abstract public class AbstractMonitor implements Runnable{
-
-       private static final Log log = LogFactory.getLog(AbstractMonitor.class);
-       // Map<NetworkpartitionId, Network Partition Context>
-       protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
-       protected DeploymentPolicy deploymentPolicy;
-       protected AutoscalePolicy autoscalePolicy;
-       
-
-       protected FactHandle minCheckFactHandle;
-       protected FactHandle scaleCheckFactHandle;
-       
-       protected StatefulKnowledgeSession minCheckKnowledgeSession;
-       protected StatefulKnowledgeSession scaleCheckKnowledgeSession;
-       protected boolean isDestroyed;
-       
-       protected String clusterId;
-       protected String serviceId;
-       
-       protected AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-
-    // time intereval between two runs of the Monitor. Default is 90000ms.
-    protected int monitorInterval;
-
-    public AbstractMonitor() {
-        readConfigurations();
-    }
-
-    private void readConfigurations () {
-
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        monitorInterval = 
conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
-        if (log.isDebugEnabled()) {
-            log.debug("Cluster Monitor task interval: " + 
getMonitorInterval());
-        }
-    }
-
-       @Override
-       public void run() {
-               // TODO Auto-generated method stub
-               
-       }
-       
-           
-       public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
-               log.info("***** getNetworkPartitionCtxt " + 
member.getNetworkPartitionId());
-               String networkPartitionId = member.getNetworkPartitionId();
-       if(networkPartitionCtxts.containsKey(networkPartitionId)) {
-               log.info("returnnig network partition context " + 
networkPartitionCtxts.get(networkPartitionId));
-               return networkPartitionCtxts.get(networkPartitionId);
-       }
-       log.info("returning null getNetworkPartitionCtxt");
-           return null;
-       }
-       
-    public String getPartitionOfMember(String memberId){
-        for(Service service: TopologyManager.getTopology().getServices()){
-            for(Cluster cluster: service.getClusters()){
-                if(cluster.memberExists(memberId)){
-                    return cluster.getMember(memberId).getPartitionId();
-                }
-            }
-        }
-        return null;
-       }
-    
-    public void destroy() {
-        minCheckKnowledgeSession.dispose();
-        scaleCheckKnowledgeSession.dispose();
-        setDestroyed(true);
-        if(log.isDebugEnabled()) {
-            log.debug("Cluster Monitor Drools session has been disposed. 
"+this.toString());
-        }
-    }
-    
-    public boolean isDestroyed() {
-        return isDestroyed;
-    }
-
-    public void setDestroyed(boolean isDestroyed) {
-        this.isDestroyed = isDestroyed;
-    }
-
-    public String getServiceId() {
-        return serviceId;
-    }
-
-    public void setServiceId(String serviceId) {
-        this.serviceId = serviceId;
-    }
-
-    public DeploymentPolicy getDeploymentPolicy() {
-        return deploymentPolicy;
-    }
-
-    public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
-        this.deploymentPolicy = deploymentPolicy;
-    }
-
-    public AutoscalePolicy getAutoscalePolicy() {
-        return autoscalePolicy;
-    }
-
-    public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
-        this.autoscalePolicy = autoscalePolicy;
-    }    
-    
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    public void setClusterId(String clusterId) {
-        this.clusterId = clusterId;
-    }
-
-    public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() {
-        return networkPartitionCtxts;
-    }
-
-    public NetworkPartitionContext getNetworkPartitionCtxt(String 
networkPartitionId) {
-        return networkPartitionCtxts.get(networkPartitionId);
-    }
-
-    public void setPartitionCtxt(Map<String, NetworkPartitionContext> 
partitionCtxt) {
-        this.networkPartitionCtxts = partitionCtxt;
-    }
-
-    public boolean partitionCtxtAvailable(String partitionId) {
-        return networkPartitionCtxts.containsKey(partitionId);
-    }
-
-    public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
-        this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
-    }
-    
-    public NetworkPartitionContext getPartitionCtxt(String id) {
-        return this.networkPartitionCtxts.get(id);
-    }
-
-    public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
-        return minCheckKnowledgeSession;
-    }
-
-    public void setMinCheckKnowledgeSession(StatefulKnowledgeSession 
minCheckKnowledgeSession) {
-        this.minCheckKnowledgeSession = minCheckKnowledgeSession;
-    }
-
-    public FactHandle getMinCheckFactHandle() {
-        return minCheckFactHandle;
-    }
-
-    public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
-        this.minCheckFactHandle = minCheckFactHandle;
-    }
-
-    public int getMonitorInterval() {
-        return monitorInterval;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java
deleted file mode 100644
index 5bb478e..0000000
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- *
- */
-public class ClusterMonitor extends AbstractMonitor {
-
-    private static final Log log = LogFactory.getLog(ClusterMonitor.class);
-    private String lbReferenceType;
-    private boolean hasPrimary;
-    private ClusterStatus status;
-
-    public ClusterMonitor(String clusterId, String serviceId, DeploymentPolicy 
deploymentPolicy,
-                          AutoscalePolicy autoscalePolicy) {
-        this.clusterId = clusterId;
-        this.serviceId = serviceId;
-
-        this.autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
-        this.scaleCheckKnowledgeSession = 
autoscalerRuleEvaluator.getScaleCheckStatefulSession();
-        this.minCheckKnowledgeSession = 
autoscalerRuleEvaluator.getMinCheckStatefulSession();
-
-        this.deploymentPolicy = deploymentPolicy;
-        this.autoscalePolicy = autoscalePolicy;
-        networkPartitionCtxts = new ConcurrentHashMap<String, 
NetworkPartitionContext>();
-    }
-
-
-
-    @Override
-    public void run() {
-
-        try {
-            // TODO make this configurable,
-            // this is the delay the min check of normal cluster monitor to 
wait until LB monitor is added
-            Thread.sleep(60000);
-        } catch (InterruptedException ignore) {
-        }
-
-        while (!isDestroyed()) {
-            if (log.isDebugEnabled()) {
-                log.debug("Cluster monitor is running.. " + this.toString());
-            }
-            try {
-                if(!ClusterStatus.In_Maintenance.equals(status)) {
-                    monitor();
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Cluster monitor is suspended as the cluster 
is in " +
-                                    ClusterStatus.In_Maintenance + " 
mode......");
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Cluster monitor: Monitor failed." + 
this.toString(), e);
-            }
-            try {
-                Thread.sleep(monitorInterval);
-            } catch (InterruptedException ignore) {
-            }
-        }
-    }
-
-    private boolean isPrimaryMember(MemberContext memberContext){
-        Properties props = memberContext.getProperties();
-        if (log.isDebugEnabled()) {
-            log.debug(" Properties [" + props + "] ");
-        }
-        if (props != null && props.getProperties() != null) {
-            for (Property prop : props.getProperties()) {
-                if (prop.getName().equals("PRIMARY")) {
-                    if (Boolean.parseBoolean(prop.getValue())) {
-                        log.debug("Adding member id [" + 
memberContext.getMemberId() + "] " +
-                                "member instance id [" + 
memberContext.getInstanceId() + "] as a primary member");
-                        return true;
-                    }
-                }
-            }
-        }
-        return false;
-    }
-
-    private void monitor() {
-
-        //TODO make this concurrent
-        for (NetworkPartitionContext networkPartitionContext : 
networkPartitionCtxts.values()) {
-            // store primary members in the network partition context
-            List<String> primaryMemberListInNetworkPartition = new 
ArrayList<String>();
-
-            //minimum check per partition
-            for (PartitionContext partitionContext : 
networkPartitionContext.getPartitionCtxts().values()) {
-                // store primary members in the partition context
-                List<String> primaryMemberListInPartition = new 
ArrayList<String>();
-                // get active primary members in this partition context
-                for (MemberContext memberContext : 
partitionContext.getActiveMembers()) {
-                    if (isPrimaryMember(memberContext)){
-                        
primaryMemberListInPartition.add(memberContext.getMemberId());
-                    }
-                }
-                // get pending primary members in this partition context
-                for (MemberContext memberContext : 
partitionContext.getPendingMembers()) {
-                    if (isPrimaryMember(memberContext)){
-                        
primaryMemberListInPartition.add(memberContext.getMemberId());
-                    }
-                }
-                
primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
-                minCheckKnowledgeSession.setGlobal("clusterId", clusterId);
-                minCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType);
-                minCheckKnowledgeSession.setGlobal("isPrimary", hasPrimary);
-                minCheckKnowledgeSession.setGlobal("primaryMemberCount", 
primaryMemberListInPartition.size());
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Running minimum check for 
partition %s ", partitionContext.getPartitionId()));
-                }
-
-                minCheckFactHandle = 
AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession
-                        , minCheckFactHandle, partitionContext);
-
-            }
-
-            boolean rifReset = networkPartitionContext.isRifReset();
-            boolean memoryConsumptionReset = 
networkPartitionContext.isMemoryConsumptionReset();
-            boolean loadAverageReset = 
networkPartitionContext.isLoadAverageReset();
-            if (log.isDebugEnabled()) {
-                log.debug("flag of rifReset: "  + rifReset + " flag of 
memoryConsumptionReset" + memoryConsumptionReset
-                        + " flag of loadAverageReset" + loadAverageReset);
-            }
-            if (rifReset || memoryConsumptionReset || loadAverageReset) {
-                scaleCheckKnowledgeSession.setGlobal("clusterId", clusterId);
-                //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", 
deploymentPolicy);
-                scaleCheckKnowledgeSession.setGlobal("autoscalePolicy", 
autoscalePolicy);
-                scaleCheckKnowledgeSession.setGlobal("rifReset", rifReset);
-                scaleCheckKnowledgeSession.setGlobal("mcReset", 
memoryConsumptionReset);
-                scaleCheckKnowledgeSession.setGlobal("laReset", 
loadAverageReset);
-                scaleCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType);
-                scaleCheckKnowledgeSession.setGlobal("isPrimary", false);
-                scaleCheckKnowledgeSession.setGlobal("primaryMembers", 
primaryMemberListInNetworkPartition);
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Running scale check for network 
partition %s ", networkPartitionContext.getId()));
-                    log.debug(" Primary members : " + 
primaryMemberListInNetworkPartition);
-                }
-
-                scaleCheckFactHandle = 
AutoscalerRuleEvaluator.evaluateScaleCheck(scaleCheckKnowledgeSession
-                        , scaleCheckFactHandle, networkPartitionContext);
-
-                networkPartitionContext.setRifReset(false);
-                networkPartitionContext.setMemoryConsumptionReset(false);
-                networkPartitionContext.setLoadAverageReset(false);
-            } else if (log.isDebugEnabled()) {
-                log.debug(String.format("Scale rule will not run since the LB 
statistics have not received before this " +
-                        "cycle for network partition %s", 
networkPartitionContext.getId()));
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "ClusterMonitor [clusterId=" + clusterId + ", serviceId=" + 
serviceId +
-                ", deploymentPolicy=" + deploymentPolicy + ", 
autoscalePolicy=" + autoscalePolicy +
-                ", lbReferenceType=" + lbReferenceType +
-                ", hasPrimary=" + hasPrimary + " ]";
-    }
-
-    public String getLbReferenceType() {
-        return lbReferenceType;
-    }
-
-    public void setLbReferenceType(String lbReferenceType) {
-        this.lbReferenceType = lbReferenceType;
-    }
-
-    public boolean isHasPrimary() {
-        return hasPrimary;
-    }
-
-    public void setHasPrimary(boolean hasPrimary) {
-        this.hasPrimary = hasPrimary;
-    }
-
-    public ClusterStatus getStatus() {
-        return status;
-    }
-
-    public void setStatus(ClusterStatus status) {
-        this.status = status;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
new file mode 100644
index 0000000..489078e
--- /dev/null
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
@@ -0,0 +1,336 @@
+package org.apache.stratos.autoscaler.monitor;
+
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.PartitionContext;
+import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.partition.PartitionGroup;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.util.Constants;
+
+public class ClusterMonitorFactory {
+       
+       private static final Log log = 
LogFactory.getLog(ClusterMonitorFactory.class);
+
+       public static AbstractClusterMonitor getMonitor(Cluster cluster) throws 
PolicyValidationException, PartitionValidationException {
+               
+               AbstractClusterMonitor clusterMonitor;
+               if(cluster.isKubernetesCluster()){
+                       clusterMonitor = 
getDockerServiceClusterMonitor(cluster);
+               } else if (cluster.isLbCluster()){
+                       clusterMonitor = getVMLbClusterMonitor(cluster);
+               } else {
+                       clusterMonitor = getVMServiceClusterMonitor(cluster);
+               }
+               
+               return clusterMonitor;
+       }
+       
+    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster 
cluster) throws PolicyValidationException, PartitionValidationException {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Deployment policy name: " + deploymentPolicyName);
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy =
+                                 PolicyManager.getInstance()
+                                              
.getAutoscalePolicy(autoscalePolicyName);
+        DeploymentPolicy deploymentPolicy =
+                                            PolicyManager.getInstance()
+                                                         
.getDeploymentPolicy(deploymentPolicyName);
+
+        if (deploymentPolicy == null) {
+            String msg = "Deployment Policy is null. Policy name: " + 
deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        Partition[] allPartitions = deploymentPolicy.getAllPartitions();
+        if (allPartitions == null) {
+            String msg =
+                         "Deployment Policy's Partitions are null. Policy 
name: " +
+                                 deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        
CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(),
 deploymentPolicy);
+
+        VMServiceClusterMonitor clusterMonitor =
+                                        new 
VMServiceClusterMonitor(cluster.getClusterId(),
+                                                           
cluster.getServiceName(),
+                                                           deploymentPolicy, 
policy);
+        clusterMonitor.setStatus(ClusterStatus.Created);
+        
+        for (PartitionGroup partitionGroup: 
deploymentPolicy.getPartitionGroups()){
+
+            NetworkPartitionContext networkPartitionContext = new 
NetworkPartitionContext(partitionGroup.getId(),
+                    partitionGroup.getPartitionAlgo(), 
partitionGroup.getPartitions());
+
+            for(Partition partition: partitionGroup.getPartitions()){
+                PartitionContext partitionContext = new 
PartitionContext(partition);
+                partitionContext.setServiceName(cluster.getServiceName());
+                partitionContext.setProperties(cluster.getProperties());
+                partitionContext.setNetworkPartitionId(partitionGroup.getId());
+                
+                for (Member member: cluster.getMembers()){
+                    String memberId = member.getMemberId();
+                    
if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
+                        MemberContext memberContext = new MemberContext();
+                        memberContext.setClusterId(member.getClusterId());
+                        memberContext.setMemberId(memberId);
+                        memberContext.setPartition(partition);
+                        
memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
+                        
+                        if(MemberStatus.Activated.equals(member.getStatus())){
+                            partitionContext.addActiveMember(memberContext);
+//                            
networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
 1);
+//                            
partitionContext.incrementCurrentActiveMemberCount(1);
+
+                        } else 
if(MemberStatus.Created.equals(member.getStatus()) || 
MemberStatus.Starting.equals(member.getStatus())){
+                            partitionContext.addPendingMember(memberContext);
+
+//                            
networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
 1);
+                        } else 
if(MemberStatus.Suspended.equals(member.getStatus())){
+//                            partitionContext.addFaultyMember(memberId);
+                        }
+                        partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
+                        if(log.isInfoEnabled()){
+                            log.info(String.format("Member stat context has 
been added: [member] %s", memberId));
+                        }
+                    }
+
+                }
+                networkPartitionContext.addPartitionContext(partitionContext);
+                if(log.isInfoEnabled()){
+                    log.info(String.format("Partition context has been added: 
[partition] %s",
+                            partitionContext.getPartitionId()));
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+            if(log.isInfoEnabled()){
+                log.info(String.format("Network partition context has been 
added: [network partition] %s",
+                            networkPartitionContext.getId()));
+            }
+        }
+        
+        
+        // find lb reference type
+        java.util.Properties props = cluster.getProperties();
+        
+        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+            clusterMonitor.setLbReferenceType(value);
+            if(log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: "+value);
+            }
+        }
+        
+        // set hasPrimary property
+        // hasPrimary is true if there are primary members available in that 
cluster
+        
clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
+
+        log.info("Cluster monitor created: "+clusterMonitor.toString());
+        return clusterMonitor;
+    }
+    
+    private static Properties convertMemberPropsToMemberContextProps(
+                       java.util.Properties properties) {
+       Properties props = new Properties();
+       for (Map.Entry<Object, Object> e : properties.entrySet()        ) {
+                       Property prop = new Property();
+                       prop.setName((String)e.getKey());
+                       prop.setValue((String)e.getValue());
+                       props.addProperties(prop);
+               }       
+               return props;
+       }
+
+
+       private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster 
cluster) throws PolicyValidationException, PartitionValidationException {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Deployment policy name: " + deploymentPolicyName);
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy =
+                                 PolicyManager.getInstance()
+                                              
.getAutoscalePolicy(autoscalePolicyName);
+        DeploymentPolicy deploymentPolicy =
+                                            PolicyManager.getInstance()
+                                                         
.getDeploymentPolicy(deploymentPolicyName);
+
+        if (deploymentPolicy == null) {
+            String msg = "Deployment Policy is null. Policy name: " + 
deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        String clusterId = cluster.getClusterId();
+        VMLbClusterMonitor clusterMonitor =
+                                        new VMLbClusterMonitor(clusterId,
+                                                           
cluster.getServiceName(),
+                                                           deploymentPolicy, 
policy);
+        clusterMonitor.setStatus(ClusterStatus.Created);
+        // partition group = network partition context
+        for (PartitionGroup partitionGroup : 
deploymentPolicy.getPartitionGroups()) {
+
+            NetworkPartitionLbHolder networkPartitionLbHolder =
+                                                              
PartitionManager.getInstance()
+                                                                              
.getNetworkPartitionLbHolder(partitionGroup.getId());
+//                                                              
PartitionManager.getInstance()
+//                                                                             
 .getNetworkPartitionLbHolder(partitionGroup.getId());
+            // FIXME pick a random partition
+            Partition partition =
+                                  partitionGroup.getPartitions()[new 
Random().nextInt(partitionGroup.getPartitions().length)];
+            PartitionContext partitionContext = new 
PartitionContext(partition);
+            partitionContext.setServiceName(cluster.getServiceName());
+            partitionContext.setProperties(cluster.getProperties());
+            partitionContext.setNetworkPartitionId(partitionGroup.getId());
+            partitionContext.setMinimumMemberCount(1);//Here it hard codes the 
minimum value as one for LB cartridge partitions
+
+            NetworkPartitionContext networkPartitionContext = new 
NetworkPartitionContext(partitionGroup.getId(),
+                    partitionGroup.getPartitionAlgo(), 
partitionGroup.getPartitions()) ;
+            for (Member member : cluster.getMembers()) {
+                String memberId = member.getMemberId();
+                if 
(member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId()))
 {
+                    MemberContext memberContext = new MemberContext();
+                    memberContext.setClusterId(member.getClusterId());
+                    memberContext.setMemberId(memberId);
+                    memberContext.setPartition(partition);
+
+                    if (MemberStatus.Activated.equals(member.getStatus())) {
+                        partitionContext.addActiveMember(memberContext);
+//                        
networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
 1);
+//                        
partitionContext.incrementCurrentActiveMemberCount(1);
+                    } else if (MemberStatus.Created.equals(member.getStatus()) 
||
+                               
MemberStatus.Starting.equals(member.getStatus())) {
+                        partitionContext.addPendingMember(memberContext);
+//                        
networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
 1);
+                    } else if 
(MemberStatus.Suspended.equals(member.getStatus())) {
+//                        partitionContext.addFaultyMember(memberId);
+                    }
+
+                    partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
+                    if(log.isInfoEnabled()){
+                        log.info(String.format("Member stat context has been 
added: [member] %s", memberId));
+                    }
+                }
+
+            }
+            networkPartitionContext.addPartitionContext(partitionContext);
+            
+            // populate lb cluster id in network partition context.
+            java.util.Properties props = cluster.getProperties();
+
+            // get service type of load balanced cluster
+            String loadBalancedServiceType = 
props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
+            
+            if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+                String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+                
+                if 
(value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER))
 {
+                    networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
+
+                } else if 
(value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER))
 {
+                    String serviceName = cluster.getServiceName();
+                    // TODO: check if this is correct
+                    networkPartitionLbHolder.addServiceLB(serviceName, 
clusterId);
+
+                    if (loadBalancedServiceType != null && 
!loadBalancedServiceType.isEmpty()) {
+                        
networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Added cluster id " + clusterId + " as 
the LB cluster id for service type " + loadBalancedServiceType);
+                        }
+                    }
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+        }
+
+        log.info("LB Cluster monitor created: "+clusterMonitor.toString());
+        return clusterMonitor;
+    }
+       
+    private static DockerServiceClusterMonitor 
getDockerServiceClusterMonitor(Cluster cluster) {
+
+       if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        if (log.isDebugEnabled()) {
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy = 
PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
+        java.util.Properties props = cluster.getProperties();
+        String kubernetesHostClusterID = 
props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
+               KubernetesClusterContext kubernetesClusterCtxt = new 
KubernetesClusterContext(kubernetesHostClusterID);
+
+        DockerServiceClusterMonitor dockerClusterMonitor = new 
DockerServiceClusterMonitor(
+                       kubernetesClusterCtxt, 
+                       cluster.getClusterId(), 
+                       cluster.getServiceName(), 
+                       policy);
+                                        
+        dockerClusterMonitor.setStatus(ClusterStatus.Created);
+        
+        // find lb reference type
+        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+            dockerClusterMonitor.setLbReferenceType(value);
+            if(log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: "+value);
+            }
+        }
+        
+//        // set hasPrimary property
+//        // hasPrimary is true if there are primary members available in that 
cluster
+//        
dockerClusterMonitor.setHasPrimary(Boolean.parseBoolean(props.getProperty(Constants.IS_PRIMARY)));
+
+        log.info("Docker cluster monitor created: "+ 
dockerClusterMonitor.toString());
+        return dockerClusterMonitor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
new file mode 100644
index 0000000..f9b9047
--- /dev/null
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
@@ -0,0 +1,38 @@
+package org.apache.stratos.autoscaler.monitor;
+
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.common.enums.ClusterType;
+
+public abstract class ContainerClusterMonitor extends AbstractClusterMonitor {
+
+       private KubernetesClusterContext kubernetesClusterCtxt;
+       protected AutoscalePolicy autoscalePolicy;
+       
+       protected ContainerClusterMonitor(String clusterId, String serviceId, 
ClusterType clusterType, 
+                       KubernetesClusterContext kubernetesClusterContext,
+                       AutoscalerRuleEvaluator autoscalerRuleEvaluator, 
AutoscalePolicy autoscalePolicy){
+               
+               super(clusterId, serviceId, clusterType, 
autoscalerRuleEvaluator);
+               this.kubernetesClusterCtxt = kubernetesClusterContext;
+               this.autoscalePolicy = autoscalePolicy;
+       }
+    
+       public KubernetesClusterContext getKubernetesClusterCtxt() {
+               return kubernetesClusterCtxt;
+       }
+
+       public void setKubernetesClusterCtxt(
+                       KubernetesClusterContext kubernetesClusterCtxt) {
+               this.kubernetesClusterCtxt = kubernetesClusterCtxt;
+       }
+       
+       public AutoscalePolicy getAutoscalePolicy() {
+               return autoscalePolicy;
+       }
+
+       public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
+               this.autoscalePolicy = autoscalePolicy;
+       }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
new file mode 100644
index 0000000..ca39b6a
--- /dev/null
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
@@ -0,0 +1,156 @@
+package org.apache.stratos.autoscaler.monitor;
+
+import java.util.Properties;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.common.enums.ClusterType;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+public final class DockerServiceClusterMonitor extends ContainerClusterMonitor{
+       
+       private static final Log log = 
LogFactory.getLog(DockerServiceClusterMonitor.class);
+
+       private String lbReferenceType;
+    private int numberOfReplicasInServiceCluster = 0;
+       int retryInterval = 60000;
+       
+    public DockerServiceClusterMonitor(KubernetesClusterContext 
kubernetesClusterCtxt, 
+               String serviceClusterID, String serviceId, AutoscalePolicy 
autoscalePolicy) {
+       super(serviceClusterID, serviceId, ClusterType.DockerServiceCluster, 
kubernetesClusterCtxt,
+                       new AutoscalerRuleEvaluator(), autoscalePolicy);
+        readConfigurations();
+    }
+
+       @Override
+       public void run() {
+               try {
+                       // TODO make this configurable,
+                       // this is the delay the min check of normal cluster 
monitor to wait
+                       // until LB monitor is added
+                       Thread.sleep(60000);
+               } catch (InterruptedException ignore) {
+               }
+
+               while (!isDestroyed()) {
+                       if (log.isDebugEnabled()) {
+                               log.debug("Kubernetes cluster monitor is 
running.. " + this.toString());
+                       }
+                       try {
+                               if 
(!ClusterStatus.In_Maintenance.equals(getStatus())) {
+                                       monitor();
+                               } else {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug("Kubernetes cluster 
monitor is suspended as the cluster is in "
+                                                               + 
ClusterStatus.In_Maintenance + " mode......");
+                                       }
+                               }
+                       } catch (Exception e) {
+                               log.error("Kubernetes cluster monitor: Monitor 
failed." + this.toString(),
+                                               e);
+                       }
+                       try {
+                               Thread.sleep(getMonitorInterval());
+                       } catch (InterruptedException ignore) {
+                       }
+               }
+       }
+       
+       @Override
+       protected void monitor() {
+               
+           // is container created successfully?
+               boolean success = false;
+               String kubernetesClusterId = 
getKubernetesClusterCtxt().getKubernetesClusterID();
+               
+               try {
+                       TopologyManager.acquireReadLock();
+                       Properties props = 
TopologyManager.getTopology().getService(getServiceId()).getCluster(getClusterId()).getProperties();
+                       int minReplicas = 
Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
+                       
+                       int nonTerminatedMembers = 
getKubernetesClusterCtxt().getActiveMembers().size() + 
getKubernetesClusterCtxt().getPendingMembers().size();
+
+                       if (nonTerminatedMembers == 0) {
+                               
+                               while (success) {
+                                       try {
+
+                                               MemberContext memberContext = 
CloudControllerClient.getInstance().createContainer(kubernetesClusterId, 
getClusterId());
+                                               if(null != memberContext) {
+                                                       
getKubernetesClusterCtxt().addPendingMember(memberContext);
+                                                       success = true;
+                                                       
numberOfReplicasInServiceCluster = minReplicas;
+                                                       
if(log.isDebugEnabled()){
+                                                               
log.debug(String.format("Pending member added, [member] %s [kub cluster] %s", 
+                                                                               
memberContext.getMemberId(), 
getKubernetesClusterCtxt().getKubernetesClusterID()));
+                                                       }
+                                               } else {
+                                                       if 
(log.isDebugEnabled()) {
+                                                               
log.debug("Returned member context is null, did not add to pending members");
+                                                       }
+                                               }
+                                       } catch (Throwable e) {
+                                               if (log.isDebugEnabled()) {
+                                                       String message = 
"Cannot create a container, will retry in "+(retryInterval/1000)+"s";
+                                                       log.debug(message, e);
+                                               }
+                                       }
+                                       
+                       try {
+                           Thread.sleep(retryInterval);
+                       } catch (InterruptedException e1) {
+                       }
+                               }
+                       }
+               } finally {
+                       TopologyManager.releaseReadLock();
+               }
+       }
+       
+       @Override
+       public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getScaleCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        if(log.isDebugEnabled()) {
+            log.debug("DockerClusterMonitor Drools session has been disposed. 
"+this.toString());
+        }              
+       }
+       
+    @Override
+    protected void readConfigurations () {
+       // same as VM cluster monitor interval
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        int monitorInterval = 
conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
+        setMonitorInterval(monitorInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("Kubernetes Cluster Monitor task interval: " + 
getMonitorInterval());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "DockerClusterMonitor "
+                       + "[ kubernetesHostClusterId=" + 
getKubernetesClusterCtxt().getKubernetesClusterID()
+                       + ", clusterId=" + getClusterId() 
+                       + ", serviceId=" + getServiceId() + "]";
+    }
+    
+       public String getLbReferenceType() {
+               return lbReferenceType;
+       }
+
+       public void setLbReferenceType(String lbReferenceType) {
+               this.lbReferenceType = lbReferenceType;
+       }
+}
\ No newline at end of file

Reply via email to