Repository: stratos Updated Branches: refs/heads/container-autoscaling fb68de94a -> 7162325f3
adding member lists to kubernetes cluster context Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d4c31528 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d4c31528 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d4c31528 Branch: refs/heads/container-autoscaling Commit: d4c315280d3cf6fb917597a11c311d186774cd47 Parents: fb68de9 Author: R-Rajkumar <[email protected]> Authored: Tue Oct 7 10:51:56 2014 +0530 Committer: R-Rajkumar <[email protected]> Committed: Tue Oct 7 10:51:56 2014 +0530 ---------------------------------------------------------------------- .../autoscaler/KubernetesClusterContext.java | 137 ++++++++++++++++++- 1 file changed, 136 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/d4c31528/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java index c8b6e39..5704b18 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -38,7 +39,7 @@ import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; /* - * It holds the runtime data of a kubernetes cluster + * It holds the runtime data of a kubernetes service cluster */ public class KubernetesClusterContext implements Serializable { @@ -62,6 +63,15 @@ public class KubernetesClusterContext implements Serializable { // active members private List<MemberContext> activeMembers; + + // 1 day as default + private long obsoltedMemberExpiryTime = 1*24*60*60*1000; + + // members to be terminated + private Map<String, MemberContext> obsoletedMembers; + + // termination pending members, member is added to this when Autoscaler send grace fully shut down event + private List<MemberContext> terminationPendingMembers; //Keep statistics come from CEP private Map<String, MemberStatsContext> memberStatsContexts; @@ -89,6 +99,8 @@ public class KubernetesClusterContext implements Serializable { this.clusterId = clusterId; this.pendingMembers = new ArrayList<MemberContext>(); this.activeMembers = new ArrayList<MemberContext>(); + this.terminationPendingMembers = new ArrayList<MemberContext>(); + this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>(); this.memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>(); this.requestsInFlight = new RequestsInFlight(); this.loadAverage = new LoadAverage(); @@ -103,6 +115,8 @@ public class KubernetesClusterContext implements Serializable { Thread th = new Thread(new PendingMemberWatcher(this)); th.start(); + Thread th2 = new Thread(new ObsoletedMemberWatcher(this)); + th2.start(); } public String getKubernetesClusterID() { @@ -328,6 +342,41 @@ public class KubernetesClusterContext implements Serializable { } } + + private class ObsoletedMemberWatcher implements Runnable { + private KubernetesClusterContext ctxt; + + public ObsoletedMemberWatcher(KubernetesClusterContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + while (true) { + + long obsoltedMemberExpiryTime = ctxt.getObsoltedMemberExpiryTime(); + Map<String, MemberContext> obsoletedMembers = ctxt.getObsoletedMembers(); + Iterator<Entry<String, MemberContext>> iterator = obsoletedMembers.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry<String, MemberContext> pairs = iterator.next(); + MemberContext obsoleteMember = (MemberContext) pairs.getValue(); + if (obsoleteMember == null) { + continue; + } + long obsoleteTime = System.currentTimeMillis() - obsoleteMember.getInitTime(); + if (obsoleteTime >= obsoltedMemberExpiryTime) { + iterator.remove(); + } + } + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + } public float getAverageRequestsInFlight() { return requestsInFlight.getAverage(); @@ -511,4 +560,90 @@ public class KubernetesClusterContext implements Serializable { this.gradientLoadAverageReset = loadAverageReset; this.secondDerivativeLoadAverageRest = loadAverageReset; } + + public void moveActiveMemberToTerminationPendingMembers(String memberId) { + if (memberId == null) { + return; + } + Iterator<MemberContext> iterator = activeMembers.listIterator(); + while ( iterator.hasNext()) { + MemberContext activeMember = iterator.next(); + if(activeMember == null) { + iterator.remove(); + continue; + } + if(memberId.equals(activeMember.getMemberId())){ + // member is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.terminationPendingMembers.add(activeMember); + if (log.isDebugEnabled()) { + log.debug(String.format("Active member is removed and added to the " + + "termination pending member list. [Member Id] %s", memberId)); + } + break; + } + } + } + + public boolean removeTerminationPendingMember(String memberId) { + boolean terminationPendingMemberAvailable = false; + for (MemberContext memberContext: terminationPendingMembers){ + if(memberContext.getMemberId().equals(memberId)){ + terminationPendingMemberAvailable = true; + terminationPendingMembers.remove(memberContext); + break; + } + } + return terminationPendingMemberAvailable; + } + + public long getObsoltedMemberExpiryTime() { + return obsoltedMemberExpiryTime; + } + + public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) { + this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime; + } + + public void addObsoleteMember(MemberContext ctxt) { + this.obsoletedMembers.put(ctxt.getMemberId(), ctxt); + } + + public boolean removeObsoleteMember(String memberId) { + if(this.obsoletedMembers.remove(memberId) == null) { + return false; + } + return true; + } + + public Map<String, MemberContext> getObsoletedMembers() { + return obsoletedMembers; + } + + public void setObsoletedMembers(Map<String, MemberContext> obsoletedMembers) { + this.obsoletedMembers = obsoletedMembers; + } + + public MemberStatsContext getPartitionCtxt(String id) { + return this.memberStatsContexts.get(id); + } + + public List<MemberContext> getTerminationPendingMembers() { + return terminationPendingMembers; + } + + public void setTerminationPendingMembers(List<MemberContext> terminationPendingMembers) { + this.terminationPendingMembers = terminationPendingMembers; + } + + public int getTotalMemberCount() { + + return activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size(); + } + + public int getNonTerminatedMemberCount() { + return activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size(); + } }
