http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 75f7a2d..a8c90b3 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.context.AutoscalerContext;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
@@ -172,8 +172,8 @@ public class AutoscalerHealthStatEventReceiver {
                     }
                     return;
                 }
-                if(monitor instanceof VMClusterMonitor) {
-                    VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) 
monitor;
+                if(monitor instanceof ClusterMonitor) {
+                    ClusterMonitor vmClusterMonitor = (ClusterMonitor) monitor;
                     
vmClusterMonitor.handleAverageRequestsServingCapabilityEvent(averageRequestsServingCapabilityEvent);
                 }
             }

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 2a6f945..f5875ce 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.applications.ApplicationHolder;
 import org.apache.stratos.autoscaler.context.AutoscalerContext;
 import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContext;
 import 
org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
 import 
org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
 import 
org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
@@ -33,7 +33,7 @@ import 
org.apache.stratos.autoscaler.exception.partition.PartitionValidationExce
 import 
org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
 import org.apache.stratos.autoscaler.monitor.MonitorFactory;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
 import org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
@@ -304,7 +304,7 @@ public class AutoscalerTopologyEventReceiver {
                 monitor.notifyParentMonitor(ClusterStatus.Terminated, 
instanceId);
                 //Removing the instance and instanceContext
                 ClusterInstance instance = (ClusterInstance) 
monitor.getInstance(instanceId);
-                ((VMClusterContext)monitor.getClusterContext()).
+                ((ClusterContext)monitor.getClusterContext()).
                         
getNetworkPartitionCtxt(instance.getNetworkPartitionId()).
                         removeInstanceContext(instanceId);
                 monitor.removeInstance(instanceId);
@@ -446,8 +446,8 @@ public class AutoscalerTopologyEventReceiver {
                            Cluster cluster = 
service.getCluster(clusterInstanceCreatedEvent.getClusterId());
                             if (cluster != null) {
                                 try {
-                                    VMClusterContext clusterContext =
-                                            (VMClusterContext) 
clusterMonitor.getClusterContext();
+                                    ClusterContext clusterContext =
+                                            (ClusterContext) 
clusterMonitor.getClusterContext();
                                     if (clusterContext == null) {
                                         clusterContext = 
ClusterContextFactory.getVMClusterContext(instanceId, cluster,
                                                 
clusterMonitor.hasScalingDependents());
@@ -470,7 +470,7 @@ public class AutoscalerTopologyEventReceiver {
                                                 + 
clusterInstanceCreatedEvent.getClusterId() + " started successfully");
                                     } else {
                                         //monitor already started. Invoking it 
directly to speed up the process
-                                        
((VMClusterMonitor)clusterMonitor).monitor();
+                                        
((ClusterMonitor)clusterMonitor).monitor();
                                     }
                                 } catch (PolicyValidationException e) {
                                     log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
index a8721a6..b5b1614 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
@@ -31,7 +31,7 @@ import 
org.apache.stratos.autoscaler.exception.partition.PartitionValidationExce
 import 
org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.component.GroupMonitor;
 import org.apache.stratos.autoscaler.monitor.component.ParentComponentMonitor;
@@ -270,7 +270,7 @@ public class MonitorFactory {
             }
 
             //Creating the instance of the cluster
-            ((VMClusterMonitor) 
clusterMonitor).createClusterInstance(parentInstanceIds, cluster);
+            ((ClusterMonitor) 
clusterMonitor).createClusterInstance(parentInstanceIds, cluster);
             //add it to autoscaler context
             AutoscalerContext.getInstance().addClusterMonitor(clusterMonitor);
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
new file mode 100644
index 0000000..24b8f3a
--- /dev/null
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -0,0 +1,1244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.client.CloudControllerClient;
+import org.apache.stratos.autoscaler.context.InstanceContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
+import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContext;
+import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
+import 
org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
+import 
org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
+import 
org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
+import 
org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.exception.cartridge.TerminationException;
+import 
org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
+import 
org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.ScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.ScalingOverMaxEvent;
+import 
org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import 
org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor;
+import 
org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor;
+import 
org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
+import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
+import org.apache.stratos.common.Properties;
+import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.ClusterInstance;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.domain.instance.Instance;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.*;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.*;
+
+/**
+ * Responsible for monitoring a service cluster. It runs periodically and
+ * performs the minimum instance check and the scaling check using the
+ * underlying rules engine.
+ */
+public class ClusterMonitor extends AbstractClusterMonitor {
+
+    // Logger shared by every instance of this class.
+    private static final Log log = LogFactory.getLog(ClusterMonitor.class);
+    // Cluster-level network partition contexts, keyed by the id each context reports through getId().
+    private Map<String, ClusterLevelNetworkPartitionContext> 
networkPartitionIdToClusterLevelNetworkPartitionCtxts;
+    // Passed to the rule sessions as the "isPrimary" global.
+    private boolean hasPrimary;
+    // Last factor taken from a parent scaling event; onParentScalingEvent multiplies the
+    // minimum instance count by it.
+    private float scalingFactorBasedOnDependencies = 1.0f;
+
+
+    /**
+     * Creates the monitor for a cluster: initialises the network partition
+     * context map, reads the monitor configuration, builds the knowledge
+     * bases for the four rule files and opens one stateful session per file.
+     *
+     * @param cluster                    cluster to be monitored
+     * @param hasScalingDependents       forwarded to the super constructor
+     * @param groupScalingEnabledSubtree forwarded to the super constructor
+     */
+    protected ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) {
+        super(cluster, hasScalingDependents, groupScalingEnabledSubtree);
+        networkPartitionIdToClusterLevelNetworkPartitionCtxts =
+                new HashMap<String, ClusterLevelNetworkPartitionContext>();
+        readConfigurations();
+
+        // Knowledge bases are built in this order before any session is requested.
+        final String[] droolFiles = {
+                StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE,
+                StratosConstants.VM_SCALE_CHECK_DROOL_FILE,
+                StratosConstants.VM_MIN_CHECK_DROOL_FILE,
+                StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE
+        };
+        autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
+        for (String droolFile : droolFiles) {
+            autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(droolFile);
+        }
+
+        obsoleteCheckKnowledgeSession =
+                autoscalerRuleEvaluator.getStatefulSession(StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE);
+        scaleCheckKnowledgeSession =
+                autoscalerRuleEvaluator.getStatefulSession(StratosConstants.VM_SCALE_CHECK_DROOL_FILE);
+        minCheckKnowledgeSession =
+                autoscalerRuleEvaluator.getStatefulSession(StratosConstants.VM_MIN_CHECK_DROOL_FILE);
+        dependentScaleCheckKnowledgeSession =
+                autoscalerRuleEvaluator.getStatefulSession(StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
+    }
+
+    /**
+     * Asks the cloud controller to terminate a member. A
+     * TerminationException is logged as an error and not rethrown.
+     *
+     * @param memberId id of the member to terminate
+     */
+    private static void terminateMember(String memberId) {
+        try {
+            CloudControllerClient cloudController = CloudControllerClient.getInstance();
+            cloudController.terminate(memberId);
+        } catch (TerminationException terminationFailure) {
+            log.error("Unable to terminate member [member id ] " + memberId, terminationFailure);
+        }
+    }
+
+    /**
+     * Forwards a cluster instance creation request, with all arguments
+     * unchanged, to the cloud controller client.
+     */
+    private static void createClusterInstance(String serviceType, String clusterId, String alias,
+                                              String instanceId, String partitionId, String networkPartitionId) {
+        CloudControllerClient cloudController = CloudControllerClient.getInstance();
+        cloudController.createClusterInstance(
+                serviceType, clusterId, alias, instanceId, partitionId, networkPartitionId);
+    }
+
+    /**
+     * Registers a cluster-level network partition context under the id it
+     * reports through getId(); an existing entry with that id is replaced.
+     *
+     * @param clusterLevelNWPartitionCtxt context to register
+     */
+    public void addClusterLevelNWPartitionContext(ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) {
+        final String contextId = clusterLevelNWPartitionCtxt.getId();
+        networkPartitionIdToClusterLevelNetworkPartitionCtxts.put(contextId, clusterLevelNWPartitionCtxt);
+    }
+
+    /**
+     * Looks up a registered cluster-level network partition context.
+     *
+     * @param nwPartitionId id the context was registered under
+     * @return the matching context, or null when the map has no entry for the id
+     */
+    public ClusterLevelNetworkPartitionContext getClusterLevelNWPartitionContext(String nwPartitionId) {
+        final ClusterLevelNetworkPartitionContext registeredContext =
+                networkPartitionIdToClusterLevelNetworkPartitionCtxts.get(nwPartitionId);
+        return registeredContext;
+    }
+
+    /**
+     * Stores the average load average carried by the event on the cluster
+     * instance context identified by the event's network partition and
+     * instance ids. When no such context is found, only a debug message is
+     * written.
+     *
+     * @param averageLoadAverageEvent health statistics event to apply
+     */
+    @Override
+    public void handleAverageLoadAverageEvent(AverageLoadAverageEvent averageLoadAverageEvent) {
+        final String nwPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
+        final String eventClusterId = averageLoadAverageEvent.getClusterId();
+        final String eventInstanceId = averageLoadAverageEvent.getInstanceId();
+        final float loadAverage = averageLoadAverageEvent.getValue();
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+                    eventClusterId, nwPartitionId, loadAverage));
+        }
+
+        final ClusterInstanceContext targetContext = getClusterInstanceContext(nwPartitionId, eventInstanceId);
+        if (targetContext == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", nwPartitionId));
+            }
+            return;
+        }
+        targetContext.setAverageLoadAverage(loadAverage);
+    }
+
+    /**
+     * Monitoring loop: calls monitor() and then sleeps for the configured
+     * interval, repeating until the monitor is marked as destroyed or the
+     * running thread is interrupted. Exceptions thrown by monitor() are
+     * logged and do not end the loop.
+     */
+    @Override
+    public void run() {
+        while (!isDestroyed()) {
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Cluster monitor is running.. " + this.toString());
+                }
+                monitor();
+            } catch (Exception e) {
+                log.error("Cluster monitor: Monitor failed." + this.toString(), e);
+            }
+            try {
+                Thread.sleep(getMonitorIntervalMilliseconds());
+            } catch (InterruptedException e) {
+                // Do not swallow the interrupt: restore the flag for whoever owns this thread
+                // and leave the loop. Continuing would make every following sleep() fail
+                // immediately and turn the loop into a busy spin.
+                Thread.currentThread().interrupt();
+                if (log.isDebugEnabled()) {
+                    log.debug("Cluster monitor was interrupted while sleeping, stopping. " + this.toString());
+                }
+                break;
+            }
+        }
+    }
+
+    /**
+     * Tells whether a member is flagged as primary, i.e. whether its
+     * properties contain an entry named "PRIMARY" whose value parses to
+     * {@code true}.
+     *
+     * @param memberContext member whose properties are inspected
+     * @return true for the first "PRIMARY" property with a true value, false otherwise
+     */
+    private boolean isPrimaryMember(MemberContext memberContext) {
+        Properties props = AutoscalerUtil.toCommonProperties(memberContext.getProperties());
+        if (log.isDebugEnabled()) {
+            log.debug(" Properties [" + props + "] ");
+        }
+        if (props == null || props.getProperties() == null) {
+            return false;
+        }
+        for (Property prop : props.getProperties()) {
+            // Constant-first equals: a null entry or a property without a name is skipped
+            // instead of raising a NullPointerException.
+            if (prop != null && "PRIMARY".equals(prop.getName()) && Boolean.parseBoolean(prop.getValue())) {
+                // Guarded so the message is only built when debug logging is on.
+                if (log.isDebugEnabled()) {
+                    log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
+                            "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Runs one monitoring pass over every cluster instance context of every
+     * network partition context. For an instance whose status passes the
+     * check below, the obsolete check and minimum check rules are evaluated,
+     * followed by the scale check rules when at least one of the rif, memory
+     * consumption or load average statistics has been reset. The obsolete
+     * check is then evaluated for every partition context of the instance
+     * regardless of status. The method is synchronized on this monitor.
+     */
+    public synchronized void monitor() {
+
+        for (ClusterLevelNetworkPartitionContext networkPartitionContext : 
getNetworkPartitionCtxts()) {
+
+            final Collection<InstanceContext> clusterInstanceContexts = 
networkPartitionContext.
+                    getInstanceIdToInstanceContextMap().values();
+
+            for (final InstanceContext pInstanceContext : 
clusterInstanceContexts) {
+                final ClusterInstanceContext instanceContext = 
(ClusterInstanceContext) pInstanceContext;
+                ClusterInstance instance = (ClusterInstance) 
this.instanceIdToInstanceMap.
+                        get(instanceContext.getId());
+
+                // NOTE(review): instance is dereferenced without a null check; if the map has no
+                // entry for this instance context id the next line would throw - confirm it cannot happen.
+                // NOTE(review): && binds tighter than ||, so !this.hasFaultyMember only applies to
+                // the Inactive branch; a status code <= Active passes even with a faulty member.
+                // Confirm this is the intended grouping.
+                if ((instance.getStatus().getCode() <= 
ClusterStatus.Active.getCode()) ||
+                        (instance.getStatus() == ClusterStatus.Inactive && 
!hasStartupDependents)
+                                && !this.hasFaultyMember) {
+
+                    Runnable monitoringRunnable = new Runnable() {
+                        @Override
+                        public void run() {
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Monitor is running for [cluster] : 
" + getClusterId());
+                            }
+                            // store primary members in the cluster instance 
context
+                            List<String> primaryMemberListInClusterInstance = 
new ArrayList<String>();
+
+                            for (ClusterLevelPartitionContext partitionContext 
:
+                                                            
instanceContext.getPartitionCtxts()) {
+
+                                // get active primary members in this cluster 
instance context
+                                for (MemberContext memberContext : 
partitionContext.getActiveMembers()) {
+                                    if (isPrimaryMember(memberContext)) {
+                                        
primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                                    }
+                                }
+
+                                // get pending primary members in this cluster 
instance context
+                                for (MemberContext memberContext : 
partitionContext.getPendingMembers()) {
+                                    if (isPrimaryMember(memberContext)) {
+                                        
primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                                    }
+                                }
+
+                                // Obsolete check for this partition; the same check runs again in the
+                                // loop that follows the status block further down.
+                                obsoleteCheckFactHandle = 
AutoscalerRuleEvaluator.evaluate(
+                                        getObsoleteCheckKnowledgeSession(), 
obsoleteCheckFactHandle, partitionContext);
+
+                            }
+
+                            
getScaleCheckKnowledgeSession().setGlobal("primaryMembers", 
primaryMemberListInClusterInstance);
+                            
getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                            
getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+                            //FIXME when parent chosen the partition
+                            String paritionAlgo = 
instanceContext.getPartitionAlgorithm();
+
+                            
getMinCheckKnowledgeSession().setGlobal("algorithmName",
+                                    paritionAlgo);
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Running minimum check 
for cluster instance %s ",
+                                        instanceContext.getId() + " for the 
cluster: " + clusterId));
+                            }
+
+                            minCheckFactHandle = 
AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
+                                    minCheckFactHandle, instanceContext);
+
+
+                            //checking the status of the cluster
+                            boolean rifReset = instanceContext.isRifReset();
+                            boolean memoryConsumptionReset = 
instanceContext.isMemoryConsumptionReset();
+                            boolean loadAverageReset = 
instanceContext.isLoadAverageReset();
+                            boolean averageRequestServedPerInstanceReset
+                                    = 
instanceContext.isAverageRequestServedPerInstanceReset();
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Execution point of scaling Rule, 
[Is rif Reset] : " + rifReset
+                                        + " [Is memoryConsumption Reset] : " + 
memoryConsumptionReset
+                                        + " [Is loadAverage Reset] : " + 
loadAverageReset);
+                            }
+
+                            // averageRequestServedPerInstanceReset is passed to the rules below as
+                            // "arspiReset" but does not take part in this trigger condition and is
+                            // not cleared afterwards.
+                            if (rifReset || memoryConsumptionReset || 
loadAverageReset) {
+
+                                log.info("Executing scaling rule as statistics 
have been reset");
+                                ClusterContext vmClusterContext = 
(ClusterContext) clusterContext;
+
+                                
getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                                
getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+                                
getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+                                
getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+                                
getScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+                                
getScaleCheckKnowledgeSession().setGlobal("algorithmName", paritionAlgo);
+                                
getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
+                                        vmClusterContext.getAutoscalePolicy());
+                                
getScaleCheckKnowledgeSession().setGlobal("arspiReset",
+                                        averageRequestServedPerInstanceReset);
+                                
getScaleCheckKnowledgeSession().setGlobal("primaryMembers",
+                                        primaryMemberListInClusterInstance);
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug(String.format("Running scale 
check for [cluster instance context] %s ",
+                                            instanceContext.getId()));
+                                    log.debug(" Primary members : " + 
primaryMemberListInClusterInstance);
+                                }
+
+                                scaleCheckFactHandle = 
AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
+                                        , scaleCheckFactHandle, 
instanceContext);
+
+                                // Clear the three flags that triggered this evaluation.
+                                instanceContext.setRifReset(false);
+                                
instanceContext.setMemoryConsumptionReset(false);
+                                instanceContext.setLoadAverageReset(false);
+                            } else if (log.isDebugEnabled()) {
+                                log.debug(String.format("Scale rule will not 
run since the LB statistics have not " +
+                                                "received before this cycle 
for [cluster instance context] %s [cluster] %s",
+                                        instanceContext.getId(), clusterId));
+                            }
+
+                        }
+                    };
+                    // Executed synchronously on the calling thread; no thread is started here.
+                    monitoringRunnable.run();
+                }
+
+                // Obsolete check for every partition context, executed regardless of the
+                // instance status evaluated above.
+                for (final ClusterLevelPartitionContext partitionContext : 
instanceContext.getPartitionCtxts()) {
+                    Runnable monitoringRunnable = new Runnable() {
+                        @Override
+                        public void run() {
+                            obsoleteCheckFactHandle = 
AutoscalerRuleEvaluator.evaluate(
+                                    getObsoleteCheckKnowledgeSession(), 
obsoleteCheckFactHandle, partitionContext);
+                        }
+                    };
+
+                    monitoringRunnable.run();
+
+                }
+
+            }
+        }
+    }
+
+    /**
+     * Reads the monitor interval from the autoscaler configuration
+     * (falling back to 90000 when the key is absent) and applies it to this
+     * monitor.
+     */
+    @Override
+    protected void readConfigurations() {
+        final XMLConfiguration autoscalerConf = ConfUtil.getInstance(null).getConfiguration();
+        setMonitorIntervalMilliseconds(
+                autoscalerConf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000));
+        if (!log.isDebugEnabled()) {
+            return;
+        }
+        log.debug("ClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+    }
+
+    /**
+     * Shuts the monitor down: disposes every rule session opened by the
+     * constructor, marks the monitor as destroyed so that run() leaves its
+     * loop, and stops the scheduler.
+     */
+    @Override
+    public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getObsoleteCheckKnowledgeSession().dispose();
+        getScaleCheckKnowledgeSession().dispose();
+        // The constructor also opens a dependent-scale-check session; dispose it as well
+        // so that it is not leaked when the monitor goes away.
+        getDependentScaleCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        stopScheduler();
+        if (log.isDebugEnabled()) {
+            log.debug("ClusterMonitor Drools session has been disposed. " + this.toString());
+        }
+    }
+
+    /**
+     * @return a short description holding the cluster id and the hasPrimary flag
+     */
+    @Override
+    public String toString() {
+        final StringBuilder description = new StringBuilder("ClusterMonitor [clusterId=");
+        description.append(getClusterId());
+        description.append(", hasPrimary=").append(hasPrimary);
+        description.append(" ]");
+        return description.toString();
+    }
+
+    /**
+     * @return the current value of the hasPrimary flag
+     */
+    public boolean isHasPrimary() {
+        return this.hasPrimary;
+    }
+
+    /**
+     * @param hasPrimary new value of the hasPrimary flag
+     */
+    public void setHasPrimary(final boolean hasPrimary) {
+        this.hasPrimary = hasPrimary;
+    }
+
+    /**
+     * No-op: the body is empty, so child status events are ignored by this monitor.
+     */
+    @Override
+    public void onChildStatusEvent(MonitorStatusEvent statusEvent) {
+
+    }
+
+    /**
+     * Reacts to a status change of the parent: when the parent reports
+     * GroupStatus.Terminating or ApplicationStatus.Terminating, a cluster
+     * terminating event is published for the instance named in the event.
+     * Every other status is ignored.
+     *
+     * @param statusEvent status event received from the parent monitor
+     */
+    @Override
+    public void onParentStatusEvent(MonitorStatusEvent statusEvent) {
+        final String instanceId = statusEvent.getInstanceId();
+        final boolean parentTerminating = statusEvent.getStatus() == GroupStatus.Terminating
+                || statusEvent.getStatus() == ApplicationStatus.Terminating;
+        if (!parentTerminating) {
+            return;
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Publishing Cluster terminating event for [application] " + appId +
+                    " [cluster] " + this.getClusterId() + " [instance] " + instanceId);
+        }
+        ClusterStatusEventPublisher.sendClusterTerminatingEvent(
+                getAppId(), getServiceId(), getClusterId(), instanceId);
+    }
+
+    /**
+     * No-op: the body is empty, so child scaling events are ignored by this monitor.
+     */
+    @Override
+    public void onChildScalingEvent(ScalingEvent scalingEvent) {
+
+    }
+
+    /**
+     * No-op: the body is empty, so child scaling-over-max events are ignored by this monitor.
+     */
+    @Override
+    public void onChildScalingOverMaxEvent(ScalingOverMaxEvent 
scalingOverMaxEvent) {
+
+    }
+
+    /**
+     * Applies a scaling event received from the parent. The factor carried
+     * by the event is remembered, the required instance count is derived
+     * from the minimum instance count multiplied by that factor (rounded
+     * with the autoscale policy's rounding factor), stored on the cluster
+     * instance context, and the dependent scale check rules are evaluated.
+     * When no cluster instance context exists for the event's network
+     * partition and instance, the event is logged and dropped.
+     *
+     * @param scalingEvent scaling event received from the parent monitor
+     */
+    @Override
+    public void onParentScalingEvent(ScalingEvent scalingEvent) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Parent scaling event received to [cluster]: " + this.getClusterId()
+                    + ", [network partition]: " + scalingEvent.getNetworkPartitionId()
+                    + ", [event] " + scalingEvent.getId() + ", [group instance] " + scalingEvent.getInstanceId());
+        }
+
+        this.scalingFactorBasedOnDependencies = scalingEvent.getFactor();
+        ClusterContext vmClusterContext = (ClusterContext) clusterContext;
+        String instanceId = scalingEvent.getInstanceId();
+
+        ClusterInstanceContext clusterInstanceContext =
+                getClusterInstanceContext(scalingEvent.getNetworkPartitionId(), instanceId);
+
+        // The health stat handlers of this class treat a null context as possible; do the
+        // same here instead of failing with a NullPointerException further down.
+        if (clusterInstanceContext == null) {
+            log.warn("Cluster instance context is not available for [cluster] " + this.getClusterId()
+                    + " [network partition] " + scalingEvent.getNetworkPartitionId()
+                    + " [instance] " + instanceId + ", ignoring the parent scaling event");
+            return;
+        }
+
+        // store primary members in the cluster instance context
+        // NOTE(review): this list is filled but never passed to the dependent scale check
+        // session; confirm whether a "primaryMembers" global was meant to be set here.
+        List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
+
+        for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) {
+
+            // get active primary members in this cluster instance context
+            for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+                if (isPrimaryMember(memberContext)) {
+                    primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                }
+            }
+
+            // get pending primary members in this cluster instance context
+            for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+                if (isPrimaryMember(memberContext)) {
+                    primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                }
+            }
+        }
+
+        //TODO get min instance count from instance context
+        float requiredInstanceCount = clusterInstanceContext.getMinInstanceCount() * scalingFactorBasedOnDependencies;
+        int roundedRequiredInstanceCount = getRoundedInstanceCount(requiredInstanceCount,
+                vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor());
+        clusterInstanceContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount);
+
+        getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+        getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount",
+                roundedRequiredInstanceCount);
+        getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName",
+                clusterInstanceContext.getPartitionAlgorithm());
+        getDependentScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+
+        dependentScaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getDependentScaleCheckKnowledgeSession()
+                , dependentScaleCheckFactHandle, clusterInstanceContext);
+
+    }
+
+    /**
+     * Forwards a cluster scaling request to {@code MonitorStatusEventBuilder},
+     * passing along this monitor's parent and id.
+     *
+     * @param networkPartitionId network partition id handed to the builder
+     * @param instanceId         instance id handed to the builder
+     * @param factor             scaling factor handed to the builder unchanged
+     */
+    public void sendClusterScalingEvent(String networkPartitionId, String instanceId, float factor) {
+        MonitorStatusEventBuilder.handleClusterScalingEvent(
+                parent, networkPartitionId, instanceId, factor, id);
+    }
+
+    /**
+     * Forwards a "scaling over max" notification to
+     * {@code MonitorStatusEventBuilder}, passing along this monitor's parent
+     * and id.
+     *
+     * @param networkPartitionId network partition id handed to the builder
+     * @param instanceId         instance id handed to the builder
+     */
+    public void sendScalingOverMaxEvent(String networkPartitionId, String instanceId) {
+        MonitorStatusEventBuilder.handleScalingOverMaxEvent(
+                parent, networkPartitionId, instanceId, id);
+    }
+
+    @Override
+    public void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+        String networkPartitionId = 
gradientOfLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = gradientOfLoadAverageEvent.getClusterId();
+        String instanceId = gradientOfLoadAverageEvent.getInstanceId();
+        float value = gradientOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of load avg event: [cluster] %s 
[network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setLoadAverageGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not 
available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent 
secondDerivativeOfLoadAverageEvent) {
+
+        String networkPartitionId = 
secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+        String instanceId = secondDerivativeOfLoadAverageEvent.getInstanceId();
+        float value = secondDerivativeOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of load avg event: 
[cluster] %s "
+                    + "[network-partition] %s [value] %s", clusterId, 
networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            
clusterLevelNetworkPartitionContext.setLoadAverageSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not 
available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+        String networkPartitionId = 
averageMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = averageMemoryConsumptionEvent.getClusterId();
+        String instanceId = averageMemoryConsumptionEvent.getInstanceId();
+        float value = averageMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg Memory Consumption event: [cluster] 
%s [network-partition] %s "
+                    + "[value] %s", clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            
clusterLevelNetworkPartitionContext.setAverageMemoryConsumption(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                        .format("Network partition context is not available 
for :"
+                                + " [network partition] %s", 
networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) 
{
+
+        String networkPartitionId = 
gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+        String instanceId = gradientOfMemoryConsumptionEvent.getInstanceId();
+        float value = gradientOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of Memory Consumption event: 
[cluster] %s "
+                    + "[network-partition] %s [value] %s", clusterId, 
networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            
clusterLevelNetworkPartitionContext.setMemoryConsumptionGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not 
available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent 
secondDerivativeOfMemoryConsumptionEvent) {
+
+        String networkPartitionId = 
secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = 
secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+        String instanceId = 
secondDerivativeOfMemoryConsumptionEvent.getInstanceId();
+        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of Memory Consumption 
event: [cluster] %s "
+                    + "[network-partition] %s [value] %s", clusterId, 
networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            
clusterLevelNetworkPartitionContext.setMemoryConsumptionSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not 
available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    public void handleAverageRequestsServingCapabilityEvent(
+            AverageRequestsServingCapabilityEvent 
averageRequestsServingCapabilityEvent) {
+
+        String clusterId = 
averageRequestsServingCapabilityEvent.getClusterId();
+        String instanceId = 
averageRequestsServingCapabilityEvent.getInstanceId();
+        String networkPartitionId = 
averageRequestsServingCapabilityEvent.getNetworkPartitionId();
+        Float floatValue = averageRequestsServingCapabilityEvent.getValue();
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Requests Served per Instance 
event: [cluster] %s [network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, floatValue));
+        }
+
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            
clusterLevelNetworkPartitionContext.setAverageRequestsServedPerInstance(floatValue);
+
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not 
available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+
+    }
+
+    @Override
+    public void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+        String networkPartitionId = 
averageRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = averageRequestsInFlightEvent.getClusterId();
+        String instanceId = averageRequestsInFlightEvent.getInstanceId();
+        Float servedCount = averageRequestsInFlightEvent.getServedCount();
+        Float activeInstances = 
averageRequestsInFlightEvent.getActiveInstances();
+        Float requestsServedPerInstance = servedCount / activeInstances;
+        if (requestsServedPerInstance.isInfinite()) {
+            requestsServedPerInstance = 0f;
+        }
+        float value = averageRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Rif event: [cluster] %s 
[network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            
clusterLevelNetworkPartitionContext.setAverageRequestsInFlight(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not 
available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+        String networkPartitionId = 
gradientOfRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+        String instanceId = gradientOfRequestsInFlightEvent.getInstanceId();
+        float value = gradientOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Gradient of Rif event: [cluster] %s 
[network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            
clusterLevelNetworkPartitionContext.setRequestsInFlightGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not 
available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent 
secondDerivativeOfRequestsInFlightEvent) {
+
+        String networkPartitionId = 
secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = 
secondDerivativeOfRequestsInFlightEvent.getClusterId();
+        String instanceId = 
secondDerivativeOfRequestsInFlightEvent.getInstanceId();
+        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second derivative of Rif event: [cluster] 
%s "
+                    + "[network-partition] %s [value] %s", clusterId, 
networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            
clusterLevelNetworkPartitionContext.setRequestsInFlightSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not 
available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent 
memberAverageMemoryConsumptionEvent) {
+
+        String instanceId = 
memberAverageMemoryConsumptionEvent.getInstanceId();
+        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        ClusterInstanceContext networkPartitionCtxt = 
getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = 
networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = 
partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : 
[member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageMemoryConsumptionEvent.getValue();
+        memberStatsContext.setAverageMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent 
memberGradientOfMemoryConsumptionEvent) {
+
+        String instanceId = 
memberGradientOfMemoryConsumptionEvent.getInstanceId();
+        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        ClusterInstanceContext networkPartitionCtxt = 
getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = 
networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = 
partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : 
[member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfMemoryConsumptionEvent.getValue();
+        memberStatsContext.setGradientOfMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent 
memberSecondDerivativeOfMemoryConsumptionEvent) {
+        // No-op: this monitor takes no action on member-level second-derivative memory events.
+    }
+
+    @Override
+    public void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+        String instanceId = memberAverageLoadAverageEvent.getInstanceId();
+        String memberId = memberAverageLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        ClusterInstanceContext networkPartitionCtxt = 
getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = 
networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = 
partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : 
[member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageLoadAverageEvent.getValue();
+        memberStatsContext.setAverageLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) 
{
+
+        String instanceId = memberGradientOfLoadAverageEvent.getInstanceId();
+        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        ClusterInstanceContext networkPartitionCtxt = 
getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = 
networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = 
partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : 
[member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfLoadAverageEvent.getValue();
+        memberStatsContext.setGradientOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent 
memberSecondDerivativeOfLoadAverageEvent) {
+
+        String instanceId = 
memberSecondDerivativeOfLoadAverageEvent.getInstanceId();
+        String memberId = 
memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+
+        ClusterInstanceContext networkPartitionCtxt = 
getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = 
networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = 
partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : 
[member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+    }
+
+    /**
+     * Handles a member fault: moves the faulty active member to the obsolete
+     * list of its partition context and then runs the
+     * {@code ClusterStatusInactiveProcessor} through the cluster status
+     * processor chain.
+     * <p>
+     * The event is ignored when the member is not in the topology, is not
+     * active, is not in the partition's active member list, or when the
+     * cluster instance context or partition context cannot be resolved. The
+     * last two cases previously caused a NullPointerException.
+     *
+     * @param memberFaultEvent event supplying member, cluster, instance and
+     *                         network partition ids
+     */
+    @Override
+    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+        String memberId = memberFaultEvent.getMemberId();
+        String clusterId = memberFaultEvent.getClusterId();
+        Member member = getMemberByMemberId(memberId);
+        String instanceId = memberFaultEvent.getInstanceId();
+        String networkPartitionId = memberFaultEvent.getNetworkPartitionId();
+        if (null == member) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+            }
+            return;
+        }
+        if (!member.isActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member activated event has not received for the member %s. "
+                        + "Therefore ignoring" + " the member fault health stat", memberId));
+            }
+            return;
+        }
+
+        ClusterInstanceContext nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
+        if (null == nwPartitionCtxt) {
+            log.warn(String.format("Cluster instance context is not available, ignoring member fault event: "
+                    + "[network partition] %s [instance] %s [member] %s",
+                    networkPartitionId, instanceId, memberId));
+            return;
+        }
+        String partitionId = getPartitionOfMember(memberId);
+        ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+        if (null == partitionCtxt) {
+            log.warn(String.format("Partition context is not available, ignoring member fault event: "
+                    + "[partition] %s [member] %s", partitionId, memberId));
+            return;
+        }
+        if (!partitionCtxt.activeMemberExist(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Could not find the active member in partition context, "
+                        + "[member] %s ", memberId));
+            }
+            return;
+        }
+
+        // move member to obsolete list
+        synchronized (this) {
+            partitionCtxt.moveMemberToObsoleteList(memberId);
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Faulty member is added to obsolete list and removed from the active members list: "
+                    + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+        }
+
+        ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+                ClusterStatusInactiveProcessor.class.getName(), clusterId, instanceId);
+    }
+
+    @Override
+    public void handleMemberStartedEvent(
+            MemberStartedEvent memberStartedEvent) {
+        // No-op: this monitor takes no action when a member reports that it has started.
+    }
+
+    @Override
+    public void handleMemberActivatedEvent(
+            MemberActivatedEvent memberActivatedEvent) {
+
+        String instanceId = memberActivatedEvent.getInstanceId();
+        String clusterId = memberActivatedEvent.getClusterId();
+        String networkPartitionId = 
memberActivatedEvent.getNetworkPartitionId();
+        String partitionId = memberActivatedEvent.getPartitionId();
+        String memberId = memberActivatedEvent.getMemberId();
+        ClusterInstanceContext networkPartitionCtxt = 
getClusterInstanceContext(networkPartitionId, instanceId);
+        ClusterLevelPartitionContext clusterLevelPartitionContext;
+        clusterLevelPartitionContext = 
networkPartitionCtxt.getPartitionCtxt(partitionId);
+        clusterLevelPartitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Member stat context has been added 
successfully: "
+                    + "[member] %s", memberId));
+        }
+        
clusterLevelPartitionContext.movePendingMemberToActiveMembers(memberId);
+        
ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+                ClusterStatusActiveProcessor.class.getName(), clusterId, 
instanceId);
+    }
+
+    @Override
+    public void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+        String networkPartitionId = 
maintenanceModeEvent.getNetworkPartitionId();
+        String partitionId = maintenanceModeEvent.getPartitionId();
+        String memberId = maintenanceModeEvent.getMemberId();
+        String instanceId = maintenanceModeEvent.getInstanceId();
+        ClusterInstanceContext networkPartitionCtxt = 
getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext clusterMonitorPartitionContext = 
networkPartitionCtxt.
+                getPartitionCtxt(partitionId);
+        clusterMonitorPartitionContext.addMemberStatsContext(new 
MemberStatsContext(memberId));
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Member has been moved as pending 
termination: "
+                    + "[member] %s", memberId));
+        }
+        
clusterMonitorPartitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+    }
+
+    /**
+     * Handles a member that is ready to shut down: if the member is in the
+     * termination-pending list of its partition context it is moved to the
+     * obsolete list. When the partition has no members left, {@code monitor()}
+     * is invoked directly to speed up termination.
+     * <p>
+     * The event is ignored with a warning when the cluster instance context
+     * or the partition context cannot be resolved; previously this caused a
+     * NullPointerException outside the try block. Any exception raised while
+     * processing is logged and not propagated.
+     *
+     * @param memberReadyToShutdownEvent event supplying member, cluster,
+     *                                   instance and network partition ids
+     */
+    @Override
+    public void handleMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+        String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
+        String instanceId = memberReadyToShutdownEvent.getInstanceId();
+        String memberId = memberReadyToShutdownEvent.getMemberId();
+
+        ClusterInstanceContext nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
+        if (null == nwPartitionCtxt) {
+            log.warn(String.format("Cluster instance context is not available, ignoring ready to shutdown event: "
+                    + "[network partition] %s [instance] %s [member] %s",
+                    networkPartitionId, instanceId, memberId));
+            return;
+        }
+
+        // resolve the partition context that holds the member
+        String partitionId = getPartitionOfMember(memberId);
+        ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+        if (null == partitionCtxt) {
+            log.warn(String.format("Partition context is not available, ignoring ready to shutdown event: "
+                    + "[partition] %s [member] %s", partitionId, memberId));
+            return;
+        }
+
+        try {
+            String clusterId = memberReadyToShutdownEvent.getClusterId();
+            // move member from the termination pending list to the obsolete list
+            if (partitionCtxt.getPendingTerminationMember(memberId) != null) {
+                partitionCtxt.movePendingTerminationMemberToObsoleteMembers(memberId);
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is removed from the pending termination members " +
+                            "and moved to obsolete list: [member] %s " +
+                            "[partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+                }
+            } else if (partitionCtxt.getObsoleteMember(memberId) != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is  in obsolete list: [member] %s " +
+                            "[partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+                }
+            } //TODO else part
+
+            // when no more members are left to terminate, invoke the monitor
+            // directly to speed up the termination process
+            if (partitionCtxt.getTotalMemberCount() == 0) {
+                this.monitor();
+            }
+
+        } catch (Exception e) {
+            String msg = "Error processing event " + e.getLocalizedMessage();
+            log.error(msg, e);
+        }
+    }
+
+    @Override
+    public void handleMemberTerminatedEvent(
+            MemberTerminatedEvent memberTerminatedEvent) {
+
+        String networkPartitionId = 
memberTerminatedEvent.getNetworkPartitionId();
+        String memberId = memberTerminatedEvent.getMemberId();
+        String clusterId = memberTerminatedEvent.getClusterId();
+        String instanceId = memberTerminatedEvent.getInstanceId();
+        String partitionId = memberTerminatedEvent.getPartitionId();
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = 
getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        ClusterLevelPartitionContext clusterMonitorPartitionContext =
+                
clusterLevelNetworkPartitionContext.getPartitionCtxt(partitionId);
+        clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
+
+        if 
(clusterMonitorPartitionContext.removeTerminationPendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from termination 
pending members list: "
+                        + "[member] %s", memberId));
+            }
+        } else if 
(clusterMonitorPartitionContext.removePendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from pending 
members list: "
+                        + "[member] %s", memberId));
+            }
+        } else if 
(clusterMonitorPartitionContext.removeActiveMemberById(memberId)) {
+            log.warn(String.format("Member is in the wrong list and it is 
removed from "
+                    + "active members list: %s", memberId));
+        } else if 
(clusterMonitorPartitionContext.removeObsoleteMember(memberId)) {
+            log.warn(String.format("Obsolete member has either been terminated 
or its obsolete time out has expired and"
+                    + " it is removed from obsolete members list: %s", 
memberId));
+        } else {
+            log.warn(String.format("Member is not available in any of the list 
active, "
+                    + "pending and termination pending: %s", memberId));
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Member stat context has been removed 
successfully: "
+                    + "[member] %s", memberId));
+        }
+        //Checking whether the cluster state can be changed either from 
in_active to created/terminating to terminated
+        
ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+                ClusterStatusTerminatedProcessor.class.getName(), clusterId, 
instanceId);
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+        // No-op: this monitor takes no action when a cluster removed event arrives.
+    }
+
+    @Override
+    public void handleDynamicUpdates(Properties properties) throws 
InvalidArgumentException {
+        // No-op: the supplied properties are ignored and nothing is thrown by this implementation.
+    }
+
+    /**
+     * Looks up the network partition id of a member in the topology.
+     * <p>
+     * Delegates to {@link #getMemberByMemberId(String)} so the topology is
+     * traversed under the topology read lock, instead of walking the services
+     * and clusters unlocked as before.
+     *
+     * @param memberId id of the member to look up
+     * @return the member's network partition id, or {@code null} when no
+     *         cluster in the topology contains the member
+     */
+    private String getNetworkPartitionIdByMemberId(String memberId) {
+        Member member = getMemberByMemberId(memberId);
+        return (member != null) ? member.getNetworkPartitionId() : null;
+    }
+
+    /**
+     * Searches every cluster of every service in the topology for the given
+     * member while holding the topology read lock.
+     *
+     * @param memberId id of the member to look up
+     * @return the matching member, or {@code null} when no cluster contains it
+     */
+    private Member getMemberByMemberId(String memberId) {
+        // Acquire before entering the try block so that the finally clause
+        // never releases a lock that was not successfully acquired.
+        TopologyManager.acquireReadLock();
+        try {
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    public String getPartitionOfMember(String memberId) {
+        for (Service service : TopologyManager.getTopology().getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if (cluster.memberExists(memberId)) {
+                    return cluster.getMember(memberId).getPartitionId();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Starts a background thread that begins terminating every member of the
+     * given cluster instance.
+     * <p>
+     * For each partition context of the instance, active members are moved to
+     * the termination-pending list and sent an instance cleanup event, and
+     * pending members are moved to the obsolete list. If every partition then
+     * reports a total member count of zero, {@code monitor()} is invoked
+     * directly.
+     * <p>
+     * Fixes: the pending-member loop used to call {@code next()} on an
+     * iterator that had already been drained while collecting the ids, which
+     * threw {@code NoSuchElementException} whenever a pending member existed.
+     * A missing network partition or instance context is now logged instead
+     * of failing the thread with a NullPointerException.
+     *
+     * @param instanceId         id of the cluster instance whose members are terminated
+     * @param networkPartitionId id of the network partition holding the instance
+     */
+    @Override
+    public void terminateAllMembers(final String instanceId, final String networkPartitionId) {
+        final ClusterMonitor monitor = this;
+        Thread memberTerminator = new Thread(new Runnable() {
+            public void run() {
+
+                ClusterLevelNetworkPartitionContext networkPartitionContext =
+                        getAllNetworkPartitionCtxts().get(networkPartitionId);
+                if (networkPartitionContext == null) {
+                    log.warn("Network partition context is not available, cannot terminate members: "
+                            + "[cluster] " + getClusterId() + " [network partition] " + networkPartitionId);
+                    return;
+                }
+                ClusterInstanceContext instanceContext =
+                        (ClusterInstanceContext) networkPartitionContext.getInstanceContext(instanceId);
+                if (instanceContext == null) {
+                    log.warn("Cluster instance context is not available, cannot terminate members: "
+                            + "[cluster] " + getClusterId() + " [network partition] " + networkPartitionId
+                            + " [instance] " + instanceId);
+                    return;
+                }
+
+                boolean allMovedToObsolete = true;
+                for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
+                    if (log.isInfoEnabled()) {
+                        log.info("Starting to terminate all members in cluster [" + getClusterId() + "] " +
+                                "Network Partition [" + instanceContext.getNetworkPartitionId() + "], Partition [" +
+                                partitionContext.getPartitionId() + "]");
+                    }
+                    // need to terminate active, pending and obsolete members
+                    //FIXME to traverse concurrent
+
+                    // Snapshot the ids first: the move operations below change
+                    // the lists that are being read here.
+                    List<String> activeMembers = new ArrayList<String>();
+                    for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) {
+                        activeMembers.add(activeMemberCtxt.getMemberId());
+                    }
+                    for (String memberId : activeMembers) {
+                        log.info("Sending instance cleanup event for the active member: [member-id] " + memberId);
+                        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+                        InstanceNotificationPublisher.getInstance().
+                                sendInstanceCleanupEventForMember(memberId);
+                    }
+
+                    List<String> pendingMembers = new ArrayList<String>();
+                    for (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) {
+                        pendingMembers.add(pendingMemberCtxt.getMemberId());
+                    }
+                    for (String memberId : pendingMembers) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Moving pending member [member id] " + memberId + " to obsolete list");
+                        }
+                        partitionContext.movePendingMemberToObsoleteMembers(memberId);
+                    }
+
+                    if (partitionContext.getTotalMemberCount() != 0) {
+                        allMovedToObsolete = false;
+                    }
+                }
+
+                if (allMovedToObsolete) {
+                    monitor.monitor();
+                }
+            }
+        }, "Member Terminator - [cluster id] " + getClusterId());
+
+        memberTerminator.start();
+    }
+
+    public Map<String, ClusterLevelNetworkPartitionContext> 
getAllNetworkPartitionCtxts() {
+        return ((ClusterContext) 
this.clusterContext).getNetworkPartitionCtxts();
+    }
+
+    /**
+     * Looks up the cluster instance context for the given instance inside the given
+     * network partition.
+     *
+     * @param networkPartitionId key of the network partition context to search
+     * @param instanceId         id passed to the network partition context lookup
+     * @return the instance context, or {@code null} when no network partition context
+     *         is registered under {@code networkPartitionId} (or when the partition
+     *         context itself yields no instance context for {@code instanceId})
+     */
+    public ClusterInstanceContext getClusterInstanceContext(String networkPartitionId, String instanceId) {
+        Map<String, ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContextMap =
+                ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts();
+        ClusterLevelNetworkPartitionContext networkPartitionContext =
+                clusterLevelNetworkPartitionContextMap.get(networkPartitionId);
+        if (networkPartitionContext == null) {
+            // Unknown network partition: report "not found" instead of failing with an NPE.
+            return null;
+        }
+        return (ClusterInstanceContext) networkPartitionContext.getInstanceContext(instanceId);
+    }
+
+    public Collection<ClusterLevelNetworkPartitionContext> 
getNetworkPartitionCtxts() {
+        return ((ClusterContext) 
this.clusterContext).getNetworkPartitionCtxts().values();
+    }
+
+    /**
+     * Invokes {@code createInstance} once per parent instance id, in list order.
+     *
+     * @param parentInstanceIds ids of the parent instances to process
+     * @param cluster           cluster handed to each {@code createInstance} call
+     * @throws PolicyValidationException    propagated from {@code createInstance}
+     * @throws PartitionValidationException propagated from {@code createInstance}
+     */
+    public void createClusterInstance(List<String> parentInstanceIds, Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        Iterator<String> parentIds = parentInstanceIds.iterator();
+        while (parentIds.hasNext()) {
+            createInstance(parentIds.next(), cluster);
+        }
+    }
+
+    public boolean createInstanceOnDemand(String instanceId) {
+        Cluster cluster = 
TopologyManager.getTopology().getService(this.serviceType).
+                getCluster(this.clusterId);
+        try {
+            return createInstance(instanceId, cluster);
+            //TODO exception
+        } catch (PolicyValidationException e) {
+            log.error("Error while creating the cluster instance", e);
+        } catch (PartitionValidationException e) {
+            log.error("Error while creating the cluster instance", e);
+
+        }
+        return false;
+
+    }
+
+    /**
+     * Creates, or re-attaches to, the cluster instance that belongs to the given
+     * parent instance.
+     *
+     * @param parentInstanceId id of the parent monitor's instance
+     * @param cluster          topology cluster to create the instance for
+     * @return {@code false} when the parent monitor has no instance with that id,
+     *         {@code true} otherwise
+     * @throws PolicyValidationException    declared by the original; source not visible here
+     * @throws PartitionValidationException declared by the original; source not visible here
+     */
+    private boolean createInstance(String parentInstanceId, Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        Instance parentMonitorInstance = this.parent.getInstance(parentInstanceId);
+        if (parentMonitorInstance == null) {
+            return false;
+        }
+
+        // Only group instances contribute a partition id.
+        String partitionId = (parentMonitorInstance instanceof GroupInstance)
+                ? parentMonitorInstance.getPartitionId()
+                : null;
+
+        ClusterInstance clusterInstance = cluster.getInstanceContexts(parentInstanceId);
+        if (clusterInstance == null) {
+            // Nothing in the topology yet: request creation of a new cluster instance.
+            createClusterInstance(cluster.getServiceName(), cluster.getClusterId(), null,
+                    parentInstanceId, partitionId, parentMonitorInstance.getNetworkPartitionId());
+            return true;
+        }
+
+        // Cluster instance is already there. No need to create one.
+        ClusterContext context = (ClusterContext) this.getClusterContext();
+        if (context == null) {
+            context = ClusterContextFactory.getVMClusterContext(clusterInstance.getInstanceId(),
+                    cluster, hasScalingDependents());
+            this.setClusterContext(context);
+        }
+
+        // create VMClusterContext and then add all the instanceContexts
+        context.addInstanceContext(parentInstanceId, cluster, hasScalingDependents(),
+                groupScalingEnabledSubtree());
+        if (this.getInstance(clusterInstance.getInstanceId()) == null) {
+            this.addInstance(clusterInstance);
+        }
+
+        // Checking the current status of the cluster instance
+        boolean stateChanged = ServiceReferenceHolder.getInstance()
+                .getClusterStatusProcessorChain()
+                .process("", cluster.getClusterId(), clusterInstance.getInstanceId());
+        if (stateChanged || clusterInstance.getStatus() == ClusterStatus.Created) {
+            return true;
+        }
+
+        this.notifyParentMonitor(clusterInstance.getStatus(), clusterInstance.getInstanceId());
+        if (this.hasMonitoringStarted().compareAndSet(false, true)) {
+            this.startScheduler();
+            log.info("Monitoring task for Cluster Monitor with cluster id " +
+                    cluster.getClusterId() + " started successfully");
+        }
+        return true;
+    }
+
+    /**
+     * Moves all members of the cluster instance towards termination: active members
+     * go to the termination-pending list and pending members go to the obsolete list.
+     * Every network partition context that holds a context for the instance is processed.
+     *
+     * @param instanceId id of the cluster instance whose members are moved
+     */
+    public void moveMembersFromActiveToPendingTermination(String instanceId) {
+
+        //TODO take read lock for network partition context
+        //FIXME to iterate properly
+        for (ClusterLevelNetworkPartitionContext networkPartitionContext :
+                ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts().values()) {
+            ClusterInstanceContext clusterInstanceContext =
+                    (ClusterInstanceContext) networkPartitionContext.getInstanceContext(instanceId);
+            if (clusterInstanceContext == null) {
+                // This network partition does not host the instance.
+                continue;
+            }
+            for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) {
+                // Snapshot the ids before moving, so the member lists are not
+                // modified while they are being iterated.
+                List<String> activeMembers = new ArrayList<String>();
+                for (MemberContext activeMember : partitionContext.getActiveMembers()) {
+                    activeMembers.add(activeMember.getMemberId());
+                }
+                for (String memberId : activeMembers) {
+                    partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+                }
+
+                List<String> pendingMembers = new ArrayList<String>();
+                for (MemberContext pendingMember : partitionContext.getPendingMembers()) {
+                    pendingMembers.add(pendingMember.getMemberId());
+                }
+                // Iterate the pending snapshot (not the active one) so that pending
+                // members are the ones moved to the obsolete list.
+                for (String memberId : pendingMembers) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Moving pending member [member id] " + memberId + " the obsolete list");
+                    }
+                    partitionContext.movePendingMemberToObsoleteMembers(memberId);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
index e5fa765..f870e11 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
@@ -21,14 +21,9 @@ package org.apache.stratos.autoscaler.monitor.cluster;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
 import 
org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
 import 
org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
 
 /*
  * Factory class for creating cluster monitors.
@@ -47,19 +42,12 @@ public class ClusterMonitorFactory {
                                                     boolean 
groupScalingEnabledSubtree)
             throws PolicyValidationException, PartitionValidationException {
 
-        AbstractClusterMonitor clusterMonitor;
-//        if (cluster.isKubernetesCluster()) {
-//            clusterMonitor = getDockerServiceClusterMonitor(cluster);
-//////        } else if (cluster.isLbCluster()) {
-//////            clusterMonitor = getVMLbClusterMonitor(cluster);
-//        } else {
-            clusterMonitor = getVMClusterMonitor(cluster, 
hasScalingDependents, groupScalingEnabledSubtree);
-//        }
-
+        AbstractClusterMonitor clusterMonitor =
+                getVMClusterMonitor(cluster, hasScalingDependents, 
groupScalingEnabledSubtree);
         return clusterMonitor;
     }
 
-    private static VMClusterMonitor getVMClusterMonitor(Cluster cluster, 
boolean hasScalingDependents,
+    private static ClusterMonitor getVMClusterMonitor(Cluster cluster, boolean 
hasScalingDependents,
                                                         boolean 
groupScalingEnabledSubtree)
             throws PolicyValidationException, PartitionValidationException {
 
@@ -67,7 +55,7 @@ public class ClusterMonitorFactory {
             return null;
         }
 
-        VMClusterMonitor clusterMonitor = new VMClusterMonitor(cluster, 
hasScalingDependents, groupScalingEnabledSubtree);
+        ClusterMonitor clusterMonitor = new ClusterMonitor(cluster, 
hasScalingDependents, groupScalingEnabledSubtree);
 
         // find lb reference type
         java.util.Properties props = cluster.getProperties();
@@ -90,121 +78,4 @@ public class ClusterMonitorFactory {
         log.info("VMClusterMonitor created: " + clusterMonitor.toString());
         return clusterMonitor;
     }
-//
-//    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
-//            throws PolicyValidationException, PartitionValidationException {
-//
-//        if (null == cluster) {
-//            return null;
-//        }
-//
-//        VMLbClusterMonitor clusterMonitor =
-//                new VMLbClusterMonitor(cluster.getServiceName(), 
cluster.getClusterId());
-//        clusterMonitor.notifyParentMonitor(ClusterStatus.Created);
-//
-//        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
-//        return clusterMonitor;
-//    }
-
-    /**
-     * @param cluster - the cluster which needs to be monitored
-     * @return - the cluster monitor
-     */
-    private static KubernetesClusterMonitor 
getDockerServiceClusterMonitor(Cluster cluster)
-            throws PolicyValidationException {
-
-        if (null == cluster) {
-            return null;
-        }
-
-//        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-//
-//        AutoscalePolicy autoscalePolicy =
-//                PolicyManager.getInstance()
-//                        .getAutoscalePolicy(autoscalePolicyName);
-//        if (log.isDebugEnabled()) {
-//            log.debug("Autoscaling policy name: " + autoscalePolicyName);
-//        }
-//
-//        AutoscalePolicy policy = 
PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-//
-//        if (policy == null) {
-//            String msg = String.format("Autoscaling policy is null: 
[policy-name] %s", autoscalePolicyName);
-//            log.error(msg);
-//            throw new PolicyValidationException(msg);
-//        }
-//
-        java.util.Properties properties = cluster.getProperties();
-        if (properties == null) {
-            String message = String.format("Properties not found in kubernetes 
cluster: [cluster-id] %s",
-                    cluster.getClusterId());
-            log.error(message);
-            throw new RuntimeException(message);
-        }
-//        String minReplicasProperty = 
properties.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
-//        int minReplicas = 0;
-//        if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
-//            minReplicas = Integer.parseInt(minReplicasProperty);
-//        }
-//
-//        int maxReplicas = 0;
-//        String maxReplicasProperty = 
properties.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
-//        if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
-//            maxReplicas = Integer.parseInt(maxReplicasProperty);
-//        }
-//
-//        String kubernetesHostClusterID = 
properties.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-//        KubernetesClusterContext kubernetesClusterCtxt = new 
KubernetesClusterContext(kubernetesHostClusterID,
-//                cluster.getClusterId(), cluster.getServiceName(),  
autoscalePolicy, minReplicas, maxReplicas);
-
-
-//        KubernetesClusterMonitor dockerClusterMonitor = new 
KubernetesClusterMonitor(cluster);
-
-        //populate the members after restarting
-//        for (Member member : cluster.getMembers()) {
-//            String memberId = member.getMemberId();
-//            String clusterId = member.getClusterId();
-//            MemberContext memberContext = new MemberContext();
-//            memberContext.setMemberId(memberId);
-//            memberContext.setClusterId(clusterId);
-//            memberContext.setInitTime(member.getInitTime());
-//
-//            // if there is at least one member in the topology, that means 
service has been created already
-//            // this is to avoid calling startContainer() method again
-//            //kubernetesClusterCtxt.setServiceClusterCreated(true);
-//
-//            if (MemberStatus.Activated.equals(member.getStatus())) {
-//                if (log.isDebugEnabled()) {
-//                    String msg = String.format("Active member loaded from 
topology and added to active member list, %s", member.toString());
-//                    log.debug(msg);
-//                }
-//                ((VMClusterContext) 
dockerClusterMonitor.getClusterContext()).addActiveMember(memberContext);
-//            } else if (MemberStatus.Created.equals(member.getStatus())
-//                    || MemberStatus.Starting.equals(member.getStatus())) {
-//                if (log.isDebugEnabled()) {
-//                    String msg = String.format("Pending member loaded from 
topology and added to pending member list, %s", member.toString());
-//                    log.debug(msg);
-//                }
-//                
dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
-//            }
-//
-//            //kubernetesClusterCtxt.addMemberStatsContext(new 
MemberStatsContext(memberId));
-//            if (log.isInfoEnabled()) {
-//                log.info(String.format("Member stat context has been added: 
[member] %s", memberId));
-//            }
-//        }
-//
-//        // find lb reference type
-//        if (properties.containsKey(StratosConstants.LOAD_BALANCER_REF)) {
-//            String value = 
properties.getProperty(StratosConstants.LOAD_BALANCER_REF);
-//            dockerClusterMonitor.setLbReferenceType(value);
-//            if (log.isDebugEnabled()) {
-//                log.debug("Set the lb reference type: " + value);
-//            }
-//        }
-
-//        log.info("KubernetesServiceClusterMonitor created: " + 
dockerClusterMonitor.toString());
-//        return dockerClusterMonitor;
-        return null;
-    }
 }

Reply via email to