adding terminateContainer logic to AS and refactoring drools logging

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/17073da1
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/17073da1
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/17073da1

Branch: refs/heads/container-autoscaling
Commit: 17073da149f07f0b6ca6d202291117c5f63a0659
Parents: c3acfb7
Author: R-Rajkumar <[email protected]>
Authored: Fri Oct 10 16:38:52 2014 +0530
Committer: R-Rajkumar <[email protected]>
Committed: Fri Oct 10 16:38:52 2014 +0530

----------------------------------------------------------------------
 .../cloud/controller/CloudControllerClient.java | 16 ++++++
 .../monitor/KubernetesClusterMonitor.java       | 33 +++++++++--
 .../autoscaler/rule/RuleTasksDelegator.java     |  9 +++
 .../src/main/conf/container-mincheck.drl        | 31 +++++++---
 .../src/main/conf/container-scaling.drl         | 60 ++++++++++----------
 5 files changed, 107 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
index ce69875..8ec9f8e 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
@@ -38,6 +38,7 @@ import 
org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidClu
 import 
org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidIaasProviderExceptionException;
 import 
org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidMemberExceptionException;
 import 
org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidPartitionExceptionException;
+import 
org.apache.stratos.cloud.controller.stub.CloudControllerServiceMemberTerminationFailedExceptionException;
 import org.apache.stratos.cloud.controller.stub.CloudControllerServiceStub;
 import 
org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException;
 import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
@@ -328,4 +329,19 @@ public class CloudControllerClient {
             throw new SpawningException(msg, e);
         } 
     }
+    
+    public synchronized void terminateContainer(String memberId) throws 
TerminationException{
+       try {
+                       stub.terminateContainer(memberId);
+               } catch (RemoteException e) {
+            String msg = "Error while updating kubernetes controller, cannot 
communicate with " +
+                    "cloud controller service";
+            log.error(msg, e);
+            throw new TerminationException(msg, e);
+               } catch 
(CloudControllerServiceMemberTerminationFailedExceptionException e) {
+            String msg = "Error while terminating container, member not valid 
for member id : " + memberId;
+            log.error(msg, e);
+            throw new TerminationException(msg, e);
+               }
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
index d90e0b6..9375a8e 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
@@ -381,7 +381,7 @@ public abstract class KubernetesClusterMonitor extends 
AbstractClusterMonitor {
 
         // no need to do anything here
         // we will not be receiving this event for containers
-        // because we just kill the containers
+        // we will only receive member terminated event
     }
 
     @Override
@@ -390,16 +390,39 @@ public abstract class KubernetesClusterMonitor extends 
AbstractClusterMonitor {
 
         // no need to do anything here
         // we will not be receiving this event for containers
-        // because we just kill the containers
+       // we will only receive member terminated event
     }
 
     @Override
     public void handleMemberTerminatedEvent(
             MemberTerminatedEvent memberTerminatedEvent) {
 
-        // no need to do anything here
-        // we will not be receiving this event for containers
-        // because we just kill the containers
+        String memberId = memberTerminatedEvent.getMemberId();
+        if 
(getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from termination 
pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from pending 
members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if 
(getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
+            log.warn(String.format("Member is in the wrong list and it is 
removed from "
+                                   + "active members list", memberId));
+        } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
+            log.warn(String.format("Member's obsolated timeout has been 
expired and "
+                                   + "it is removed from obsolated members 
list", memberId));
+        } else {
+            log.warn(String.format("Member is not available in any of the list 
active, "
+                                   + "pending and termination pending", 
memberId));
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been removed 
successfully: "
+                                   + "[member] %s", memberId));
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index 9d3227a..0a9bde3 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -264,6 +264,15 @@ public class RuleTasksDelegator {
             log.error("Cannot update kubernetes controller ", e);
         }
     }
+    
+    public void delegateTerminateContainer(KubernetesClusterContext 
kubernetesClusterContext, String memberId) {
+       try {
+               CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
+               ccClient.terminateContainer(memberId);
+       } catch (Throwable e) {
+               log.error("Cannot delete container ", e);
+       }
+    }
 
     public int getPredictedReplicasForStat(int minReplicas, float 
statUpperLimit, float statPredictedValue) {
         if (statUpperLimit == 0) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
----------------------------------------------------------------------
diff --git 
a/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl 
b/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
index 605c553..9798852 100644
--- a/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
+++ b/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
@@ -26,7 +26,7 @@ global org.apache.stratos.autoscaler.rule.RuleLog log;
 global org.apache.stratos.autoscaler.rule.RuleTasksDelegator $delegator;
 global java.lang.String clusterId;
  
-rule "Minimum Rule"
+rule "Container Minimum Rule"
 dialect "mvel"
        when
            $kubernetesClusterContext : KubernetesClusterContext ()
@@ -36,20 +36,37 @@ dialect "mvel"
            isServiceClusterCreated : Boolean() from 
$kubernetesClusterContext.isServiceClusterCreated()
           
            eval(log.debug("Running minimum rule: [kub-cluster] " 
+kubernetesClusterId + " [cluster] " + clusterId))
-           eval(log.debug("[min-check] " + " [cluster] " + clusterId + " 
[Replicas] nonTerminated : " + nonTerminatedReplicas))
-          eval(log.debug("[min-check] " + " [cluster] " + clusterId + " 
[Replicas] minReplicas : " + minReplicas))
+           eval(log.debug("[min-check] " + " [cluster] : " + clusterId + " 
[Replicas] nonTerminated : " + nonTerminatedReplicas))
+          eval(log.debug("[min-check] " + " [cluster] : " + clusterId + " 
[Replicas] minReplicas : " + minReplicas))
           eval(nonTerminatedReplicas < minReplicas)
        then
            if (isServiceClusterCreated) {
              // we suceeded calling startContainer() once, can't call it again
-              log.info("[min-check] Decided to scale-up : [cluster] " + 
clusterId);
-             log.info("[min-check] " + " [cluster] " + clusterId + " min-rule 
not satisfied, expanding cluster to minReplicas : " + minReplicas);
+              log.info("[min-check] Decided to scale-up : [cluster] : " + 
clusterId);
+             log.info("[min-check] " + " [cluster] : " + clusterId + " ; 
min-rule not satisfied, expanding cluster to minReplicas : " + minReplicas);
               $delegator.delegateUpdateContainers($kubernetesClusterContext, 
minReplicas);
           } else {
              // we should call startContainer
-              log.info("[min-check] Decided to create the cluster : [cluster] 
" + clusterId);
-             log.info("[min-check] " + " [cluster] " + clusterId + " : 
min-rule not satisfied, no containers created yet, creating minReplicas : " + 
minReplicas);
+              log.info("[min-check] Decided to create the cluster : [cluster] 
: " + clusterId);
+             log.info("[min-check] " + " [cluster] : " + clusterId + " ; 
min-rule not satisfied, no containers created yet, creating minReplicas : " + 
minReplicas);
               $delegator.delegateStartContainers($kubernetesClusterContext);
            }
 end
 
+rule "Terminate Obsoleted Containers"
+dialect "mvel"
+        when
+           $kubernetesClusterContext : KubernetesClusterContext ()
+           kubernetesClusterId : String() from 
$kubernetesClusterContext.getKubernetesClusterID()
+           obsoleteReplicas : Integer() from 
$kubernetesClusterContext.getObsoletedMembers().size()
+
+           eval(log.debug("Running obsolete containers rule [kub-cluster] : " 
+ kubernetesClusterId + " [cluster] : " + clusterId))
+           eval(log.debug("[obsolete-check] " + "[cluster] : " + clusterId + " 
[Replicas] obsoleteReplicas : " + obsoleteReplicas))
+           
eval($kubernetesClusterContext.getObsoletedMembers().keySet().size() > 0)
+           memberId : String() from 
$kubernetesClusterContext.getObsoletedMembers().keySet()
+           eval(log.debug("[obsolete-check] [kub-cluster] : " + 
kubernetesClusterId + " [cluster] : " + clusterId + " Member id : " + memberId))
+        then
+           $delegator.delegateTerminateContainer($kubernetesClusterContext, 
memberId);
+end
+
+

http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
----------------------------------------------------------------------
diff --git 
a/products/stratos/modules/distribution/src/main/conf/container-scaling.drl 
b/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
index 398049b..46d9aeb 100644
--- a/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
@@ -40,7 +40,7 @@ dialect "mvel"
         maxReplicas : Integer() from $kubernetesClusterContext.getMaxReplicas()
         nonTerminatedReplicas : Integer() from 
$kubernetesClusterContext.getNonTerminatedMemberCount()
 
-        eval(log.debug("Running scaling rule : [kub-cluster] " + 
kubernetesClusterId + " [cluster] " + clusterId))
+        eval(log.debug("Running scaling rule [kub-cluster] : " + 
kubernetesClusterId + " [cluster] : " + clusterId))
        
        $loadThresholds : LoadThresholds() from  
autoscalePolicy.getLoadThresholds()
 
@@ -79,81 +79,81 @@ dialect "mvel"
         scaleUp : Boolean() from (scaleUpForRif || scaleUpForMc || 
scaleUpForLa)
         scaleDown : Boolean() from (scaleDownForRif && scaleDownForMc &&  
scaleDownForLa)
 
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] 
nonTerminated : " + nonTerminatedReplicas))
-       eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] 
minReplicas : " + minReplicas))
-       eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] 
maxReplicas : " + maxReplicas))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[Replicas] nonTerminated : " + nonTerminatedReplicas))
+       eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[Replicas] minReplicas : " + minReplicas))
+       eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[Replicas] maxReplicas : " + maxReplicas))
 
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[RequestInFlight] resetted ? : " + rifReset))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[RequestInFlight] predicted value : " + rifPredictedValue))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[RequestInFlight] upper limit : " + rifUpperLimit))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[RequestInFlight] lower limit : " + rifLowerLimit))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[RequestInFlight] resetted ? : " + rifReset))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[RequestInFlight] predicted value : " + rifPredictedValue))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[RequestInFlight] upper limit : " + rifUpperLimit))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[RequestInFlight] lower limit : " + rifLowerLimit))
 
-       eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[MemoryConsumption] resetted ? : " + mcReset))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[MemoryConsumption] predicted value : " + mcPredictedValue))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[MemoryConsumption] upper limit : " + mcUpperLimit))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[MemoryConsumption] lower limit : " + mcLowerLimit))
+       eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[MemoryConsumption] resetted ? : " + mcReset))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[MemoryConsumption] predicted value : " + mcPredictedValue))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[MemoryConsumption] upper limit : " + mcUpperLimit))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[MemoryConsumption] lower limit : " + mcLowerLimit))
 
-       eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[LoadAverage] resetted ? : " + laReset))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[LoadAverage] predicted value : " + laPredictedValue))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[LoadAverage] upper limit : " + laUpperLimit))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " 
[LoadAverage] lower limit : " + laLowerLimit))
+       eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[LoadAverage] resetted ? : " + laReset))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[LoadAverage] predicted value : " + laPredictedValue))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[LoadAverage] upper limit : " + laUpperLimit))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
[LoadAverage] lower limit : " + laLowerLimit))
 
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " Scale-up 
action : " + scaleUp))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " Scale-down 
action : " + scaleDown))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " scale-up 
action : " + scaleUp))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " 
scale-down action : " + scaleDown))
 
        then
         if (scaleUp) {
            int requiredReplicas = 0;
            if (scaleUpForRif) {
               int predictedReplicasForRif = 
$delegator.getPredictedReplicasForStat(minReplicas, rifUpperLimit, 
rifPredictedValue);
-             log.info("[scaling] " + " [cluster] " + clusterId + " 
[RequestInFlight] predicted replicas : " + predictedReplicasForRif);
+             log.info("[scaling] " + " [cluster] : " + clusterId + " 
[RequestInFlight] predicted replicas : " + predictedReplicasForRif);
               if (predictedReplicasForRif > requiredReplicas ) {
                 requiredReplicas = predictedReplicasForRif;
               }
            } 
            if (scaleUpForMc) {
               int predictedReplicasForMc = 
$delegator.getPredictedReplicasForStat(minReplicas, mcUpperLimit, 
mcPredictedValue);
-              log.info("[scaling] " + " [cluster] " + clusterId + " 
[MemoryConsumption] predicted replicas : " + predictedReplicasForMc);
+              log.info("[scaling] " + " [cluster] : " + clusterId + " 
[MemoryConsumption] predicted replicas : " + predictedReplicasForMc);
               if (predictedReplicasForMc > requiredReplicas ) {
                 requiredReplicas = predictedReplicasForMc;
               }
             }
             if (scaleUpForLa) {
               int predictedReplicasForLa = 
$delegator.getPredictedReplicasForStat(minReplicas, laUpperLimit, 
laPredictedValue);
-              log.info("[scaling] " + " [cluster] " + clusterId + " 
[LoadAverage] predicted replicas : " + predictedReplicasForLa);
+              log.info("[scaling] " + " [cluster] : " + clusterId + " 
[LoadAverage] predicted replicas : " + predictedReplicasForLa);
               if (predictedReplicasForLa > requiredReplicas ) {
                 requiredReplicas = predictedReplicasForLa;
               }
             }
            //max-check
            if (requiredReplicas > maxReplicas) {
-             log.info("[scaling] " + " [cluster] " + clusterId + " predicted 
replicas > max replicas : ");
+             log.info("[scaling] " + " [cluster] : " + clusterId + " predicted 
replicas > max replicas : ");
               requiredReplicas = maxReplicas;
             }
            //min-check
            if (requiredReplicas < minReplicas) {
-             log.info("[scaling] " + " [cluster] " + clusterId + " predicted 
replicas < min replicas : ");
+             log.info("[scaling] " + " [cluster] : " + clusterId + " predicted 
replicas < min replicas : ");
               requiredReplicas = minReplicas;
             }
             //expand the cluster
             if (requiredReplicas > nonTerminatedReplicas) {
-              log.info("[scaling] Decided to scale-up : [cluster] " + 
clusterId);
-             log.info("[scaling-up] " + " [cluster] " + clusterId + " valid 
number of replicas to expand : " + requiredReplicas);
+              log.info("[scaling] Decided to scale-up : [cluster] : " + 
clusterId);
+             log.info("[scaling-up] " + " [cluster] : " + clusterId + " valid 
number of replicas to expand : " + requiredReplicas);
              $delegator.delegateUpdateContainers($kubernetesClusterContext, 
requiredReplicas);
             }
             //shrink the cluster
             if (requiredReplicas < nonTerminatedReplicas) {
-              log.info("[scaling] Decided to scale-down : [cluster] " + 
clusterId);
-             log.info("[scaling-down] " + " [cluster] " + clusterId + " valid 
number of replicas to shrink : " + requiredReplicas);
+              log.info("[scaling] Decided to scale-down : [cluster] : " + 
clusterId);
+             log.info("[scaling-down] " + " [cluster] : " + clusterId + " 
valid number of replicas to shrink : " + requiredReplicas);
              $delegator.delegateUpdateContainers($kubernetesClusterContext, 
requiredReplicas);
             }
             if (requiredReplicas == nonTerminatedReplicas) {
-             log.info("[scaling] " + " [cluster] " + clusterId + "non 
terminated replicas and predicted replicas are same");
+             log.info("[scaling] " + " [cluster] : " + clusterId + "non 
terminated replicas and predicted replicas are same");
             }
             
         } else if (scaleDown) {
-            log.info("[scaling] Decided to scale-down : [cluster] " + 
clusterId);
-            log.info("[scaling-down] " + " [cluster] " + clusterId + " shrink 
the cluster to minReplicas : " + minReplicas);
+            log.info("[scaling] Decided to scale-down [cluster] : " + 
clusterId);
+            log.info("[scaling-down] " + " [cluster] : " + clusterId + " 
shrink the cluster to minReplicas : " + minReplicas);
             //shrink the cluster to minReplicas
             $delegator.delegateUpdateContainers($kubernetesClusterContext, 
minReplicas);
         } else {

Reply via email to