Repository: stratos Updated Branches: refs/heads/docker-integration 12ed2dc7e -> a5cd17e8c
Refactoring and removing unused code from Kubernetes Cluster Context Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a5cd17e8 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a5cd17e8 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a5cd17e8 Branch: refs/heads/docker-integration Commit: a5cd17e8c09050866e238a66bae9447a7a80cc8b Parents: 317a299 Author: Nirmal Fernando <[email protected]> Authored: Wed Sep 24 19:30:52 2014 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Wed Sep 24 19:31:11 2014 +0530 ---------------------------------------------------------------------- .../autoscaler/KubernetesClusterContext.java | 175 ++----------------- .../monitor/ClusterMonitorFactory.java | 3 +- 2 files changed, 16 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/a5cd17e8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java index e1d5e30..1858401 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java @@ -5,13 +5,14 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; +import org.apache.stratos.autoscaler.exception.TerminationException; import org.apache.stratos.autoscaler.policy.model.LoadAverage; import org.apache.stratos.autoscaler.policy.model.MemoryConsumption; import org.apache.stratos.autoscaler.policy.model.RequestsInFlight; @@ -37,21 +38,10 @@ public class KubernetesClusterContext implements Serializable{ private long expiryTime = 900000; // pending members private List<MemberContext> pendingMembers; - private int pendingMembersFailureCount = 0; - private final int PENDING_MEMBER_FAILURE_THRESHOLD = 5; - - // 1 day as default - private long obsoltedMemberExpiryTime = 1*24*60*60*1000; - - // members to be terminated - private Map<String, MemberContext> obsoletedMembers; // active members private List<MemberContext> activeMembers; - // termination pending members, member is added to this when Autoscaler send grace fully shut down event - private List<MemberContext> terminationPendingMembers; - //Keep statistics come from CEP private Map<String, MemberStatsContext> memberStatsContexts; @@ -60,6 +50,9 @@ public class KubernetesClusterContext implements Serializable{ private MemoryConsumption memoryConsumption; private LoadAverage loadAverage; + // cluster id + private String clusterId; + //boolean values to keep whether the requests in flight parameters are reset or not private boolean rifReset = false, averageRifReset = false, gradientRifReset = false, secondDerivativeRifRest = false; //boolean values to keep whether the memory consumption parameters are reset or not @@ -69,12 +62,11 @@ public class KubernetesClusterContext implements Serializable{ private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false; - public KubernetesClusterContext(String kubernetesClusterId){ + public KubernetesClusterContext(String kubernetesClusterId, String clusterId){ this.kubernetesClusterId = kubernetesClusterId; + this.clusterId = clusterId; this.pendingMembers = new ArrayList<MemberContext>(); this.activeMembers = new ArrayList<MemberContext>(); - this.terminationPendingMembers = new ArrayList<MemberContext>(); - this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>(); this.memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>(); this.requestsInFlight = new RequestsInFlight(); this.loadAverage = new LoadAverage(); @@ -89,8 +81,6 @@ public class KubernetesClusterContext implements Serializable{ Thread th = new Thread(new PendingMemberWatcher(this)); th.start(); - Thread th2 = new Thread(new ObsoletedMemberWatcher(this)); - th2.start(); } public String getKubernetesClusterID() { @@ -177,7 +167,6 @@ public class KubernetesClusterContext implements Serializable{ iterator.remove(); // add to the activated list this.activeMembers.add(pendingMember); - pendingMembersFailureCount = 0; if (log.isDebugEnabled()) { log.debug(String.format( "Pending member is removed and added to the " @@ -189,34 +178,6 @@ public class KubernetesClusterContext implements Serializable{ } } - public void moveActiveMemberToTerminationPendingMembers(String memberId) { - if (memberId == null) { - return; - } - Iterator<MemberContext> iterator = activeMembers.listIterator(); - while (iterator.hasNext()) { - MemberContext activeMember = iterator.next(); - if (activeMember == null) { - iterator.remove(); - continue; - } - if (memberId.equals(activeMember.getMemberId())) { - // member is activated - // remove from pending list - iterator.remove(); - // add to the activated list - this.terminationPendingMembers.add(activeMember); - if (log.isDebugEnabled()) { - log.debug(String - .format("Active member is removed and added to the " - + "termination pending member list. [Member Id] %s", - memberId)); - } - break; - } - } - } - public void addActiveMember(MemberContext ctxt) { this.activeMembers.add(ctxt); } @@ -225,37 +186,6 @@ public class KubernetesClusterContext implements Serializable{ this.activeMembers.remove(ctxt); } - public boolean removeTerminationPendingMember(String memberId) { - boolean terminationPendingMemberAvailable = false; - for (MemberContext memberContext : terminationPendingMembers) { - if (memberContext.getMemberId().equals(memberId)) { - terminationPendingMemberAvailable = true; - terminationPendingMembers.remove(memberContext); - break; - } - } - return terminationPendingMemberAvailable; - } - - public long getObsoltedMemberExpiryTime() { - return obsoltedMemberExpiryTime; - } - - public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) { - this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime; - } - - public void addObsoleteMember(MemberContext ctxt) { - this.obsoletedMembers.put(ctxt.getMemberId(), ctxt); - } - - public boolean removeObsoleteMember(String memberId) { - if (this.obsoletedMembers.remove(memberId) == null) { - return false; - } - return true; - } - public long getExpiryTime() { return expiryTime; } @@ -264,14 +194,6 @@ public class KubernetesClusterContext implements Serializable{ this.expiryTime = expiryTime; } - public Map<String, MemberContext> getObsoletedMembers() { - return obsoletedMembers; - } - - public void setObsoletedMembers(Map<String, MemberContext> obsoletedMembers) { - this.obsoletedMembers = obsoletedMembers; - } - public Map<String, MemberStatsContext> getMemberStatsContexts() { return memberStatsContexts; } @@ -304,25 +226,6 @@ public class KubernetesClusterContext implements Serializable{ this.serviceName = serviceName; } - public List<MemberContext> getTerminationPendingMembers() { - return terminationPendingMembers; - } - - public void setTerminationPendingMembers( - List<MemberContext> terminationPendingMembers) { - this.terminationPendingMembers = terminationPendingMembers; - } - - public int getTotalMemberCount() { - return activeMembers.size() + pendingMembers.size() - + terminationPendingMembers.size(); - } - - public int getNonTerminatedMemberCount() { - return activeMembers.size() + pendingMembers.size() - + terminationPendingMembers.size(); - } - public List<MemberContext> getActiveMembers() { return activeMembers; } @@ -381,24 +284,14 @@ public class KubernetesClusterContext implements Serializable{ - pendingMember.getInitTime(); if (pendingTime >= expiryTime) { - iterator.remove(); - log.info("Pending state of member: " - + pendingMember.getMemberId() - + " is expired. " - + "Adding as an obsoleted member."); - // member should be terminated - ctxt.addObsoleteMember(pendingMember); - pendingMembersFailureCount++; - if (pendingMembersFailureCount > PENDING_MEMBER_FAILURE_THRESHOLD) { - setExpiryTime(expiryTime * 2);// Doubles the - // expiry time - // after the - // threshold of - // failure - // exceeded - // TODO Implement an alerting system: - // STRATOS-369 + // terminate all containers of this cluster + try { + CloudControllerClient.getInstance().terminateAllContainers(clusterId); + iterator.remove(); + } catch (TerminationException e) { + log.error(e.getMessage(), e); } + } } } @@ -413,46 +306,6 @@ public class KubernetesClusterContext implements Serializable{ } - private class ObsoletedMemberWatcher implements Runnable { - private KubernetesClusterContext ctxt; - - public ObsoletedMemberWatcher(KubernetesClusterContext ctxt) { - this.ctxt = ctxt; - } - - @Override - public void run() { - while (true) { - - long obsoltedMemberExpiryTime = ctxt - .getObsoltedMemberExpiryTime(); - Map<String, MemberContext> obsoletedMembers = ctxt - .getObsoletedMembers(); - Iterator<Entry<String, MemberContext>> iterator = obsoletedMembers - .entrySet().iterator(); - - while (iterator.hasNext()) { - Map.Entry<String, MemberContext> pairs = iterator.next(); - MemberContext obsoleteMember = (MemberContext) pairs - .getValue(); - if (obsoleteMember == null) { - continue; - } - long obsoleteTime = System.currentTimeMillis() - - obsoleteMember.getInitTime(); - if (obsoleteTime >= obsoltedMemberExpiryTime) { - iterator.remove(); - } - } - try { - // TODO find a constant - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - } - } - } - public float getAverageRequestsInFlight() { return requestsInFlight.getAverage(); } http://git-wip-us.apache.org/repos/asf/stratos/blob/a5cd17e8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java index 489078e..39494dc 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java @@ -307,7 +307,8 @@ public class ClusterMonitorFactory { AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName); java.util.Properties props = cluster.getProperties(); String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID); - KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID); + KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID, + cluster.getClusterId()); DockerServiceClusterMonitor dockerClusterMonitor = new DockerServiceClusterMonitor( kubernetesClusterCtxt,
