Adding terminateContainer logic to the autoscaler (AS) and refactoring Drools logging
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/17073da1 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/17073da1 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/17073da1 Branch: refs/heads/container-autoscaling Commit: 17073da149f07f0b6ca6d202291117c5f63a0659 Parents: c3acfb7 Author: R-Rajkumar <[email protected]> Authored: Fri Oct 10 16:38:52 2014 +0530 Committer: R-Rajkumar <[email protected]> Committed: Fri Oct 10 16:38:52 2014 +0530 ---------------------------------------------------------------------- .../cloud/controller/CloudControllerClient.java | 16 ++++++ .../monitor/KubernetesClusterMonitor.java | 33 +++++++++-- .../autoscaler/rule/RuleTasksDelegator.java | 9 +++ .../src/main/conf/container-mincheck.drl | 31 +++++++--- .../src/main/conf/container-scaling.drl | 60 ++++++++++---------- 5 files changed, 107 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java index ce69875..8ec9f8e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java @@ -38,6 +38,7 @@ import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidClu import 
org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidIaasProviderExceptionException; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidMemberExceptionException; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidPartitionExceptionException; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceMemberTerminationFailedExceptionException; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceStub; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException; import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; @@ -328,4 +329,19 @@ public class CloudControllerClient { throw new SpawningException(msg, e); } } + + public synchronized void terminateContainer(String memberId) throws TerminationException{ + try { + stub.terminateContainer(memberId); + } catch (RemoteException e) { + String msg = "Error while updating kubernetes controller, cannot communicate with " + + "cloud controller service"; + log.error(msg, e); + throw new TerminationException(msg, e); + } catch (CloudControllerServiceMemberTerminationFailedExceptionException e) { + String msg = "Error while terminating container, member not valid for member id : " + memberId; + log.error(msg, e); + throw new TerminationException(msg, e); + } + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java index d90e0b6..9375a8e 100644 --- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java @@ -381,7 +381,7 @@ public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor { // no need to do anything here // we will not be receiving this event for containers - // because we just kill the containers + // we will only receive member terminated event } @Override @@ -390,16 +390,39 @@ public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor { // no need to do anything here // we will not be receiving this event for containers - // because we just kill the containers + // we will only receive member terminated event } @Override public void handleMemberTerminatedEvent( MemberTerminatedEvent memberTerminatedEvent) { - // no need to do anything here - // we will not be receiving this event for containers - // because we just kill the containers + String memberId = memberTerminatedEvent.getMemberId(); + if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from termination pending members list: " + + "[member] %s", memberId)); + } + } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from pending members list: " + + "[member] %s", memberId)); + } + } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) { + log.warn(String.format("Member is in the wrong list and it is removed from " + + "active members list", memberId)); + } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) { + log.warn(String.format("Member's obsolated timeout has been expired and " + + "it is removed from obsolated members list", memberId)); + } else { + log.warn(String.format("Member is 
not available in any of the list active, " + + "pending and termination pending", memberId)); + } + + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been removed successfully: " + + "[member] %s", memberId)); + } } @Override http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index 9d3227a..0a9bde3 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -264,6 +264,15 @@ public class RuleTasksDelegator { log.error("Cannot update kubernetes controller ", e); } } + + public void delegateTerminateContainer(KubernetesClusterContext kubernetesClusterContext, String memberId) { + try { + CloudControllerClient ccClient = CloudControllerClient.getInstance(); + ccClient.terminateContainer(memberId); + } catch (Throwable e) { + log.error("Cannot delete container ", e); + } + } public int getPredictedReplicasForStat(int minReplicas, float statUpperLimit, float statPredictedValue) { if (statUpperLimit == 0) { http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl b/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl index 605c553..9798852 100644 --- 
a/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl +++ b/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl @@ -26,7 +26,7 @@ global org.apache.stratos.autoscaler.rule.RuleLog log; global org.apache.stratos.autoscaler.rule.RuleTasksDelegator $delegator; global java.lang.String clusterId; -rule "Minimum Rule" +rule "Container Minimum Rule" dialect "mvel" when $kubernetesClusterContext : KubernetesClusterContext () @@ -36,20 +36,37 @@ dialect "mvel" isServiceClusterCreated : Boolean() from $kubernetesClusterContext.isServiceClusterCreated() eval(log.debug("Running minimum rule: [kub-cluster] " +kubernetesClusterId + " [cluster] " + clusterId)) - eval(log.debug("[min-check] " + " [cluster] " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas)) - eval(log.debug("[min-check] " + " [cluster] " + clusterId + " [Replicas] minReplicas : " + minReplicas)) + eval(log.debug("[min-check] " + " [cluster] : " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas)) + eval(log.debug("[min-check] " + " [cluster] : " + clusterId + " [Replicas] minReplicas : " + minReplicas)) eval(nonTerminatedReplicas < minReplicas) then if (isServiceClusterCreated) { // we suceeded calling startContainer() once, can't call it again - log.info("[min-check] Decided to scale-up : [cluster] " + clusterId); - log.info("[min-check] " + " [cluster] " + clusterId + " min-rule not satisfied, expanding cluster to minReplicas : " + minReplicas); + log.info("[min-check] Decided to scale-up : [cluster] : " + clusterId); + log.info("[min-check] " + " [cluster] : " + clusterId + " ; min-rule not satisfied, expanding cluster to minReplicas : " + minReplicas); $delegator.delegateUpdateContainers($kubernetesClusterContext, minReplicas); } else { // we should call startContainer - log.info("[min-check] Decided to create the cluster : [cluster] " + clusterId); - log.info("[min-check] " + " [cluster] " + clusterId + " : min-rule not 
satisfied, no containers created yet, creating minReplicas : " + minReplicas); + log.info("[min-check] Decided to create the cluster : [cluster] : " + clusterId); + log.info("[min-check] " + " [cluster] : " + clusterId + " ; min-rule not satisfied, no containers created yet, creating minReplicas : " + minReplicas); $delegator.delegateStartContainers($kubernetesClusterContext); } end +rule "Terminate Obsoleted Containers" +dialect "mvel" + when + $kubernetesClusterContext : KubernetesClusterContext () + kubernetesClusterId : String() from $kubernetesClusterContext.getKubernetesClusterID() + obsoleteReplicas : Integer() from $kubernetesClusterContext.getObsoletedMembers().size() + + eval(log.debug("Running obsolete containers rule [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId)) + eval(log.debug("[obsolete-check] " + "[cluster] : " + clusterId + " [Replicas] obsoleteReplicas : " + obsoleteReplicas)) + eval($kubernetesClusterContext.getObsoletedMembers().keySet().size() > 0) + memberId : String() from $kubernetesClusterContext.getObsoletedMembers().keySet() + eval(log.debug("[obsolete-check] [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId + " Member id : " + memberId)) + then + $delegator.delegateTerminateContainer($kubernetesClusterContext, memberId); +end + + http://git-wip-us.apache.org/repos/asf/stratos/blob/17073da1/products/stratos/modules/distribution/src/main/conf/container-scaling.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/container-scaling.drl b/products/stratos/modules/distribution/src/main/conf/container-scaling.drl index 398049b..46d9aeb 100644 --- a/products/stratos/modules/distribution/src/main/conf/container-scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/container-scaling.drl @@ -40,7 +40,7 @@ dialect "mvel" maxReplicas : Integer() from $kubernetesClusterContext.getMaxReplicas() 
nonTerminatedReplicas : Integer() from $kubernetesClusterContext.getNonTerminatedMemberCount() - eval(log.debug("Running scaling rule : [kub-cluster] " + kubernetesClusterId + " [cluster] " + clusterId)) + eval(log.debug("Running scaling rule [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId)) $loadThresholds : LoadThresholds() from autoscalePolicy.getLoadThresholds() @@ -79,81 +79,81 @@ dialect "mvel" scaleUp : Boolean() from (scaleUpForRif || scaleUpForMc || scaleUpForLa) scaleDown : Boolean() from (scaleDownForRif && scaleDownForMc && scaleDownForLa) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] minReplicas : " + minReplicas)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] maxReplicas : " + maxReplicas)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] minReplicas : " + minReplicas)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] maxReplicas : " + maxReplicas)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] resetted ? : " + rifReset)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] predicted value : " + rifPredictedValue)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] upper limit : " + rifUpperLimit)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] lower limit : " + rifLowerLimit)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] resetted ? 
: " + rifReset)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] predicted value : " + rifPredictedValue)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] upper limit : " + rifUpperLimit)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] lower limit : " + rifLowerLimit)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] resetted ? : " + mcReset)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] predicted value : " + mcPredictedValue)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] upper limit : " + mcUpperLimit)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] lower limit : " + mcLowerLimit)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] resetted ? : " + mcReset)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] predicted value : " + mcPredictedValue)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] upper limit : " + mcUpperLimit)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] lower limit : " + mcLowerLimit)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] resetted ? : " + laReset)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] predicted value : " + laPredictedValue)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] upper limit : " + laUpperLimit)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] lower limit : " + laLowerLimit)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] resetted ? 
: " + laReset)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] predicted value : " + laPredictedValue)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] upper limit : " + laUpperLimit)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] lower limit : " + laLowerLimit)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " Scale-up action : " + scaleUp)) - eval(log.debug("[scaling] " + " [cluster] " + clusterId + " Scale-down action : " + scaleDown)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " scale-up action : " + scaleUp)) + eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " scale-down action : " + scaleDown)) then if (scaleUp) { int requiredReplicas = 0; if (scaleUpForRif) { int predictedReplicasForRif = $delegator.getPredictedReplicasForStat(minReplicas, rifUpperLimit, rifPredictedValue); - log.info("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] predicted replicas : " + predictedReplicasForRif); + log.info("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] predicted replicas : " + predictedReplicasForRif); if (predictedReplicasForRif > requiredReplicas ) { requiredReplicas = predictedReplicasForRif; } } if (scaleUpForMc) { int predictedReplicasForMc = $delegator.getPredictedReplicasForStat(minReplicas, mcUpperLimit, mcPredictedValue); - log.info("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] predicted replicas : " + predictedReplicasForMc); + log.info("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] predicted replicas : " + predictedReplicasForMc); if (predictedReplicasForMc > requiredReplicas ) { requiredReplicas = predictedReplicasForMc; } } if (scaleUpForLa) { int predictedReplicasForLa = $delegator.getPredictedReplicasForStat(minReplicas, laUpperLimit, laPredictedValue); - log.info("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] predicted replicas : " + 
predictedReplicasForLa); + log.info("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] predicted replicas : " + predictedReplicasForLa); if (predictedReplicasForLa > requiredReplicas ) { requiredReplicas = predictedReplicasForLa; } } //max-check if (requiredReplicas > maxReplicas) { - log.info("[scaling] " + " [cluster] " + clusterId + " predicted replicas > max replicas : "); + log.info("[scaling] " + " [cluster] : " + clusterId + " predicted replicas > max replicas : "); requiredReplicas = maxReplicas; } //min-check if (requiredReplicas < minReplicas) { - log.info("[scaling] " + " [cluster] " + clusterId + " predicted replicas < min replicas : "); + log.info("[scaling] " + " [cluster] : " + clusterId + " predicted replicas < min replicas : "); requiredReplicas = minReplicas; } //expand the cluster if (requiredReplicas > nonTerminatedReplicas) { - log.info("[scaling] Decided to scale-up : [cluster] " + clusterId); - log.info("[scaling-up] " + " [cluster] " + clusterId + " valid number of replicas to expand : " + requiredReplicas); + log.info("[scaling] Decided to scale-up : [cluster] : " + clusterId); + log.info("[scaling-up] " + " [cluster] : " + clusterId + " valid number of replicas to expand : " + requiredReplicas); $delegator.delegateUpdateContainers($kubernetesClusterContext, requiredReplicas); } //shrink the cluster if (requiredReplicas < nonTerminatedReplicas) { - log.info("[scaling] Decided to scale-down : [cluster] " + clusterId); - log.info("[scaling-down] " + " [cluster] " + clusterId + " valid number of replicas to shrink : " + requiredReplicas); + log.info("[scaling] Decided to scale-down : [cluster] : " + clusterId); + log.info("[scaling-down] " + " [cluster] : " + clusterId + " valid number of replicas to shrink : " + requiredReplicas); $delegator.delegateUpdateContainers($kubernetesClusterContext, requiredReplicas); } if (requiredReplicas == nonTerminatedReplicas) { - log.info("[scaling] " + " [cluster] " + clusterId + "non terminated 
replicas and predicted replicas are same"); + log.info("[scaling] " + " [cluster] : " + clusterId + "non terminated replicas and predicted replicas are same"); } } else if (scaleDown) { - log.info("[scaling] Decided to scale-down : [cluster] " + clusterId); - log.info("[scaling-down] " + " [cluster] " + clusterId + " shrink the cluster to minReplicas : " + minReplicas); + log.info("[scaling] Decided to scale-down [cluster] : " + clusterId); + log.info("[scaling-down] " + " [cluster] : " + clusterId + " shrink the cluster to minReplicas : " + minReplicas); //shrink the cluster to minReplicas $delegator.delegateUpdateContainers($kubernetesClusterContext, minReplicas); } else {
