cluster monitors hierarchy redesigned and docker cluster monitor improved
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d6f49d37 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d6f49d37 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d6f49d37 Branch: refs/heads/docker-integration Commit: d6f49d37095d0c6d20cf799f6e9a0e47932608ca Parents: 9f374fe Author: R-Rajkumar <[email protected]> Authored: Mon Sep 22 13:38:12 2014 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Tue Sep 23 14:48:33 2014 +0530 ---------------------------------------------------------------------- .../stratos/autoscaler/AutoscalerContext.java | 106 +-- .../autoscaler/KubernetesClusterContext.java | 639 +++++++++++++++++- .../stratos/autoscaler/MemberStatsContext.java | 9 - .../cloud/controller/CloudControllerClient.java | 26 + .../AutoscalerHealthStatEventReceiver.java | 647 ++++++++++++------- .../AutoscalerTopologyEventReceiver.java | 484 +++++++------- .../monitor/AbstractClusterMonitor.java | 127 ++++ .../autoscaler/monitor/AbstractMonitor.java | 203 ------ .../autoscaler/monitor/ClusterMonitor.java | 223 ------- .../monitor/ClusterMonitorFactory.java | 336 ++++++++++ .../monitor/ContainerClusterMonitor.java | 38 ++ .../monitor/DockerServiceClusterMonitor.java | 156 +++++ .../monitor/KubernetesClusterMonitor.java | 186 ------ .../autoscaler/monitor/LbClusterMonitor.java | 126 ---- .../autoscaler/monitor/VMClusterMonitor.java | 120 ++++ .../autoscaler/monitor/VMLbClusterMonitor.java | 135 ++++ .../monitor/VMServiceClusterMonitor.java | 231 +++++++ .../stratos/autoscaler/util/AutoscalerUtil.java | 574 ++++++++-------- .../stratos/common/enums/ClusterType.java | 5 + 19 files changed, 2784 insertions(+), 1587 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java index e3eb598..18003d8 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java @@ -20,15 +20,13 @@ */ package org.apache.stratos.autoscaler; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.monitor.ClusterMonitor; -import org.apache.stratos.autoscaler.monitor.KubernetesClusterMonitor; -import org.apache.stratos.autoscaler.monitor.LbClusterMonitor; - import java.util.HashMap; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; + /** * This class is there for accumulating cluster details which are not there in Topology */ @@ -37,20 +35,14 @@ public class AutoscalerContext { private static final Log log = LogFactory.getLog(AutoscalerContext.class); private AutoscalerContext() { try { - setMonitors(new HashMap<String, ClusterMonitor>()); - setLbMonitors(new HashMap<String, LbClusterMonitor>()); - setKubernetesClusterMonitors(new HashMap<String, KubernetesClusterMonitor>()); + setClusterMonitors(new HashMap<String, AbstractClusterMonitor>()); } catch (Exception e) { log.error("Rule evaluateMinCheck error", e); } } - // Map<ClusterId, ClusterMonitor> - private Map<String, ClusterMonitor> monitors; - // Map<LBClusterId, LBClusterMonitor> - private Map<String, LbClusterMonitor> lbMonitors; - // Map<ClusterId, KubernetesClusterMonitor> - private Map<String, KubernetesClusterMonitor> kubernetesClusterMonitors; + // Map<ClusterId, AbstractClusterMonitor> + private Map<String, AbstractClusterMonitor> clusterMonitors; private static class Holder { private static final AutoscalerContext INSTANCE = new AutoscalerContext(); @@ -60,86 +52,32 @@ public class AutoscalerContext { return Holder.INSTANCE; } - public void addMonitor(ClusterMonitor monitor) { - monitors.put(monitor.getClusterId(), monitor); - } - - public ClusterMonitor getMonitor(String clusterId) { - return monitors.get(clusterId); - } - - public boolean monitorExist(String clusterId) { - return monitors.containsKey(clusterId); - } - - public boolean lbMonitorExist(String clusterId) { - return lbMonitors.containsKey(clusterId); - } - - public LbClusterMonitor getLBMonitor(String clusterId) { - return lbMonitors.get(clusterId); - } - - public ClusterMonitor removeMonitor(String clusterId) { - if(!monitorExist(clusterId)) { - log.fatal("Cluster monitor not found for cluster id: "+clusterId); - return null; - } - log.info("Removed monitor [cluster id]: " + clusterId); - return monitors.remove(clusterId); - } - public LbClusterMonitor removeLbMonitor(String clusterId) { - if(!lbMonitorExist(clusterId)) { - log.fatal("LB monitor not found for cluster id: "+clusterId); - return null; - } - log.info("Removed LB monitor [cluster id]: " + clusterId); - return lbMonitors.remove(clusterId); - } - - public Map<String, ClusterMonitor> getMonitors() { - return monitors; - } - - - public void setMonitors(Map<String, ClusterMonitor> monitors) { - this.monitors = monitors; - } - - public void setLbMonitors(Map<String, LbClusterMonitor> monitors) { - this.lbMonitors = monitors; - } - - public void addLbMonitor(LbClusterMonitor monitor) { - lbMonitors.put(monitor.getClusterId(), monitor); - } - - public void addKubernetesClusterMonitor(KubernetesClusterMonitor kubernetesClusterMonitor) { - kubernetesClusterMonitors.put(kubernetesClusterMonitor.getClusterId(), kubernetesClusterMonitor); + public void addClusterMonitor(AbstractClusterMonitor clusterMonitor) { + clusterMonitors.put(clusterMonitor.getClusterId(), clusterMonitor); } - public KubernetesClusterMonitor getKubernetesClusterMonitor(String clusterId) { - return kubernetesClusterMonitors.get(clusterId); + public AbstractClusterMonitor getClusterMonitor(String clusterId) { + return clusterMonitors.get(clusterId); } - public boolean kubernetesClusterMonitorExist(String clusterId) { - return kubernetesClusterMonitors.containsKey(clusterId); + public boolean clusterMonitorExist(String clusterId) { + return clusterMonitors.containsKey(clusterId); } - public Map<String, KubernetesClusterMonitor> getKubernetesClusterMonitors() { - return kubernetesClusterMonitors; + public Map<String, AbstractClusterMonitor> getClusterMonitors() { + return clusterMonitors; } - public void setKubernetesClusterMonitors(Map<String, KubernetesClusterMonitor> kubernetesClusterMonitors) { - this.kubernetesClusterMonitors = kubernetesClusterMonitors; + public void setClusterMonitors(Map<String, AbstractClusterMonitor> clusterMonitors) { + this.clusterMonitors = clusterMonitors; } - public KubernetesClusterMonitor removeKubernetesClusterMonitor(String clusterId) { - if(!kubernetesClusterMonitorExist(clusterId)) { - log.fatal("Kubernetes cluster monitor not found for cluster id: "+clusterId); + public AbstractClusterMonitor removeClusterMonitor(String clusterId) { + if(!clusterMonitorExist(clusterId)) { + log.fatal("ClusterMonitor not found for cluster id: "+clusterId); return null; } - log.info("Removed KubernetesClusterMonitor [cluster id]: " + clusterId); - return kubernetesClusterMonitors.remove(clusterId); + log.info("Removed ClusterMonitor [cluster id]: " + clusterId); + return clusterMonitors.remove(clusterId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java index f369ac9..e1d5e30 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java @@ -1,20 +1,647 @@ package org.apache.stratos.autoscaler; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.policy.model.LoadAverage; +import org.apache.stratos.autoscaler.policy.model.MemoryConsumption; +import org.apache.stratos.autoscaler.policy.model.RequestsInFlight; +import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; public class KubernetesClusterContext implements Serializable{ private static final long serialVersionUID = 808741789615481596L; - String kubernetesClusterID; + private static final Log log = LogFactory.getLog(KubernetesClusterContext.class); + + private String kubernetesClusterId; + private String serviceName; + + private int minReplicas; + private int maxReplicas; + private int currentReplicas = 0; + + // properties + private Properties properties; + + // 15 mints as the default + private long expiryTime = 900000; + // pending members + private List<MemberContext> pendingMembers; + private int pendingMembersFailureCount = 0; + private final int PENDING_MEMBER_FAILURE_THRESHOLD = 5; + + // 1 day as default + private long obsoltedMemberExpiryTime = 1*24*60*60*1000; + + // members to be terminated + private Map<String, MemberContext> obsoletedMembers; + + // active members + private List<MemberContext> activeMembers; + + // termination pending members, member is added to this when Autoscaler send grace fully shut down event + private List<MemberContext> terminationPendingMembers; + + //Keep statistics come from CEP + private Map<String, MemberStatsContext> memberStatsContexts; - public KubernetesClusterContext(String kubernetesClusterID){ - this.kubernetesClusterID = kubernetesClusterID; + //Following information will keep events details + private RequestsInFlight requestsInFlight; + private MemoryConsumption memoryConsumption; + private LoadAverage loadAverage; + + //boolean values to keep whether the requests in flight parameters are reset or not + private boolean rifReset = false, averageRifReset = false, gradientRifReset = false, secondDerivativeRifRest = false; + //boolean values to keep whether the memory consumption parameters are reset or not + private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false, + gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false; + //boolean values to keep whether the load average parameters are reset or not + private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, + secondDerivativeLoadAverageRest = false; + + public KubernetesClusterContext(String kubernetesClusterId){ + this.kubernetesClusterId = kubernetesClusterId; + this.pendingMembers = new ArrayList<MemberContext>(); + this.activeMembers = new ArrayList<MemberContext>(); + this.terminationPendingMembers = new ArrayList<MemberContext>(); + this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>(); + this.memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>(); + this.requestsInFlight = new RequestsInFlight(); + this.loadAverage = new LoadAverage(); + this.memoryConsumption = new MemoryConsumption(); + + // check if a different value has been set for expiryTime + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + expiryTime = conf.getLong("autoscaler.member.expiryTimeout", 900000); + if (log.isDebugEnabled()) { + log.debug("Member expiry time is set to: " + expiryTime); + } + + Thread th = new Thread(new PendingMemberWatcher(this)); + th.start(); + Thread th2 = new Thread(new ObsoletedMemberWatcher(this)); + th2.start(); } public String getKubernetesClusterID() { - return kubernetesClusterID; + return kubernetesClusterId; + } + public void setKubernetesClusterID(String kubernetesClusterId) { + this.kubernetesClusterId = kubernetesClusterId; + } + + public List<MemberContext> getPendingMembers() { + return pendingMembers; + } + + public void setPendingMembers(List<MemberContext> pendingMembers) { + this.pendingMembers = pendingMembers; + } + + public int getActiveMemberCount() { + return activeMembers.size(); + } + + public void setActiveMembers(List<MemberContext> activeMembers) { + this.activeMembers = activeMembers; + } + + public int getMinReplicas() { + return minReplicas; + } + + public void setMinReplicas(int minReplicas) { + this.minReplicas = minReplicas; + } + + public int getMaxReplicas() { + return maxReplicas; + } + + public void setMaxReplicas(int maxReplicas) { + this.maxReplicas = maxReplicas; + } + + public int getCurrentReplicas() { + return currentReplicas; + } + + public void setCurrentReplicas(int currentReplicas) { + this.currentReplicas = currentReplicas; + } + + public void addPendingMember(MemberContext ctxt) { + this.pendingMembers.add(ctxt); + } + + public boolean removePendingMember(String id) { + if (id == null) { + return false; + } + for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext();) { + MemberContext pendingMember = (MemberContext) iterator.next(); + if (id.equals(pendingMember.getMemberId())) { + iterator.remove(); + return true; + } + + } + + return false; + } + + public void movePendingMemberToActiveMembers(String memberId) { + if (memberId == null) { + return; + } + Iterator<MemberContext> iterator = pendingMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext pendingMember = iterator.next(); + if (pendingMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(pendingMember.getMemberId())) { + // member is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.activeMembers.add(pendingMember); + pendingMembersFailureCount = 0; + if (log.isDebugEnabled()) { + log.debug(String.format( + "Pending member is removed and added to the " + + "activated member list. [Member Id] %s", + memberId)); + } + break; + } + } + } + + public void moveActiveMemberToTerminationPendingMembers(String memberId) { + if (memberId == null) { + return; + } + Iterator<MemberContext> iterator = activeMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext activeMember = iterator.next(); + if (activeMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(activeMember.getMemberId())) { + // member is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.terminationPendingMembers.add(activeMember); + if (log.isDebugEnabled()) { + log.debug(String + .format("Active member is removed and added to the " + + "termination pending member list. [Member Id] %s", + memberId)); + } + break; + } + } + } + + public void addActiveMember(MemberContext ctxt) { + this.activeMembers.add(ctxt); + } + + public void removeActiveMember(MemberContext ctxt) { + this.activeMembers.remove(ctxt); + } + + public boolean removeTerminationPendingMember(String memberId) { + boolean terminationPendingMemberAvailable = false; + for (MemberContext memberContext : terminationPendingMembers) { + if (memberContext.getMemberId().equals(memberId)) { + terminationPendingMemberAvailable = true; + terminationPendingMembers.remove(memberContext); + break; + } + } + return terminationPendingMemberAvailable; + } + + public long getObsoltedMemberExpiryTime() { + return obsoltedMemberExpiryTime; + } + + public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) { + this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime; + } + + public void addObsoleteMember(MemberContext ctxt) { + this.obsoletedMembers.put(ctxt.getMemberId(), ctxt); + } + + public boolean removeObsoleteMember(String memberId) { + if (this.obsoletedMembers.remove(memberId) == null) { + return false; + } + return true; + } + + public long getExpiryTime() { + return expiryTime; + } + + public void setExpiryTime(long expiryTime) { + this.expiryTime = expiryTime; + } + + public Map<String, MemberContext> getObsoletedMembers() { + return obsoletedMembers; + } + + public void setObsoletedMembers(Map<String, MemberContext> obsoletedMembers) { + this.obsoletedMembers = obsoletedMembers; + } + + public Map<String, MemberStatsContext> getMemberStatsContexts() { + return memberStatsContexts; + } + + public MemberStatsContext getMemberStatsContext(String memberId) { + return memberStatsContexts.get(memberId); + } + + public void addMemberStatsContext(MemberStatsContext ctxt) { + this.memberStatsContexts.put(ctxt.getMemberId(), ctxt); + } + + public void removeMemberStatsContext(String memberId) { + this.memberStatsContexts.remove(memberId); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public List<MemberContext> getTerminationPendingMembers() { + return terminationPendingMembers; + } + + public void setTerminationPendingMembers( + List<MemberContext> terminationPendingMembers) { + this.terminationPendingMembers = terminationPendingMembers; } - public void setKubernetesClusterID(String kubernetesClusterID) { - this.kubernetesClusterID = kubernetesClusterID; + + public int getTotalMemberCount() { + return activeMembers.size() + pendingMembers.size() + + terminationPendingMembers.size(); + } + + public int getNonTerminatedMemberCount() { + return activeMembers.size() + pendingMembers.size() + + terminationPendingMembers.size(); + } + + public List<MemberContext> getActiveMembers() { + return activeMembers; + } + + public boolean removeActiveMemberById(String memberId) { + boolean removeActiveMember = false; + synchronized (activeMembers) { + Iterator<MemberContext> iterator = activeMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext memberContext = iterator.next(); + if (memberId.equals(memberContext.getMemberId())) { + iterator.remove(); + removeActiveMember = true; + + break; + } + } + } + return removeActiveMember; + } + + public boolean activeMemberExist(String memberId) { + + for (MemberContext memberContext : activeMembers) { + if (memberId.equals(memberContext.getMemberId())) { + return true; + } + } + return false; + } + + private class PendingMemberWatcher implements Runnable { + private KubernetesClusterContext ctxt; + + public PendingMemberWatcher(KubernetesClusterContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + + while (true) { + long expiryTime = ctxt.getExpiryTime(); + List<MemberContext> pendingMembers = ctxt.getPendingMembers(); + + synchronized (pendingMembers) { + Iterator<MemberContext> iterator = pendingMembers + .listIterator(); + while (iterator.hasNext()) { + MemberContext pendingMember = iterator.next(); + + if (pendingMember == null) { + continue; + } + long pendingTime = System.currentTimeMillis() + - pendingMember.getInitTime(); + if (pendingTime >= expiryTime) { + + iterator.remove(); + log.info("Pending state of member: " + + pendingMember.getMemberId() + + " is expired. " + + "Adding as an obsoleted member."); + // member should be terminated + ctxt.addObsoleteMember(pendingMember); + pendingMembersFailureCount++; + if (pendingMembersFailureCount > PENDING_MEMBER_FAILURE_THRESHOLD) { + setExpiryTime(expiryTime * 2);// Doubles the + // expiry time + // after the + // threshold of + // failure + // exceeded + // TODO Implement an alerting system: + // STRATOS-369 + } + } + } + } + + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + + } + + private class ObsoletedMemberWatcher implements Runnable { + private KubernetesClusterContext ctxt; + + public ObsoletedMemberWatcher(KubernetesClusterContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + while (true) { + + long obsoltedMemberExpiryTime = ctxt + .getObsoltedMemberExpiryTime(); + Map<String, MemberContext> obsoletedMembers = ctxt + .getObsoletedMembers(); + Iterator<Entry<String, MemberContext>> iterator = obsoletedMembers + .entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry<String, MemberContext> pairs = iterator.next(); + MemberContext obsoleteMember = (MemberContext) pairs + .getValue(); + if (obsoleteMember == null) { + continue; + } + long obsoleteTime = System.currentTimeMillis() + - obsoleteMember.getInitTime(); + if (obsoleteTime >= obsoltedMemberExpiryTime) { + iterator.remove(); + } + } + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + } + + public float getAverageRequestsInFlight() { + return requestsInFlight.getAverage(); + } + + public void setAverageRequestsInFlight(float averageRequestsInFlight) { + requestsInFlight.setAverage(averageRequestsInFlight); + averageRifReset = true; + if (secondDerivativeRifRest && gradientRifReset) { + rifReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Requests in flights stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public float getRequestsInFlightSecondDerivative() { + return requestsInFlight.getSecondDerivative(); + } + + public void setRequestsInFlightSecondDerivative( + float requestsInFlightSecondDerivative) { + requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative); + secondDerivativeRifRest = true; + if (averageRifReset && gradientRifReset) { + rifReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Requests in flights stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public float getRequestsInFlightGradient() { + return requestsInFlight.getGradient(); + } + + public void setRequestsInFlightGradient(float requestsInFlightGradient) { + requestsInFlight.setGradient(requestsInFlightGradient); + gradientRifReset = true; + if (secondDerivativeRifRest && averageRifReset) { + rifReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Requests in flights stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public boolean isRifReset() { + return rifReset; + } + + public void setRifReset(boolean rifReset) { + this.rifReset = rifReset; + this.averageRifReset = rifReset; + this.gradientRifReset = rifReset; + this.secondDerivativeRifRest = rifReset; + } + + public float getAverageMemoryConsumption() { + return memoryConsumption.getAverage(); + } + + public void setAverageMemoryConsumption(float averageMemoryConsumption) { + memoryConsumption.setAverage(averageMemoryConsumption); + averageMemoryConsumptionReset = true; + if (secondDerivativeMemoryConsumptionRest + && gradientMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Memory consumption stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public float getMemoryConsumptionSecondDerivative() { + return memoryConsumption.getSecondDerivative(); + } + + public void setMemoryConsumptionSecondDerivative( + float memoryConsumptionSecondDerivative) { + memoryConsumption + .setSecondDerivative(memoryConsumptionSecondDerivative); + secondDerivativeMemoryConsumptionRest = true; + if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Memory consumption stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public float getMemoryConsumptionGradient() { + return memoryConsumption.getGradient(); + } + + public void setMemoryConsumptionGradient(float memoryConsumptionGradient) { + memoryConsumption.setGradient(memoryConsumptionGradient); + gradientMemoryConsumptionReset = true; + if (secondDerivativeMemoryConsumptionRest + && averageMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Memory consumption stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public boolean isMemoryConsumptionReset() { + return memoryConsumptionReset; + } + + public void setMemoryConsumptionReset(boolean memoryConsumptionReset) { + this.memoryConsumptionReset = memoryConsumptionReset; + this.averageMemoryConsumptionReset = memoryConsumptionReset; + this.gradientMemoryConsumptionReset = memoryConsumptionReset; + this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset; + } + + + public float getAverageLoadAverage() { + return loadAverage.getAverage(); + } + + public void setAverageLoadAverage(float averageLoadAverage) { + loadAverage.setAverage(averageLoadAverage); + averageLoadAverageReset = true; + if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Load average stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public float getLoadAverageSecondDerivative() { + return loadAverage.getSecondDerivative(); + } + + public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) { + loadAverage.setSecondDerivative(loadAverageSecondDerivative); + secondDerivativeLoadAverageRest = true; + if (averageLoadAverageReset && gradientLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Load average stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public float getLoadAverageGradient() { + return loadAverage.getGradient(); + } + + public void setLoadAverageGradient(float loadAverageGradient) { + loadAverage.setGradient(loadAverageGradient); + gradientLoadAverageReset = true; + if (secondDerivativeLoadAverageRest && averageLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String + .format("Load average stats are reset, ready to do scale check [kub cluster] %s", + this.kubernetesClusterId)); + } + } + } + + public boolean isLoadAverageReset() { + return loadAverageReset; + } + + public void setLoadAverageReset(boolean loadAverageReset) { + this.loadAverageReset = loadAverageReset; + this.averageLoadAverageReset = loadAverageReset; + this.gradientLoadAverageReset = loadAverageReset; + this.secondDerivativeLoadAverageRest = loadAverageReset; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java index 9877fe8..ac8b61a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java @@ -45,20 +45,11 @@ public class MemberStatsContext { this.memberId = memberId; } - public LoadAverage getLoadAverage() { return loadAverage; } -// -// public void setLoadAverage(LoadAverage loadAverage) { -// this.loadAverage = loadAverage; -// } public MemoryConsumption getMemoryConsumption() { return memoryConsumption; } -// -// public void setMemoryConsumption(MemoryConsumption memoryConsumption) { -// this.memoryConsumption = memoryConsumption; -// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java index 69c9f5e..3f900b6 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java @@ -276,5 +276,31 @@ public class CloudControllerClient { throw new SpawningException(e.getMessage(), e); } } + + public synchronized void terminateContainer(String memberId) throws TerminationException { + try { + if(log.isInfoEnabled()) { + log.info(String.format("Terminating container via cloud controller: [member] %s", memberId)); + } + long startTime = System.currentTimeMillis(); + stub.terminateInstance(memberId); + if(log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); + log.debug(String.format("Service call terminateContainer() returned in %dms", (endTime - startTime))); + } + } catch (RemoteException e) { + String msg = e.getMessage(); + log.error(msg, e); + throw new TerminationException(msg, e); + } catch (CloudControllerServiceInvalidMemberExceptionException e) { + String msg = e.getFaultMessage().getInvalidMemberException().getMessage(); + log.error(msg, e); + throw new TerminationException(msg, e); + } catch (CloudControllerServiceInvalidCartridgeTypeExceptionException e) { + String msg = e.getFaultMessage().getInvalidCartridgeTypeException().getMessage(); + log.error(msg, e); + throw new TerminationException(msg, e); + } + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java index 52952c3..2efcef0 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -21,14 +21,19 @@ package org.apache.stratos.autoscaler.message.receiver.health; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.KubernetesClusterContext; import org.apache.stratos.autoscaler.MemberStatsContext; import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; import org.apache.stratos.autoscaler.exception.TerminationException; -import org.apache.stratos.autoscaler.monitor.AbstractMonitor; +import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; +import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor; +import org.apache.stratos.autoscaler.monitor.DockerServiceClusterMonitor; +import org.apache.stratos.autoscaler.monitor.VMClusterMonitor; import org.apache.stratos.autoscaler.policy.model.LoadAverage; import org.apache.stratos.autoscaler.policy.model.MemoryConsumption; +import org.apache.stratos.common.enums.ClusterType; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; @@ -94,30 +99,42 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setAverageLoadAverage(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster) { + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setAverageLoadAverage(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setAverageLoadAverage(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } - } }); @@ -136,32 +153,43 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setAverageMemoryConsumption(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster) { + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setAverageMemoryConsumption(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setAverageMemoryConsumption(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } } - }); healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() { @Override @@ -178,28 +206,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setAverageRequestsInFlight(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster) { + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setAverageRequestsInFlight(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setAverageRequestsInFlight(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } } @@ -218,28 +259,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setLoadAverageGradient(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setLoadAverageGradient(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setLoadAverageGradient(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } } @@ -259,28 +313,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; - }; - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setMemoryConsumptionGradient(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + } + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setMemoryConsumptionGradient(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setMemoryConsumptionGradient(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } } @@ -299,28 +366,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setRequestsInFlightGradient(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setRequestsInFlightGradient(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setRequestsInFlightGradient(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } } @@ -461,28 +541,41 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setLoadAverageSecondDerivative(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setLoadAverageSecondDerivative(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setLoadAverageSecondDerivative(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } } @@ -502,30 +595,42 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setMemoryConsumptionSecondDerivative(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } - } }); @@ -542,31 +647,43 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { clusterId, networkPartitionId, floatValue)); } AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue); - } else { - if(log.isDebugEnabled()) { - log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ + + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } } } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setRequestsInFlightSecondDerivative(floatValue); + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } } } - }); } @@ -584,38 +701,61 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { String clusterId = member.getClusterId(); AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return null; } - String networkPartitionId = findNetworkPartitionId(memberId); - MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId) - .getPartitionCtxt(member.getPartitionId()) - .getMemberStatsContext(memberId); - if(null == memberStatsContext){ - if(log.isDebugEnabled()) { - log.debug(String.format("Member context is not available for : [member] %s", memberId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ + + String networkPartitionId = findNetworkPartitionId(memberId); + MemberStatsContext memberStatsContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId) + .getPartitionCtxt(member.getPartitionId()) + .getMemberStatsContext(memberId); + if(null == memberStatsContext){ + if(log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return null; } - return null; - } - else if(!member.isActive()){ - if(log.isDebugEnabled()){ - log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + - " the health stat", memberId)); + else if(!member.isActive()){ + if(log.isDebugEnabled()){ + log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + + " the health stat", memberId)); + } + return null; } - return null; + + LoadAverage loadAverage = memberStatsContext.getLoadAverage(); + return loadAverage; + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + MemberStatsContext memberStatsContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt().getMemberStatsContext(memberId); + if(null == memberStatsContext){ + if(log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return null; + } + else if(!member.isActive()){ + if(log.isDebugEnabled()){ + log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + + " the health stat", memberId)); + } + return null; + } + + LoadAverage loadAverage = memberStatsContext.getLoadAverage(); + return loadAverage; } - LoadAverage loadAverage = memberStatsContext.getLoadAverage(); - return loadAverage; + return null; } private MemoryConsumption findMemoryConsumption(String memberId) { @@ -628,37 +768,61 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { } return null; } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId()); - if(null == monitor){ + + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; - monitor = AutoscalerContext.getInstance().getLBMonitor(member.getClusterId()); - if(null == monitor){ - if(log.isDebugEnabled()) { - log.debug(String.format("Cluster monitor is not available for : [member] %s", memberId)); - } + if(asCtx.clusterMonitorExist(member.getClusterId())){ + monitor = asCtx.getClusterMonitor(member.getClusterId()); + } else { + if(log.isDebugEnabled()){ + log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", member.getClusterId())); } return null; } - String networkPartitionId = findNetworkPartitionId(memberId); - MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId) - .getPartitionCtxt(member.getPartitionId()) - .getMemberStatsContext(memberId); - if(null == memberStatsContext){ - if(log.isDebugEnabled()) { - log.debug(String.format("Member context is not available for : [member] %s", memberId)); + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ + + String networkPartitionId = findNetworkPartitionId(memberId); + MemberStatsContext memberStatsContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId) + .getPartitionCtxt(member.getPartitionId()) + .getMemberStatsContext(memberId); + if(null == memberStatsContext){ + if(log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return null; + }else if(!member.isActive()){ + if(log.isDebugEnabled()){ + log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + + " the health stat", memberId)); + } + return null; } - return null; - }else if(!member.isActive()){ - if(log.isDebugEnabled()){ - log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + - " the health stat", memberId)); + MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption(); + + return memoryConsumption; + } else if (monitor.getClusterType() == ClusterType.DockerServiceCluster) { + MemberStatsContext memberStatsContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt().getMemberStatsContext(memberId); + if(null == memberStatsContext){ + if(log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return null; + }else if(!member.isActive()){ + if(log.isDebugEnabled()){ + log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + + " the health stat", memberId)); + } + return null; } - return null; - } - MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption(); + MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption(); - return memoryConsumption; + return memoryConsumption; + } + + return null; } private String findNetworkPartitionId(String memberId) { @@ -692,63 +856,100 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { private void handleMemberFaultEvent(String clusterId, String memberId) { try { AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)){ + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; - } - - NetworkPartitionContext nwPartitionCtxt; - try{ - TopologyManager.acquireReadLock(); - Member member = findMember(memberId); + } + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster){ - if(null == member){ - return; - } - if(!member.isActive()){ + NetworkPartitionContext nwPartitionCtxt; + try{ + TopologyManager.acquireReadLock(); + Member member = findMember(memberId); + + if(null == member){ + return; + } + if(!member.isActive()){ + if(log.isDebugEnabled()){ + log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + + " the member fault health stat", memberId)); + } + return; + } + + nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(member); + + }finally{ + TopologyManager.releaseReadLock(); + } + // start a new member in the same Partition + String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId); + PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + + if(!partitionCtxt.activeMemberExist(memberId)){ if(log.isDebugEnabled()){ - log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + - " the member fault health stat", memberId)); + log.debug(String.format("Could not find the active member in partition context, [member] %s ", memberId)); } return; } - - nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member); - - }finally{ - TopologyManager.releaseReadLock(); - } - // start a new member in the same Partition - String partitionId = monitor.getPartitionOfMember(memberId); - PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); - - if(!partitionCtxt.activeMemberExist(memberId)){ - if(log.isDebugEnabled()){ - log.debug(String.format("Could not find the active member in partition context, [member] %s ", memberId)); + // terminate the faulty member + CloudControllerClient ccClient = CloudControllerClient.getInstance(); + ccClient.terminate(memberId); + + // remove from active member list + partitionCtxt.removeActiveMemberById(memberId); + + if (log.isInfoEnabled()) { + log.info(String.format("Faulty member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ", + memberId, partitionId, clusterId)); + } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + try{ + TopologyManager.acquireReadLock(); + Member member = findMember(memberId); + + if(null == member){ + return; + } + if(!member.isActive()){ + if(log.isDebugEnabled()){ + log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" + + " the member fault health stat", memberId)); + } + return; + } + }finally{ + TopologyManager.releaseReadLock(); } - return; - } - // terminate the faulty member - CloudControllerClient ccClient = CloudControllerClient.getInstance(); - ccClient.terminate(memberId); - - // remove from active member list - partitionCtxt.removeActiveMemberById(memberId); - - if (log.isInfoEnabled()) { - log.info(String.format("Faulty member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ", - memberId, partitionId, clusterId)); + + KubernetesClusterContext kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt(); + if (!kubernetesClusterContext.activeMemberExist(memberId)) { + if(log.isDebugEnabled()){ + log.debug(String.format("Could not find the active member in partition context, [member] %s ", memberId)); + } + return; + } + //terminate the faulty member + CloudControllerClient.getInstance().terminateContainer(memberId); + //remove from active member list + kubernetesClusterContext.removeActiveMemberById(memberId); + + if (log.isInfoEnabled()) { + log.info(String.format("Faulty member is terminated and removed from the active members list: [member] %s [kub cluster] %s [cluster] %s ", + memberId, kubernetesClusterContext.getKubernetesClusterID(), clusterId)); + } + } - - + } catch (TerminationException e) { log.error(e); }
