cluster monitors hierarchy redesigned and docker cluster monitor improved

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d6f49d37
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d6f49d37
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d6f49d37

Branch: refs/heads/docker-integration
Commit: d6f49d37095d0c6d20cf799f6e9a0e47932608ca
Parents: 9f374fe
Author: R-Rajkumar <[email protected]>
Authored: Mon Sep 22 13:38:12 2014 +0530
Committer: Nirmal Fernando <[email protected]>
Committed: Tue Sep 23 14:48:33 2014 +0530

----------------------------------------------------------------------
 .../stratos/autoscaler/AutoscalerContext.java   | 106 +--
 .../autoscaler/KubernetesClusterContext.java    | 639 +++++++++++++++++-
 .../stratos/autoscaler/MemberStatsContext.java  |   9 -
 .../cloud/controller/CloudControllerClient.java |  26 +
 .../AutoscalerHealthStatEventReceiver.java      | 647 ++++++++++++-------
 .../AutoscalerTopologyEventReceiver.java        | 484 +++++++-------
 .../monitor/AbstractClusterMonitor.java         | 127 ++++
 .../autoscaler/monitor/AbstractMonitor.java     | 203 ------
 .../autoscaler/monitor/ClusterMonitor.java      | 223 -------
 .../monitor/ClusterMonitorFactory.java          | 336 ++++++++++
 .../monitor/ContainerClusterMonitor.java        |  38 ++
 .../monitor/DockerServiceClusterMonitor.java    | 156 +++++
 .../monitor/KubernetesClusterMonitor.java       | 186 ------
 .../autoscaler/monitor/LbClusterMonitor.java    | 126 ----
 .../autoscaler/monitor/VMClusterMonitor.java    | 120 ++++
 .../autoscaler/monitor/VMLbClusterMonitor.java  | 135 ++++
 .../monitor/VMServiceClusterMonitor.java        | 231 +++++++
 .../stratos/autoscaler/util/AutoscalerUtil.java | 574 ++++++++--------
 .../stratos/common/enums/ClusterType.java       |   5 +
 19 files changed, 2784 insertions(+), 1587 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
index e3eb598..18003d8 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
@@ -20,15 +20,13 @@
  */
 package org.apache.stratos.autoscaler;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.monitor.ClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.KubernetesClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.LbClusterMonitor;
-
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+
 /**
  * This class is there for accumulating cluster details which are not there in 
Topology
  */
@@ -37,20 +35,14 @@ public class AutoscalerContext {
     private static final Log log = LogFactory.getLog(AutoscalerContext.class);
     private AutoscalerContext() {
         try {
-            setMonitors(new HashMap<String, ClusterMonitor>());
-            setLbMonitors(new HashMap<String, LbClusterMonitor>());
-            setKubernetesClusterMonitors(new HashMap<String, 
KubernetesClusterMonitor>());
+            setClusterMonitors(new HashMap<String, AbstractClusterMonitor>());
         } catch (Exception e) {
             log.error("Rule evaluateMinCheck error", e);
         }
     }
     
-    // Map<ClusterId, ClusterMonitor>
-    private Map<String, ClusterMonitor> monitors;
-    // Map<LBClusterId, LBClusterMonitor>
-    private Map<String, LbClusterMonitor> lbMonitors;
-    // Map<ClusterId, KubernetesClusterMonitor>
-    private Map<String, KubernetesClusterMonitor> kubernetesClusterMonitors;
+    // Map<ClusterId, AbstractClusterMonitor>
+    private Map<String, AbstractClusterMonitor> clusterMonitors;
 
        private static class Holder {
                private static final AutoscalerContext INSTANCE = new 
AutoscalerContext();
@@ -60,86 +52,32 @@ public class AutoscalerContext {
                return Holder.INSTANCE;
        }
 
-    public void addMonitor(ClusterMonitor monitor) {
-        monitors.put(monitor.getClusterId(), monitor);
-    }
-
-    public ClusterMonitor getMonitor(String clusterId) {
-        return monitors.get(clusterId);
-    }
-    
-    public boolean monitorExist(String clusterId) {
-        return monitors.containsKey(clusterId);
-    }
-    
-    public boolean lbMonitorExist(String clusterId) {
-        return lbMonitors.containsKey(clusterId);
-    }
-    
-    public LbClusterMonitor getLBMonitor(String clusterId) {
-        return lbMonitors.get(clusterId);
-    }
-
-    public ClusterMonitor removeMonitor(String clusterId) {
-       if(!monitorExist(clusterId)) {
-               log.fatal("Cluster monitor not found for cluster id: 
"+clusterId);
-               return null;
-       }
-       log.info("Removed monitor [cluster id]: " + clusterId);
-        return monitors.remove(clusterId);
-    }
-    public LbClusterMonitor removeLbMonitor(String clusterId) {
-       if(!lbMonitorExist(clusterId)) {
-               log.fatal("LB monitor not found for cluster id: "+clusterId);
-               return null;
-       }
-       log.info("Removed LB monitor [cluster id]: " + clusterId);
-        return lbMonitors.remove(clusterId);
-    }
-
-    public Map<String, ClusterMonitor> getMonitors() {
-        return monitors;
-    }
-
-
-    public void setMonitors(Map<String, ClusterMonitor> monitors) {
-        this.monitors = monitors;
-    }
-
-    public void setLbMonitors(Map<String, LbClusterMonitor> monitors) {
-        this.lbMonitors = monitors;
-    }
-
-    public void addLbMonitor(LbClusterMonitor monitor) {
-        lbMonitors.put(monitor.getClusterId(), monitor);
-    }
-    
-    public void addKubernetesClusterMonitor(KubernetesClusterMonitor 
kubernetesClusterMonitor) {
-        kubernetesClusterMonitors.put(kubernetesClusterMonitor.getClusterId(), 
kubernetesClusterMonitor);
+    public void addClusterMonitor(AbstractClusterMonitor clusterMonitor) {
+        clusterMonitors.put(clusterMonitor.getClusterId(), clusterMonitor);
     }
 
-    public KubernetesClusterMonitor getKubernetesClusterMonitor(String 
clusterId) {
-        return kubernetesClusterMonitors.get(clusterId);
+    public AbstractClusterMonitor getClusterMonitor(String clusterId) {
+        return clusterMonitors.get(clusterId);
     }
     
-    public boolean kubernetesClusterMonitorExist(String clusterId) {
-        return kubernetesClusterMonitors.containsKey(clusterId);
+    public boolean clusterMonitorExist(String clusterId) {
+        return clusterMonitors.containsKey(clusterId);
     }
     
-    public Map<String, KubernetesClusterMonitor> 
getKubernetesClusterMonitors() {
-        return kubernetesClusterMonitors;
+    public Map<String, AbstractClusterMonitor> getClusterMonitors() {
+        return clusterMonitors;
     }
 
-    public void setKubernetesClusterMonitors(Map<String, 
KubernetesClusterMonitor> kubernetesClusterMonitors) {
-        this.kubernetesClusterMonitors = kubernetesClusterMonitors;
+    public void setClusterMonitors(Map<String, AbstractClusterMonitor> 
clusterMonitors) {
+        this.clusterMonitors = clusterMonitors;
     }
     
-    public KubernetesClusterMonitor removeKubernetesClusterMonitor(String 
clusterId) {
-       if(!kubernetesClusterMonitorExist(clusterId)) {
-               log.fatal("Kubernetes cluster monitor not found for cluster id: 
"+clusterId);
+    public AbstractClusterMonitor removeClusterMonitor(String clusterId) {
+       if(!clusterMonitorExist(clusterId)) {
+               log.fatal("ClusterMonitor not found for cluster id: 
"+clusterId);
                return null;
        }
-       log.info("Removed KubernetesClusterMonitor [cluster id]: " + clusterId);
-        return kubernetesClusterMonitors.remove(clusterId);
+       log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
+        return clusterMonitors.remove(clusterId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
index f369ac9..e1d5e30 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
@@ -1,20 +1,647 @@
 package org.apache.stratos.autoscaler;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.policy.model.LoadAverage;
+import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
+import org.apache.stratos.autoscaler.policy.model.RequestsInFlight;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
 
 public class KubernetesClusterContext implements Serializable{
        
        private static final long serialVersionUID = 808741789615481596L;
-       String kubernetesClusterID;
+       private static final Log log = 
LogFactory.getLog(KubernetesClusterContext.class);
+       
+       private String kubernetesClusterId;
+       private String serviceName;
+       
+    private int minReplicas;
+    private int maxReplicas;
+    private int currentReplicas = 0;
+    
+    // properties
+    private Properties properties;
+    
+    // 15 minutes as the default
+    private long expiryTime = 900000;
+    // pending members
+    private List<MemberContext> pendingMembers;
+    private int pendingMembersFailureCount = 0;
+    private final int PENDING_MEMBER_FAILURE_THRESHOLD = 5;
+    
+    // 1 day as default
+    private long obsoltedMemberExpiryTime = 1*24*60*60*1000;
+
+    // members to be terminated
+    private Map<String, MemberContext> obsoletedMembers;
+    
+    // active members
+    private List<MemberContext> activeMembers;
+
+    // termination pending members; a member is added to this list when the Autoscaler 
sends a graceful shutdown event
+    private List<MemberContext> terminationPendingMembers;
+
+    // Keeps the statistics that come from CEP
+    private Map<String, MemberStatsContext> memberStatsContexts;
        
-       public KubernetesClusterContext(String kubernetesClusterID){
-               this.kubernetesClusterID = kubernetesClusterID;
+    //Following information will keep events details
+    private RequestsInFlight requestsInFlight;
+    private MemoryConsumption memoryConsumption;
+    private LoadAverage loadAverage;
+    
+    //boolean values to keep whether the requests in flight parameters are 
reset or not
+    private boolean rifReset = false, averageRifReset = false, 
gradientRifReset = false, secondDerivativeRifRest = false;
+    //boolean values to keep whether the memory consumption parameters are 
reset or not
+    private boolean memoryConsumptionReset = false, 
averageMemoryConsumptionReset = false,
+            gradientMemoryConsumptionReset = false, 
secondDerivativeMemoryConsumptionRest = false;
+    //boolean values to keep whether the load average parameters are reset or 
not
+    private boolean loadAverageReset = false, averageLoadAverageReset = false, 
gradientLoadAverageReset = false,
+            secondDerivativeLoadAverageRest = false;
+    
+       public KubernetesClusterContext(String kubernetesClusterId){
+               this.kubernetesClusterId = kubernetesClusterId;
+        this.pendingMembers = new ArrayList<MemberContext>();
+        this.activeMembers = new ArrayList<MemberContext>();
+        this.terminationPendingMembers = new ArrayList<MemberContext>();
+        this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>();
+        this.memberStatsContexts = new ConcurrentHashMap<String, 
MemberStatsContext>();
+        this.requestsInFlight = new RequestsInFlight();
+        this.loadAverage = new LoadAverage();
+        this.memoryConsumption = new MemoryConsumption();
+        
+        // check if a different value has been set for expiryTime
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        expiryTime = conf.getLong("autoscaler.member.expiryTimeout", 900000);
+        if (log.isDebugEnabled()) {
+            log.debug("Member expiry time is set to: " + expiryTime);
+        }
+        
+        Thread th = new Thread(new PendingMemberWatcher(this));
+        th.start();
+        Thread th2 = new Thread(new ObsoletedMemberWatcher(this));
+        th2.start();
        }
        
        public String getKubernetesClusterID() {
-               return kubernetesClusterID;
+               return kubernetesClusterId;
+       }
+       public void setKubernetesClusterID(String kubernetesClusterId) {
+               this.kubernetesClusterId = kubernetesClusterId;
+       }
+       
+       public List<MemberContext> getPendingMembers() {
+               return pendingMembers;
+       }
+
+       public void setPendingMembers(List<MemberContext> pendingMembers) {
+               this.pendingMembers = pendingMembers;
+       }
+
+       public int getActiveMemberCount() {
+               return activeMembers.size();
+       }
+
+       public void setActiveMembers(List<MemberContext> activeMembers) {
+               this.activeMembers = activeMembers;
+       }
+           
+       public int getMinReplicas() {
+               return minReplicas;
+       }
+
+       public void setMinReplicas(int minReplicas) {
+               this.minReplicas = minReplicas;
+       }
+
+       public int getMaxReplicas() {
+               return maxReplicas;
+       }
+
+       public void setMaxReplicas(int maxReplicas) {
+               this.maxReplicas = maxReplicas;
+       }
+
+       public int getCurrentReplicas() {
+               return currentReplicas;
+       }
+
+       public void setCurrentReplicas(int currentReplicas) {
+               this.currentReplicas = currentReplicas;
+       }
+
+       public void addPendingMember(MemberContext ctxt) {
+               this.pendingMembers.add(ctxt);
+       }
+           
+       public boolean removePendingMember(String id) {
+               if (id == null) {
+                       return false;
+               }
+               for (Iterator<MemberContext> iterator = 
pendingMembers.iterator(); iterator.hasNext();) {
+                       MemberContext pendingMember = (MemberContext) 
iterator.next();
+                       if (id.equals(pendingMember.getMemberId())) {
+                               iterator.remove();
+                               return true;
+                       }
+
+               }
+
+               return false;
+       }
+
+       public void movePendingMemberToActiveMembers(String memberId) {
+               if (memberId == null) {
+                       return;
+               }
+               Iterator<MemberContext> iterator = 
pendingMembers.listIterator();
+               while (iterator.hasNext()) {
+                       MemberContext pendingMember = iterator.next();
+                       if (pendingMember == null) {
+                               iterator.remove();
+                               continue;
+                       }
+                       if (memberId.equals(pendingMember.getMemberId())) {
+                               // member is activated
+                               // remove from pending list
+                               iterator.remove();
+                               // add to the activated list
+                               this.activeMembers.add(pendingMember);
+                               pendingMembersFailureCount = 0;
+                               if (log.isDebugEnabled()) {
+                                       log.debug(String.format(
+                                                       "Pending member is 
removed and added to the "
+                                                                       + 
"activated member list. [Member Id] %s",
+                                                       memberId));
+                               }
+                               break;
+                       }
+               }
+       }
+
+       public void moveActiveMemberToTerminationPendingMembers(String 
memberId) {
+               if (memberId == null) {
+                       return;
+               }
+               Iterator<MemberContext> iterator = activeMembers.listIterator();
+               while (iterator.hasNext()) {
+                       MemberContext activeMember = iterator.next();
+                       if (activeMember == null) {
+                               iterator.remove();
+                               continue;
+                       }
+                       if (memberId.equals(activeMember.getMemberId())) {
+                               // matching active member found
+                               // remove from the active list
+                               iterator.remove();
+                               // add to the termination pending list
+                               
this.terminationPendingMembers.add(activeMember);
+                               if (log.isDebugEnabled()) {
+                                       log.debug(String
+                                                       .format("Active member 
is removed and added to the "
+                                                                       + 
"termination pending member list. [Member Id] %s",
+                                                                       
memberId));
+                               }
+                               break;
+                       }
+               }
+       }
+           
+       public void addActiveMember(MemberContext ctxt) {
+               this.activeMembers.add(ctxt);
+       }
+
+       public void removeActiveMember(MemberContext ctxt) {
+               this.activeMembers.remove(ctxt);
+       }
+
+       public boolean removeTerminationPendingMember(String memberId) {
+               boolean terminationPendingMemberAvailable = false;
+               for (MemberContext memberContext : terminationPendingMembers) {
+                       if (memberContext.getMemberId().equals(memberId)) {
+                               terminationPendingMemberAvailable = true;
+                               terminationPendingMembers.remove(memberContext);
+                               break;
+                       }
+               }
+               return terminationPendingMemberAvailable;
+       }
+           
+       public long getObsoltedMemberExpiryTime() {
+               return obsoltedMemberExpiryTime;
+       }
+
+       public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) {
+               this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime;
+       }
+
+       public void addObsoleteMember(MemberContext ctxt) {
+               this.obsoletedMembers.put(ctxt.getMemberId(), ctxt);
+       }
+
+       public boolean removeObsoleteMember(String memberId) {
+               if (this.obsoletedMembers.remove(memberId) == null) {
+                       return false;
+               }
+               return true;
+       }
+
+       public long getExpiryTime() {
+               return expiryTime;
+       }
+
+       public void setExpiryTime(long expiryTime) {
+               this.expiryTime = expiryTime;
+       }
+           
+       public Map<String, MemberContext> getObsoletedMembers() {
+               return obsoletedMembers;
+       }
+
+       public void setObsoletedMembers(Map<String, MemberContext> 
obsoletedMembers) {
+               this.obsoletedMembers = obsoletedMembers;
+       }
+
+       public Map<String, MemberStatsContext> getMemberStatsContexts() {
+               return memberStatsContexts;
+       }
+
+       public MemberStatsContext getMemberStatsContext(String memberId) {
+               return memberStatsContexts.get(memberId);
+       }
+
+       public void addMemberStatsContext(MemberStatsContext ctxt) {
+               this.memberStatsContexts.put(ctxt.getMemberId(), ctxt);
+       }
+
+       public void removeMemberStatsContext(String memberId) {
+               this.memberStatsContexts.remove(memberId);
+       }
+
+       public Properties getProperties() {
+               return properties;
+       }
+
+       public void setProperties(Properties properties) {
+               this.properties = properties;
+       }
+
+       public String getServiceName() {
+               return serviceName;
+       }
+
+       public void setServiceName(String serviceName) {
+               this.serviceName = serviceName;
+       }
+
+       public List<MemberContext> getTerminationPendingMembers() {
+               return terminationPendingMembers;
+       }
+
+       public void setTerminationPendingMembers(
+                       List<MemberContext> terminationPendingMembers) {
+               this.terminationPendingMembers = terminationPendingMembers;
        }
-       public void setKubernetesClusterID(String kubernetesClusterID) {
-               this.kubernetesClusterID = kubernetesClusterID;
+
+       public int getTotalMemberCount() {
+               return activeMembers.size() + pendingMembers.size()
+                               + terminationPendingMembers.size();
+       }
+
+       public int getNonTerminatedMemberCount() {
+               return activeMembers.size() + pendingMembers.size()
+                               + terminationPendingMembers.size();
+       }
+           
+       public List<MemberContext> getActiveMembers() {
+               return activeMembers;
+       }
+
+       public boolean removeActiveMemberById(String memberId) {
+               boolean removeActiveMember = false;
+               synchronized (activeMembers) {
+                       Iterator<MemberContext> iterator = 
activeMembers.listIterator();
+                       while (iterator.hasNext()) {
+                               MemberContext memberContext = iterator.next();
+                               if 
(memberId.equals(memberContext.getMemberId())) {
+                                       iterator.remove();
+                                       removeActiveMember = true;
+
+                                       break;
+                               }
+                       }
+               }
+               return removeActiveMember;
+       }
+
+       public boolean activeMemberExist(String memberId) {
+
+               for (MemberContext memberContext : activeMembers) {
+                       if (memberId.equals(memberContext.getMemberId())) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       private class PendingMemberWatcher implements Runnable {
+               private KubernetesClusterContext ctxt;
+
+               public PendingMemberWatcher(KubernetesClusterContext ctxt) {
+                       this.ctxt = ctxt;
+               }
+
+               @Override
+               public void run() {
+
+                       while (true) {
+                               long expiryTime = ctxt.getExpiryTime();
+                               List<MemberContext> pendingMembers = 
ctxt.getPendingMembers();
+
+                               synchronized (pendingMembers) {
+                                       Iterator<MemberContext> iterator = 
pendingMembers
+                                                       .listIterator();
+                                       while (iterator.hasNext()) {
+                                               MemberContext pendingMember = 
iterator.next();
+
+                                               if (pendingMember == null) {
+                                                       continue;
+                                               }
+                                               long pendingTime = 
System.currentTimeMillis()
+                                                               - 
pendingMember.getInitTime();
+                                               if (pendingTime >= expiryTime) {
+
+                                                       iterator.remove();
+                                                       log.info("Pending state 
of member: "
+                                                                       + 
pendingMember.getMemberId()
+                                                                       + " is 
expired. "
+                                                                       + 
"Adding as an obsoleted member.");
+                                                       // member should be 
terminated
+                                                       
ctxt.addObsoleteMember(pendingMember);
+                                                       
pendingMembersFailureCount++;
+                                                       if 
(pendingMembersFailureCount > PENDING_MEMBER_FAILURE_THRESHOLD) {
+                                                               
setExpiryTime(expiryTime * 2);// Doubles the
+                                                                               
                                                // expiry time
+                                                                               
                                                // after the
+                                                                               
                                                // threshold of
+                                                                               
                                                // failure
+                                                                               
                                                // exceeded
+                                                               // TODO 
Implement an alerting system:
+                                                               // STRATOS-369
+                                                       }
+                                               }
+                                       }
+                               }
+
+                               try {
+                                       // TODO find a constant
+                                       Thread.sleep(15000);
+                               } catch (InterruptedException ignore) {
+                               }
+                       }
+               }
+
+       }
+
+       private class ObsoletedMemberWatcher implements Runnable {
+               private KubernetesClusterContext ctxt;
+
+               public ObsoletedMemberWatcher(KubernetesClusterContext ctxt) {
+                       this.ctxt = ctxt;
+               }
+
+               @Override
+               public void run() {
+                       while (true) {
+
+                               long obsoltedMemberExpiryTime = ctxt
+                                               .getObsoltedMemberExpiryTime();
+                               Map<String, MemberContext> obsoletedMembers = 
ctxt
+                                               .getObsoletedMembers();
+                               Iterator<Entry<String, MemberContext>> iterator 
= obsoletedMembers
+                                               .entrySet().iterator();
+
+                               while (iterator.hasNext()) {
+                                       Map.Entry<String, MemberContext> pairs 
= iterator.next();
+                                       MemberContext obsoleteMember = 
(MemberContext) pairs
+                                                       .getValue();
+                                       if (obsoleteMember == null) {
+                                               continue;
+                                       }
+                                       long obsoleteTime = 
System.currentTimeMillis()
+                                                       - 
obsoleteMember.getInitTime();
+                                       if (obsoleteTime >= 
obsoltedMemberExpiryTime) {
+                                               iterator.remove();
+                                       }
+                               }
+                               try {
+                                       // TODO find a constant
+                                       Thread.sleep(15000);
+                               } catch (InterruptedException ignore) {
+                               }
+                       }
+               }
+       }
+  
+       public float getAverageRequestsInFlight() {
+               return requestsInFlight.getAverage();
+       }
+
+       public void setAverageRequestsInFlight(float averageRequestsInFlight) {
+               requestsInFlight.setAverage(averageRequestsInFlight);
+               averageRifReset = true;
+               if (secondDerivativeRifRest && gradientRifReset) {
+                       rifReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Requests in flights 
stats are reset, ready to do scale check [kub cluster] %s",
+                                                               
this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public float getRequestsInFlightSecondDerivative() {
+               return requestsInFlight.getSecondDerivative();
+       }
+
+       public void setRequestsInFlightSecondDerivative(
+                       float requestsInFlightSecondDerivative) {
+               
requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative);
+               secondDerivativeRifRest = true;
+               if (averageRifReset && gradientRifReset) {
+                       rifReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Requests in flights 
stats are reset, ready to do scale check [kub cluster] %s",
+                                                               
this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public float getRequestsInFlightGradient() {
+               return requestsInFlight.getGradient();
+       }
+
+       public void setRequestsInFlightGradient(float requestsInFlightGradient) 
{
+               requestsInFlight.setGradient(requestsInFlightGradient);
+               gradientRifReset = true;
+               if (secondDerivativeRifRest && averageRifReset) {
+                       rifReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Requests in flights stats are reset, ready to do scale check [kub cluster] %s",
+                                                               this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public boolean isRifReset() {
+               return rifReset;
+       }
+
+       public void setRifReset(boolean rifReset) {
+               this.rifReset = rifReset;
+               this.averageRifReset = rifReset;
+               this.gradientRifReset = rifReset;
+               this.secondDerivativeRifRest = rifReset;
+       }
+
+       public float getAverageMemoryConsumption() {
+               return memoryConsumption.getAverage();
+       }
+
+       public void setAverageMemoryConsumption(float averageMemoryConsumption) {
+               memoryConsumption.setAverage(averageMemoryConsumption);
+               averageMemoryConsumptionReset = true;
+               if (secondDerivativeMemoryConsumptionRest
+                               && gradientMemoryConsumptionReset) {
+                       memoryConsumptionReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Memory consumption stats are reset, ready to do scale check [kub cluster] %s",
+                                                               this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public float getMemoryConsumptionSecondDerivative() {
+               return memoryConsumption.getSecondDerivative();
+       }
+
+       public void setMemoryConsumptionSecondDerivative(
+                       float memoryConsumptionSecondDerivative) {
+               memoryConsumption
+                               .setSecondDerivative(memoryConsumptionSecondDerivative);
+               secondDerivativeMemoryConsumptionRest = true;
+               if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) {
+                       memoryConsumptionReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Memory consumption stats are reset, ready to do scale check [kub cluster] %s",
+                                                               this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public float getMemoryConsumptionGradient() {
+               return memoryConsumption.getGradient();
+       }
+
+       public void setMemoryConsumptionGradient(float memoryConsumptionGradient) {
+               memoryConsumption.setGradient(memoryConsumptionGradient);
+               gradientMemoryConsumptionReset = true;
+               if (secondDerivativeMemoryConsumptionRest
+                               && averageMemoryConsumptionReset) {
+                       memoryConsumptionReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Memory consumption stats are reset, ready to do scale check [kub cluster] %s",
+                                                               this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public boolean isMemoryConsumptionReset() {
+               return memoryConsumptionReset;
+       }
+
+       public void setMemoryConsumptionReset(boolean memoryConsumptionReset) {
+               this.memoryConsumptionReset = memoryConsumptionReset;
+               this.averageMemoryConsumptionReset = memoryConsumptionReset;
+               this.gradientMemoryConsumptionReset = memoryConsumptionReset;
+               this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset;
+       }
+
+
+       public float getAverageLoadAverage() {
+               return loadAverage.getAverage();
+       }
+
+       public void setAverageLoadAverage(float averageLoadAverage) {
+               loadAverage.setAverage(averageLoadAverage);
+               averageLoadAverageReset = true;
+               if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) {
+                       loadAverageReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Load average stats are reset, ready to do scale check [kub cluster] %s",
+                                                               this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public float getLoadAverageSecondDerivative() {
+               return loadAverage.getSecondDerivative();
+       }
+
+       public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) {
+               loadAverage.setSecondDerivative(loadAverageSecondDerivative);
+               secondDerivativeLoadAverageRest = true;
+               if (averageLoadAverageReset && gradientLoadAverageReset) {
+                       loadAverageReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Load average stats are reset, ready to do scale check [kub cluster] %s",
+                                                               this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public float getLoadAverageGradient() {
+               return loadAverage.getGradient();
+       }
+
+       public void setLoadAverageGradient(float loadAverageGradient) {
+               loadAverage.setGradient(loadAverageGradient);
+               gradientLoadAverageReset = true;
+               if (secondDerivativeLoadAverageRest && averageLoadAverageReset) {
+                       loadAverageReset = true;
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                               .format("Load average stats are reset, ready to do scale check [kub cluster] %s",
+                                                               this.kubernetesClusterId));
+                       }
+               }
+       }
+
+       public boolean isLoadAverageReset() {
+               return loadAverageReset;
+       }
+
+       public void setLoadAverageReset(boolean loadAverageReset) {
+               this.loadAverageReset = loadAverageReset;
+               this.averageLoadAverageReset = loadAverageReset;
+               this.gradientLoadAverageReset = loadAverageReset;
+               this.secondDerivativeLoadAverageRest = loadAverageReset;
        }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
index 9877fe8..ac8b61a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
@@ -45,20 +45,11 @@ public class MemberStatsContext {
         this.memberId = memberId;
     }
 
-
     public LoadAverage getLoadAverage() {
         return loadAverage;
     }
-//
-//    public void setLoadAverage(LoadAverage loadAverage) {
-//        this.loadAverage = loadAverage;
-//    }
 
     public MemoryConsumption getMemoryConsumption() {
         return memoryConsumption;
     }
-//
-//    public void setMemoryConsumption(MemoryConsumption memoryConsumption) {
-//        this.memoryConsumption = memoryConsumption;
-//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
index 69c9f5e..3f900b6 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
@@ -276,5 +276,31 @@ public class CloudControllerClient {
             throw new SpawningException(e.getMessage(), e);
         }
     }
+    
+    public synchronized void terminateContainer(String memberId) throws TerminationException {
+        try {
+            if(log.isInfoEnabled()) {
+                log.info(String.format("Terminating container via cloud controller: [member] %s", memberId));
+            }
+            long startTime = System.currentTimeMillis();
+            stub.terminateInstance(memberId);
+            if(log.isDebugEnabled()) {
+                long endTime = System.currentTimeMillis();
+                log.debug(String.format("Service call terminateContainer() returned in %dms", (endTime - startTime)));
+            }
+        } catch (RemoteException e) {
+               String msg = e.getMessage();
+            log.error(msg, e);
+            throw new TerminationException(msg, e);
+        } catch (CloudControllerServiceInvalidMemberExceptionException e) {
+               String msg = e.getFaultMessage().getInvalidMemberException().getMessage();
+            log.error(msg, e);
+            throw new TerminationException(msg, e);
+        } catch (CloudControllerServiceInvalidCartridgeTypeExceptionException e) {
+               String msg = e.getFaultMessage().getInvalidCartridgeTypeException().getMessage();
+            log.error(msg, e);
+            throw new TerminationException(msg, e);
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
index 52952c3..2efcef0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -21,14 +21,19 @@ package org.apache.stratos.autoscaler.message.receiver.health;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
 import org.apache.stratos.autoscaler.MemberStatsContext;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
 import org.apache.stratos.autoscaler.PartitionContext;
 import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
 import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
+import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.DockerServiceClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
 import org.apache.stratos.autoscaler.policy.model.LoadAverage;
 import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
+import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
@@ -94,30 +99,42 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
                 }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setAverageLoadAverage(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == ClusterType.VMLbCluster) {
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            networkPartitionContext.setAverageLoadAverage(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition context is not available for :" +
+                                       " [network partition] %s", networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               kubernetesClusterContext.setAverageLoadAverage(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
-
             }
 
         });
@@ -136,32 +153,43 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
                 }
-
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setAverageMemoryConsumption(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == ClusterType.VMLbCluster) {
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            networkPartitionContext.setAverageMemoryConsumption(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition context is not available for :" +
+                                       " [network partition] %s", networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               kubernetesClusterContext.setAverageMemoryConsumption(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
             }
-
         });
         healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() {
             @Override
@@ -178,28 +206,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
                 }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setAverageRequestsInFlight(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == ClusterType.VMLbCluster) {
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            networkPartitionContext.setAverageRequestsInFlight(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition context is not available for :" +
+                                       " [network partition] %s", networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               kubernetesClusterContext.setAverageRequestsInFlight(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
             }
 
@@ -218,28 +259,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
                 }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setLoadAverageGradient(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == ClusterType.VMLbCluster){
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            networkPartitionContext.setLoadAverageGradient(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition context is not available for :" +
+                                       " [network partition] %s", networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               kubernetesClusterContext.setLoadAverageGradient(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
             }
 
@@ -259,28 +313,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
-                };
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setMemoryConsumptionGradient(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
+                }
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == ClusterType.VMLbCluster){
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            networkPartitionContext.setMemoryConsumptionGradient(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition context is not available for :" +
+                                       " [network partition] %s", networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               kubernetesClusterContext.setMemoryConsumptionGradient(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
             }
 
@@ -299,28 +366,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
                 }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setRequestsInFlightGradient(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == ClusterType.VMLbCluster){
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            networkPartitionContext.setRequestsInFlightGradient(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition context is not available for :" +
+                                       " [network partition] %s", networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               kubernetesClusterContext.setRequestsInFlightGradient(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
             }
 
@@ -461,28 +541,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not 
found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
                 }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        
networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context 
is not available for :" +
-                                   " [network partition] %s", 
networkPartitionId));
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == 
ClusterType.VMLbCluster){
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = 
((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            
networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition 
context is not available for :" +
+                                       " [network partition] %s", 
networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = 
((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               
kubernetesClusterContext.setLoadAverageSecondDerivative(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster 
context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
             }
 
@@ -502,30 +595,42 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not 
found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
                 }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        
networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context 
is not available for :" +
-                                   " [network partition] %s", 
networkPartitionId));
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == 
ClusterType.VMLbCluster){
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = 
((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            
networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition 
context is not available for :" +
+                                       " [network partition] %s", 
networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = 
((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               
kubernetesClusterContext.setMemoryConsumptionSecondDerivative(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster 
context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
-
             }
 
         });
@@ -542,31 +647,43 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                             clusterId, networkPartitionId, floatValue));
                 }
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
+                AbstractClusterMonitor monitor;
 
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
+                if(asCtx.clusterMonitorExist(clusterId)){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                } else {
                     if(log.isDebugEnabled()){
                         log.debug(String.format("A cluster monitor is not 
found in autoscaler context [cluster] %s", clusterId));
                     }
                     return;
                 }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        
networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context 
is not available for :" +
-                                   " [network partition] %s", 
networkPartitionId));
+                
+                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                               || monitor.getClusterType() == 
ClusterType.VMLbCluster){
+                       
+                    if(null != monitor){
+                        NetworkPartitionContext networkPartitionContext = 
((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
+                        if(null != networkPartitionContext){
+                            
networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
+                        } else {
+                            if(log.isDebugEnabled()) {
+                               log.debug(String.format("Network partition 
context is not available for :" +
+                                       " [network partition] %s", 
networkPartitionId));
+                            }
                         }
                     }
+                } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                       KubernetesClusterContext kubernetesClusterContext = 
((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
+                       if (null != kubernetesClusterContext) {
+                                               
kubernetesClusterContext.setRequestsInFlightSecondDerivative(floatValue);
+                                       } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Kubernetes cluster 
context is not available for :" +
+                                    " [cluster] %s", clusterId));
+                         }
+                                       }
                 }
             }
-
         });
     }
 
@@ -584,38 +701,61 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
         String clusterId = member.getClusterId();
 
         AutoscalerContext asCtx = AutoscalerContext.getInstance();
-        AbstractMonitor monitor;
+        AbstractClusterMonitor monitor;
 
-        if(asCtx.monitorExist(clusterId)){
-            monitor = asCtx.getMonitor(clusterId);
-        }else if(asCtx.lbMonitorExist(clusterId)){
-            monitor = asCtx.getLBMonitor(clusterId);
-        }else{
+        if(asCtx.clusterMonitorExist(clusterId)){
+            monitor = asCtx.getClusterMonitor(clusterId);
+        } else {
             if(log.isDebugEnabled()){
                 log.debug(String.format("A cluster monitor is not found in 
autoscaler context [cluster] %s", clusterId));
             }
             return null;
         }
-        String networkPartitionId = findNetworkPartitionId(memberId);
-        MemberStatsContext memberStatsContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId)
-                        .getPartitionCtxt(member.getPartitionId())
-                        .getMemberStatsContext(memberId);
-        if(null == memberStatsContext){
-            if(log.isDebugEnabled()) {
-               log.debug(String.format("Member context is not available for : 
[member] %s", memberId));
+        
+        if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                       || monitor.getClusterType() == ClusterType.VMLbCluster){
+               
+            String networkPartitionId = findNetworkPartitionId(memberId);
+            MemberStatsContext memberStatsContext = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(networkPartitionId)
+                            .getPartitionCtxt(member.getPartitionId())
+                            .getMemberStatsContext(memberId);
+            if(null == memberStatsContext){
+                if(log.isDebugEnabled()) {
+                   log.debug(String.format("Member context is not available 
for : [member] %s", memberId));
+                }
+                return null;
             }
-            return null;
-        }
-        else if(!member.isActive()){
-            if(log.isDebugEnabled()){
-                log.debug(String.format("Member activated event has not 
received for the member %s. Therefore ignoring" +
-                        " the health stat", memberId));
+            else if(!member.isActive()){
+                if(log.isDebugEnabled()){
+                    log.debug(String.format("Member activated event has not 
received for the member %s. Therefore ignoring" +
+                            " the health stat", memberId));
+                }
+                return null;
             }
-            return null;
+
+            LoadAverage loadAverage = memberStatsContext.getLoadAverage();
+            return loadAverage;
+        } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+               MemberStatsContext memberStatsContext = 
((ContainerClusterMonitor) 
monitor).getKubernetesClusterCtxt().getMemberStatsContext(memberId);
+            if(null == memberStatsContext){
+                if(log.isDebugEnabled()) {
+                   log.debug(String.format("Member context is not available 
for : [member] %s", memberId));
+                }
+                return null;
+            }
+            else if(!member.isActive()){
+                if(log.isDebugEnabled()){
+                    log.debug(String.format("Member activated event has not 
received for the member %s. Therefore ignoring" +
+                            " the health stat", memberId));
+                }
+                return null;
+            }
+
+            LoadAverage loadAverage = memberStatsContext.getLoadAverage();
+            return loadAverage;
         }
 
-        LoadAverage loadAverage = memberStatsContext.getLoadAverage();
-        return loadAverage;
+        return null;
     }
 
     private MemoryConsumption findMemoryConsumption(String memberId) {
@@ -628,37 +768,61 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
             }
                return null;
         }
-        AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(member.getClusterId());
-        if(null == monitor){
+        
+        AutoscalerContext asCtx = AutoscalerContext.getInstance();
+        AbstractClusterMonitor monitor;
 
-            monitor = 
AutoscalerContext.getInstance().getLBMonitor(member.getClusterId());
-            if(null == monitor){
-                if(log.isDebugEnabled()) {
-                   log.debug(String.format("Cluster monitor is not available 
for : [member] %s", memberId));
-                }
+        if(asCtx.clusterMonitorExist(member.getClusterId())){
+            monitor = asCtx.getClusterMonitor(member.getClusterId());
+        } else {
+            if(log.isDebugEnabled()){
+                log.debug(String.format("A cluster monitor is not found in 
autoscaler context [cluster] %s", member.getClusterId()));
             }
             return null;
         }
 
-        String networkPartitionId = findNetworkPartitionId(memberId);
-        MemberStatsContext memberStatsContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId)
-                        .getPartitionCtxt(member.getPartitionId())
-                        .getMemberStatsContext(memberId);
-        if(null == memberStatsContext){
-            if(log.isDebugEnabled()) {
-               log.debug(String.format("Member context is not available for : 
[member] %s", memberId));
+        if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                       || monitor.getClusterType() == ClusterType.VMLbCluster){
+               
+            String networkPartitionId = findNetworkPartitionId(memberId);
+            MemberStatsContext memberStatsContext = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(networkPartitionId)
+                            .getPartitionCtxt(member.getPartitionId())
+                            .getMemberStatsContext(memberId);
+            if(null == memberStatsContext){
+                if(log.isDebugEnabled()) {
+                   log.debug(String.format("Member context is not available 
for : [member] %s", memberId));
+                }
+                return null;
+            }else if(!member.isActive()){
+                if(log.isDebugEnabled()){
+                    log.debug(String.format("Member activated event has not 
received for the member %s. Therefore ignoring" +
+                            " the health stat", memberId));
+                }
+                return null;
             }
-            return null;
-        }else if(!member.isActive()){
-            if(log.isDebugEnabled()){
-                log.debug(String.format("Member activated event has not 
received for the member %s. Therefore ignoring" +
-                        " the health stat", memberId));
+            MemoryConsumption memoryConsumption = 
memberStatsContext.getMemoryConsumption();
+
+            return memoryConsumption;
+        } else if (monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+               MemberStatsContext memberStatsContext = 
((ContainerClusterMonitor) 
monitor).getKubernetesClusterCtxt().getMemberStatsContext(memberId);
+            if(null == memberStatsContext){
+                if(log.isDebugEnabled()) {
+                   log.debug(String.format("Member context is not available 
for : [member] %s", memberId));
+                }
+                return null;
+            }else if(!member.isActive()){
+                if(log.isDebugEnabled()){
+                    log.debug(String.format("Member activated event has not 
received for the member %s. Therefore ignoring" +
+                            " the health stat", memberId));
+                }
+                return null;
             }
-            return null;
-        }
-        MemoryConsumption memoryConsumption = 
memberStatsContext.getMemoryConsumption();
+            MemoryConsumption memoryConsumption = 
memberStatsContext.getMemoryConsumption();
 
-        return memoryConsumption;
+            return memoryConsumption;
+        }
+        
+        return null;
     }
 
     private String findNetworkPartitionId(String memberId) {
@@ -692,63 +856,100 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
     private void handleMemberFaultEvent(String clusterId, String memberId) {
         try {
                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-               AbstractMonitor monitor;
+               AbstractClusterMonitor monitor;
                
-               if(asCtx.monitorExist(clusterId)){
-                       monitor = asCtx.getMonitor(clusterId);
-               }else if(asCtx.lbMonitorExist(clusterId)){
-                       monitor = asCtx.getLBMonitor(clusterId);
-               }else{
+            if(asCtx.clusterMonitorExist(clusterId)){
+                monitor = asCtx.getClusterMonitor(clusterId);
+            } else {
                 if(log.isDebugEnabled()){
                     log.debug(String.format("A cluster monitor is not found in 
autoscaler context [cluster] %s", clusterId));
                 }
                 return;
-               }
-               
-               NetworkPartitionContext nwPartitionCtxt;
-            try{
-               TopologyManager.acquireReadLock();
-               Member member = findMember(memberId);
+            }
+
+            if(monitor.getClusterType() == ClusterType.VMServiceCluster 
+                       || monitor.getClusterType() == ClusterType.VMLbCluster){
                
-               if(null == member){
-                       return;
-               }
-                if(!member.isActive()){
+               NetworkPartitionContext nwPartitionCtxt;
+                try{
+                       TopologyManager.acquireReadLock();
+                       Member member = findMember(memberId);
+                       
+                       if(null == member){
+                               return;
+                       }
+                    if(!member.isActive()){
+                        if(log.isDebugEnabled()){
+                            log.debug(String.format("Member activated event 
has not received for the member %s. Therefore ignoring" +
+                                    " the member fault health stat", 
memberId));
+                        }
+                        return;
+                    }
+                   
+                   nwPartitionCtxt = ((VMClusterMonitor) 
monitor).getNetworkPartitionCtxt(member);
+                   
+                }finally{
+                       TopologyManager.releaseReadLock();
+                }
+                // start a new member in the same Partition
+                String partitionId = ((VMClusterMonitor) 
monitor).getPartitionOfMember(memberId);
+                PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+                if(!partitionCtxt.activeMemberExist(memberId)){
                     if(log.isDebugEnabled()){
-                        log.debug(String.format("Member activated event has 
not received for the member %s. Therefore ignoring" +
-                                " the member fault health stat", memberId));
+                        log.debug(String.format("Could not find the active 
member in partition context, [member] %s ", memberId));
                     }
                     return;
                 }
-                   
-                   nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
-                   
-            }finally{
-               TopologyManager.releaseReadLock();
-            }
-            // start a new member in the same Partition
-            String partitionId = monitor.getPartitionOfMember(memberId);
-            PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-            if(!partitionCtxt.activeMemberExist(memberId)){
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("Could not find the active member 
in partition context, [member] %s ", memberId));
+                // terminate the faulty member
+                CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
+                ccClient.terminate(memberId);
+
+                // remove from active member list
+                partitionCtxt.removeActiveMemberById(memberId);
+
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Faulty member is terminated and 
removed from the active members list: [member] %s [partition] %s [cluster] %s ",
+                                           memberId, partitionId, clusterId));
+                }
+            } else if(monitor.getClusterType() == 
ClusterType.DockerServiceCluster) {
+                try{
+                       TopologyManager.acquireReadLock();
+                       Member member = findMember(memberId);
+                       
+                       if(null == member){
+                               return;
+                       }
+                    if(!member.isActive()){
+                        if(log.isDebugEnabled()){
+                            log.debug(String.format("Member activated event 
has not received for the member %s. Therefore ignoring" +
+                                    " the member fault health stat", 
memberId));
+                        }
+                        return;
+                    }
+                }finally{
+                       TopologyManager.releaseReadLock();
                 }
-                return;
-            }
-            // terminate the faulty member
-            CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
-            ccClient.terminate(memberId);
-
-            // remove from active member list
-            partitionCtxt.removeActiveMemberById(memberId);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Faulty member is terminated and 
removed from the active members list: [member] %s [partition] %s [cluster] %s ",
-                                       memberId, partitionId, clusterId));
+                
+                KubernetesClusterContext kubernetesClusterContext = 
((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
+                if (!kubernetesClusterContext.activeMemberExist(memberId)) {
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("Could not find the active 
member in partition context, [member] %s ", memberId));
+                    }
+                    return;
+                               }
+                //terminate the faulty member
+                
CloudControllerClient.getInstance().terminateContainer(memberId);
+                //remove from active member list
+                kubernetesClusterContext.removeActiveMemberById(memberId);
+                
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Faulty member is terminated and 
removed from the active members list: [member] %s [kub cluster] %s [cluster] %s 
",
+                                           memberId, 
kubernetesClusterContext.getKubernetesClusterID(), clusterId));
+                }
+                
             }
-
-
+            
         } catch (TerminationException e) {
             log.error(e);
         }

Reply via email to