Repository: stratos
Updated Branches:
  refs/heads/docker-integration 12ed2dc7e -> a5cd17e8c


Refactoring and removing unused code from Kubernetes Cluster Context


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a5cd17e8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a5cd17e8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a5cd17e8

Branch: refs/heads/docker-integration
Commit: a5cd17e8c09050866e238a66bae9447a7a80cc8b
Parents: 317a299
Author: Nirmal Fernando <[email protected]>
Authored: Wed Sep 24 19:30:52 2014 +0530
Committer: Nirmal Fernando <[email protected]>
Committed: Wed Sep 24 19:31:11 2014 +0530

----------------------------------------------------------------------
 .../autoscaler/KubernetesClusterContext.java    | 175 ++-----------------
 .../monitor/ClusterMonitorFactory.java          |   3 +-
 2 files changed, 16 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/a5cd17e8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
index e1d5e30..1858401 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
@@ -5,13 +5,14 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.TerminationException;
 import org.apache.stratos.autoscaler.policy.model.LoadAverage;
 import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
 import org.apache.stratos.autoscaler.policy.model.RequestsInFlight;
@@ -37,21 +38,10 @@ public class KubernetesClusterContext implements 
Serializable{
     private long expiryTime = 900000;
     // pending members
     private List<MemberContext> pendingMembers;
-    private int pendingMembersFailureCount = 0;
-    private final int PENDING_MEMBER_FAILURE_THRESHOLD = 5;
-    
-    // 1 day as default
-    private long obsoltedMemberExpiryTime = 1*24*60*60*1000;
-
-    // members to be terminated
-    private Map<String, MemberContext> obsoletedMembers;
     
     // active members
     private List<MemberContext> activeMembers;
 
-    // termination pending members, member is added to this when Autoscaler 
send grace fully shut down event
-    private List<MemberContext> terminationPendingMembers;
-
     //Keep statistics come from CEP
     private Map<String, MemberStatsContext> memberStatsContexts;
        
@@ -60,6 +50,9 @@ public class KubernetesClusterContext implements Serializable{
     private MemoryConsumption memoryConsumption;
     private LoadAverage loadAverage;
     
+    // cluster id
+    private String clusterId;
+    
     //boolean values to keep whether the requests in flight parameters are 
reset or not
     private boolean rifReset = false, averageRifReset = false, 
gradientRifReset = false, secondDerivativeRifRest = false;
     //boolean values to keep whether the memory consumption parameters are 
reset or not
@@ -69,12 +62,11 @@ public class KubernetesClusterContext implements 
Serializable{
     private boolean loadAverageReset = false, averageLoadAverageReset = false, 
gradientLoadAverageReset = false,
             secondDerivativeLoadAverageRest = false;
     
-       public KubernetesClusterContext(String kubernetesClusterId){
+       public KubernetesClusterContext(String kubernetesClusterId, String 
clusterId){
                this.kubernetesClusterId = kubernetesClusterId;
+               this.clusterId = clusterId;
         this.pendingMembers = new ArrayList<MemberContext>();
         this.activeMembers = new ArrayList<MemberContext>();
-        this.terminationPendingMembers = new ArrayList<MemberContext>();
-        this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>();
         this.memberStatsContexts = new ConcurrentHashMap<String, 
MemberStatsContext>();
         this.requestsInFlight = new RequestsInFlight();
         this.loadAverage = new LoadAverage();
@@ -89,8 +81,6 @@ public class KubernetesClusterContext implements Serializable{
         
         Thread th = new Thread(new PendingMemberWatcher(this));
         th.start();
-        Thread th2 = new Thread(new ObsoletedMemberWatcher(this));
-        th2.start();
        }
        
        public String getKubernetesClusterID() {
@@ -177,7 +167,6 @@ public class KubernetesClusterContext implements 
Serializable{
                                iterator.remove();
                                // add to the activated list
                                this.activeMembers.add(pendingMember);
-                               pendingMembersFailureCount = 0;
                                if (log.isDebugEnabled()) {
                                        log.debug(String.format(
                                                        "Pending member is 
removed and added to the "
@@ -189,34 +178,6 @@ public class KubernetesClusterContext implements 
Serializable{
                }
        }
 
-       public void moveActiveMemberToTerminationPendingMembers(String 
memberId) {
-               if (memberId == null) {
-                       return;
-               }
-               Iterator<MemberContext> iterator = activeMembers.listIterator();
-               while (iterator.hasNext()) {
-                       MemberContext activeMember = iterator.next();
-                       if (activeMember == null) {
-                               iterator.remove();
-                               continue;
-                       }
-                       if (memberId.equals(activeMember.getMemberId())) {
-                               // member is activated
-                               // remove from pending list
-                               iterator.remove();
-                               // add to the activated list
-                               
this.terminationPendingMembers.add(activeMember);
-                               if (log.isDebugEnabled()) {
-                                       log.debug(String
-                                                       .format("Active member 
is removed and added to the "
-                                                                       + 
"termination pending member list. [Member Id] %s",
-                                                                       
memberId));
-                               }
-                               break;
-                       }
-               }
-       }
-           
        public void addActiveMember(MemberContext ctxt) {
                this.activeMembers.add(ctxt);
        }
@@ -225,37 +186,6 @@ public class KubernetesClusterContext implements 
Serializable{
                this.activeMembers.remove(ctxt);
        }
 
-       public boolean removeTerminationPendingMember(String memberId) {
-               boolean terminationPendingMemberAvailable = false;
-               for (MemberContext memberContext : terminationPendingMembers) {
-                       if (memberContext.getMemberId().equals(memberId)) {
-                               terminationPendingMemberAvailable = true;
-                               terminationPendingMembers.remove(memberContext);
-                               break;
-                       }
-               }
-               return terminationPendingMemberAvailable;
-       }
-           
-       public long getObsoltedMemberExpiryTime() {
-               return obsoltedMemberExpiryTime;
-       }
-
-       public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) {
-               this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime;
-       }
-
-       public void addObsoleteMember(MemberContext ctxt) {
-               this.obsoletedMembers.put(ctxt.getMemberId(), ctxt);
-       }
-
-       public boolean removeObsoleteMember(String memberId) {
-               if (this.obsoletedMembers.remove(memberId) == null) {
-                       return false;
-               }
-               return true;
-       }
-
        public long getExpiryTime() {
                return expiryTime;
        }
@@ -264,14 +194,6 @@ public class KubernetesClusterContext implements 
Serializable{
                this.expiryTime = expiryTime;
        }
            
-       public Map<String, MemberContext> getObsoletedMembers() {
-               return obsoletedMembers;
-       }
-
-       public void setObsoletedMembers(Map<String, MemberContext> 
obsoletedMembers) {
-               this.obsoletedMembers = obsoletedMembers;
-       }
-
        public Map<String, MemberStatsContext> getMemberStatsContexts() {
                return memberStatsContexts;
        }
@@ -304,25 +226,6 @@ public class KubernetesClusterContext implements 
Serializable{
                this.serviceName = serviceName;
        }
 
-       public List<MemberContext> getTerminationPendingMembers() {
-               return terminationPendingMembers;
-       }
-
-       public void setTerminationPendingMembers(
-                       List<MemberContext> terminationPendingMembers) {
-               this.terminationPendingMembers = terminationPendingMembers;
-       }
-
-       public int getTotalMemberCount() {
-               return activeMembers.size() + pendingMembers.size()
-                               + terminationPendingMembers.size();
-       }
-
-       public int getNonTerminatedMemberCount() {
-               return activeMembers.size() + pendingMembers.size()
-                               + terminationPendingMembers.size();
-       }
-           
        public List<MemberContext> getActiveMembers() {
                return activeMembers;
        }
@@ -381,24 +284,14 @@ public class KubernetesClusterContext implements 
Serializable{
                                                                - 
pendingMember.getInitTime();
                                                if (pendingTime >= expiryTime) {
 
-                                                       iterator.remove();
-                                                       log.info("Pending state 
of member: "
-                                                                       + 
pendingMember.getMemberId()
-                                                                       + " is 
expired. "
-                                                                       + 
"Adding as an obsoleted member.");
-                                                       // member should be 
terminated
-                                                       
ctxt.addObsoleteMember(pendingMember);
-                                                       
pendingMembersFailureCount++;
-                                                       if 
(pendingMembersFailureCount > PENDING_MEMBER_FAILURE_THRESHOLD) {
-                                                               
setExpiryTime(expiryTime * 2);// Doubles the
-                                                                               
                                                // expiry time
-                                                                               
                                                // after the
-                                                                               
                                                // threshold of
-                                                                               
                                                // failure
-                                                                               
                                                // exceeded
-                                                               // TODO 
Implement an alerting system:
-                                                               // STRATOS-369
+                                                       // terminate all 
containers of this cluster
+                                                       try {
+                                                               
CloudControllerClient.getInstance().terminateAllContainers(clusterId);
+                                                               
iterator.remove();
+                                                       } catch 
(TerminationException e) {
+                                                               
log.error(e.getMessage(), e);
                                                        }
+                                                       
                                                }
                                        }
                                }
@@ -413,46 +306,6 @@ public class KubernetesClusterContext implements 
Serializable{
 
        }
 
-       private class ObsoletedMemberWatcher implements Runnable {
-               private KubernetesClusterContext ctxt;
-
-               public ObsoletedMemberWatcher(KubernetesClusterContext ctxt) {
-                       this.ctxt = ctxt;
-               }
-
-               @Override
-               public void run() {
-                       while (true) {
-
-                               long obsoltedMemberExpiryTime = ctxt
-                                               .getObsoltedMemberExpiryTime();
-                               Map<String, MemberContext> obsoletedMembers = 
ctxt
-                                               .getObsoletedMembers();
-                               Iterator<Entry<String, MemberContext>> iterator 
= obsoletedMembers
-                                               .entrySet().iterator();
-
-                               while (iterator.hasNext()) {
-                                       Map.Entry<String, MemberContext> pairs 
= iterator.next();
-                                       MemberContext obsoleteMember = 
(MemberContext) pairs
-                                                       .getValue();
-                                       if (obsoleteMember == null) {
-                                               continue;
-                                       }
-                                       long obsoleteTime = 
System.currentTimeMillis()
-                                                       - 
obsoleteMember.getInitTime();
-                                       if (obsoleteTime >= 
obsoltedMemberExpiryTime) {
-                                               iterator.remove();
-                                       }
-                               }
-                               try {
-                                       // TODO find a constant
-                                       Thread.sleep(15000);
-                               } catch (InterruptedException ignore) {
-                               }
-                       }
-               }
-       }
-  
        public float getAverageRequestsInFlight() {
                return requestsInFlight.getAverage();
        }

http://git-wip-us.apache.org/repos/asf/stratos/blob/a5cd17e8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
index 489078e..39494dc 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
@@ -307,7 +307,8 @@ public class ClusterMonitorFactory {
         AutoscalePolicy policy = 
PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
         java.util.Properties props = cluster.getProperties();
         String kubernetesHostClusterID = 
props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-               KubernetesClusterContext kubernetesClusterCtxt = new 
KubernetesClusterContext(kubernetesHostClusterID);
+               KubernetesClusterContext kubernetesClusterCtxt = new 
KubernetesClusterContext(kubernetesHostClusterID, 
+                               cluster.getClusterId());
 
         DockerServiceClusterMonitor dockerClusterMonitor = new 
DockerServiceClusterMonitor(
                        kubernetesClusterCtxt, 

Reply via email to