Updated Branches: refs/heads/master 0eb46fd18 -> 07e04592b
improving scaling rule to get decisions based on load average and memory consumption Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ed16b3a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ed16b3a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ed16b3a3 Branch: refs/heads/master Commit: ed16b3a384e87b07ced49310b8e61836b3e80901 Parents: 0225801 Author: Lahiru Sandaruwan <[email protected]> Authored: Thu Jan 2 19:00:11 2014 +0530 Committer: Lahiru Sandaruwan <[email protected]> Committed: Thu Jan 2 19:00:11 2014 +0530 ---------------------------------------------------------------------- .../autoscaler/NetworkPartitionContext.java | 162 +++++++++++++++++-- .../autoscaler/monitor/ClusterMonitor.java | 18 ++- .../distribution/src/main/conf/scaling.drl | 40 +++-- 3 files changed, 193 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ed16b3a3/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java index 37b58b3..72a2071 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java @@ -20,6 +20,9 @@ package org.apache.stratos.autoscaler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import 
org.apache.stratos.autoscaler.policy.model.LoadAverage; +import org.apache.stratos.autoscaler.policy.model.MemoryConsumption; +import org.apache.stratos.autoscaler.policy.model.RequestsInFlight; import org.apache.stratos.cloud.controller.deployment.partition.Partition; import java.io.Serializable; @@ -46,16 +49,22 @@ public class NetworkPartitionContext implements Serializable{ //boolean values to keep whether the requests in flight parameters are reset or not private boolean rifReset = false, averageRifReset = false, gradientRifReset = false, secondDerivativeRifRest = false; - + //boolean values to keep whether the memory consumption parameters are reset or not + private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false, + gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false; + //boolean values to keep whether the load average parameters are reset or not + private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, + secondDerivativeLoadAverageRest = false; + //FIXME this should be populated via PartitionGroups a.k.a. 
NetworkPartitions private int minInstanceCount = 1, maxInstanceCount = 1; - private Partition[] partitions; + private final Partition[] partitions; //Following information will keep events details - private float averageRequestsInFlight; - private float requestsInFlightSecondDerivative; - private float requestsInFlightGradient; + private RequestsInFlight requestsInFlight; + private MemoryConsumption memoryConsumption; + private LoadAverage loadAverage; //details required for partition selection algorithms private int currentPartitionIndex; @@ -74,6 +83,9 @@ public class NetworkPartitionContext implements Serializable{ this.setClusterIdToLBClusterIdMap(new HashMap<String, String>()); // partitionToMemberCountMap = new HashMap<String, Integer>(); partitionCtxts = new HashMap<String, PartitionContext>(); + requestsInFlight = new RequestsInFlight(); + loadAverage = new LoadAverage(); + memoryConsumption = new MemoryConsumption(); } @@ -224,11 +236,11 @@ public class NetworkPartitionContext implements Serializable{ } public float getAverageRequestsInFlight() { - return averageRequestsInFlight; + return requestsInFlight.getAverage(); } public void setAverageRequestsInFlight(float averageRequestsInFlight) { - this.averageRequestsInFlight = averageRequestsInFlight; + requestsInFlight.setAverage(averageRequestsInFlight); averageRifReset = true; if(secondDerivativeRifRest && gradientRifReset){ rifReset = true; @@ -240,11 +252,11 @@ public class NetworkPartitionContext implements Serializable{ } public float getRequestsInFlightSecondDerivative() { - return requestsInFlightSecondDerivative; + return requestsInFlight.getSecondDerivative(); } public void setRequestsInFlightSecondDerivative(float requestsInFlightSecondDerivative) { - this.requestsInFlightSecondDerivative = requestsInFlightSecondDerivative; + requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative); secondDerivativeRifRest = true; if(averageRifReset && gradientRifReset){ rifReset = true; @@ -256,11 
+268,11 @@ public class NetworkPartitionContext implements Serializable{ } public float getRequestsInFlightGradient() { - return requestsInFlightGradient; + return requestsInFlight.getGradient(); } public void setRequestsInFlightGradient(float requestsInFlightGradient) { - this.requestsInFlightGradient = requestsInFlightGradient; + requestsInFlight.setGradient(requestsInFlightGradient); gradientRifReset = true; if(secondDerivativeRifRest && averageRifReset){ rifReset = true; @@ -282,6 +294,128 @@ public class NetworkPartitionContext implements Serializable{ this.secondDerivativeRifRest = rifReset; } + + public float getAverageMemoryConsumption() { + return memoryConsumption.getAverage(); + } + + public void setAverageMemoryConsumption(float averageMemoryConsumption) { + memoryConsumption.setAverage(averageMemoryConsumption); + averageMemoryConsumptionReset = true; + if(secondDerivativeMemoryConsumptionRest && gradientMemoryConsumptionReset){ + memoryConsumptionReset = true; + if(log.isDebugEnabled()){ + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getMemoryConsumptionSecondDerivative() { + return memoryConsumption.getSecondDerivative(); + } + + public void setMemoryConsumptionSecondDerivative(float memoryConsumptionSecondDerivative) { + memoryConsumption.setSecondDerivative(memoryConsumptionSecondDerivative); + secondDerivativeMemoryConsumptionRest = true; + if(averageMemoryConsumptionReset && gradientMemoryConsumptionReset){ + memoryConsumptionReset = true; + if(log.isDebugEnabled()){ + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getMemoryConsumptionGradient() { + return memoryConsumption.getGradient(); + } + + public void setMemoryConsumptionGradient(float memoryConsumptionGradient) { + memoryConsumption.setGradient(memoryConsumptionGradient); + 
gradientMemoryConsumptionReset = true; + if(secondDerivativeMemoryConsumptionRest && averageMemoryConsumptionReset){ + memoryConsumptionReset = true; + if(log.isDebugEnabled()){ + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public boolean isMemoryConsumptionReset() { + return memoryConsumptionReset; + } + + public void setMemoryConsumptionReset(boolean memoryConsumptionReset) { + this.memoryConsumptionReset = memoryConsumptionReset; + this.averageMemoryConsumptionReset = memoryConsumptionReset; + this.gradientMemoryConsumptionReset = memoryConsumptionReset; + this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset; + } + + + public float getAverageLoadAverage() { + return loadAverage.getAverage(); + } + + public void setAverageLoadAverage(float averageLoadAverage) { + loadAverage.setAverage(averageLoadAverage); + averageLoadAverageReset = true; + if(secondDerivativeLoadAverageRest && gradientLoadAverageReset){ + loadAverageReset = true; + if(log.isDebugEnabled()){ + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getLoadAverageSecondDerivative() { + return loadAverage.getSecondDerivative(); + } + + public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) { + loadAverage.setSecondDerivative(loadAverageSecondDerivative); + secondDerivativeLoadAverageRest = true; + if(averageLoadAverageReset && gradientLoadAverageReset){ + loadAverageReset = true; + if(log.isDebugEnabled()){ + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getLoadAverageGradient() { + return loadAverage.getGradient(); + } + + public void setLoadAverageGradient(float loadAverageGradient) { + loadAverage.setGradient(loadAverageGradient); + gradientLoadAverageReset = 
true; + if(secondDerivativeLoadAverageRest && averageLoadAverageReset){ + loadAverageReset = true; + if(log.isDebugEnabled()){ + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public boolean isLoadAverageReset() { + return loadAverageReset; + } + + public void setLoadAverageReset(boolean loadAverageReset) { + this.loadAverageReset = loadAverageReset; + this.averageLoadAverageReset = loadAverageReset; + this.gradientLoadAverageReset = loadAverageReset; + this.secondDerivativeLoadAverageRest = loadAverageReset; + } + + + public String getId() { return id; } @@ -343,12 +477,12 @@ public class NetworkPartitionContext implements Serializable{ return partitions; } - public void setPartitions(Partition[] partitions) { - this.partitions = partitions; +// public void setPartitions(Partition[] partitions) { +// this.partitions = partitions; // for (Partition partition: partitions){ // partitionToMemberCountMap.put(partition.getId(), 0); // } - } +// } // public void setPartitionToMemberCountMap(Map<String, Integer> partitionToMemberCountMap) { // this.partitionToMemberCountMap = partitionToMemberCountMap; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ed16b3a3/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java index 5518ba8..b39c8ca 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java @@ -18,8 +18,6 @@ */ package 
org.apache.stratos.autoscaler.monitor; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.NetworkPartitionContext; @@ -28,6 +26,8 @@ import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import java.util.concurrent.ConcurrentHashMap; + /** * Is responsible for monitoring a service cluster. This runs periodically * and perform minimum instance check and scaling check using the underlying @@ -95,10 +95,17 @@ public class ClusterMonitor extends AbstractMonitor{ } - if(networkPartitionContext.isRifReset()){ + boolean rifReset = networkPartitionContext.isRifReset(); + boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset(); + boolean loadAverageReset = networkPartitionContext.isLoadAverageReset(); + if(rifReset || memoryConsumptionReset || loadAverageReset){ + scaleCheckKnowledgeSession.setGlobal("clusterId", clusterId); //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy); scaleCheckKnowledgeSession.setGlobal("autoscalePolicy", autoscalePolicy); + scaleCheckKnowledgeSession.setGlobal("rif", rifReset); + scaleCheckKnowledgeSession.setGlobal("memoryConsumption", memoryConsumptionReset); + scaleCheckKnowledgeSession.setGlobal("loadAverage", loadAverageReset); if (log.isDebugEnabled()) { log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId())); @@ -106,9 +113,12 @@ public class ClusterMonitor extends AbstractMonitor{ scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(scaleCheckKnowledgeSession , scaleCheckFactHandle, networkPartitionContext); + networkPartitionContext.setRifReset(false); + networkPartitionContext.setMemoryConsumptionReset(false); + 
networkPartitionContext.setLoadAverageReset(false); } else if(log.isDebugEnabled()){ - log.debug(String.format("Scale will not run since the LB statistics have not received before this " + + log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " + "cycle for network partition %s", networkPartitionContext.getId()) ); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ed16b3a3/products/autoscaler/modules/distribution/src/main/conf/scaling.drl ---------------------------------------------------------------------- diff --git a/products/autoscaler/modules/distribution/src/main/conf/scaling.drl b/products/autoscaler/modules/distribution/src/main/conf/scaling.drl index 175ba88..d32215a 100644 --- a/products/autoscaler/modules/distribution/src/main/conf/scaling.drl +++ b/products/autoscaler/modules/distribution/src/main/conf/scaling.drl @@ -50,9 +50,12 @@ global org.apache.stratos.autoscaler.rule.RuleTasksDelegator $delegator; global org.apache.stratos.autoscaler.policy.model.AutoscalePolicy autoscalePolicy; global java.lang.String clusterId; global java.lang.String lbRef; +global java.lang.Boolean rifReset; +global java.lang.Boolean mcReset; +global java.lang.Boolean laReset; -rule "Scaler-up Rule" +rule "Scale-up Rule: Rif" dialect "mvel" when $networkPartitionContext : NetworkPartitionContext () @@ -69,16 +72,35 @@ dialect "mvel" eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " Partition is null: " + (partition == null))) eval(partition != null) - lbStatAverage : Float() from $networkPartitionContext.getAverageRequestsInFlight() - lbStatGradient : Float() from $networkPartitionContext.getRequestsInFlightGradient() - lbStatSecondDerivative : Float() from $networkPartitionContext.getRequestsInFlightSecondDerivative() - averageLimit : Float() from $loadThresholds.getRequestsInFlight().getAverage() + rifAverage : Float() from 
$networkPartitionContext.getAverageRequestsInFlight() + rifGradient : Float() from $networkPartitionContext.getRequestsInFlightGradient() + rifSecondDerivative : Float() from $networkPartitionContext.getRequestsInFlightSecondDerivative() + rifAverageLimit : Float() from $loadThresholds.getRequestsInFlight().getAverage() + rifPredictedValue : Double() from $delegator.getPredictedValueForNextMinute(rifAverage, rifGradient, rifSecondDerivative, 1) + + memoryConsumptionAverage : Float() from $networkPartitionContext.getAverageMemoryConsumption() + memoryConsumptionGradient : Float() from $networkPartitionContext.getMemoryConsumptionGradient() + memoryConsumptionSecondDerivative : Float() from $networkPartitionContext.getMemoryConsumptionSecondDerivative() + mcAverageLimit : Float() from $loadThresholds.getMemoryConsumption().getAverage() + mcPredictedValue : Double() from $delegator.getPredictedValueForNextMinute(memoryConsumptionAverage, memoryConsumptionGradient, memoryConsumptionSecondDerivative, 1) + + loadAverageAverage : Float() from $networkPartitionContext.getAverageLoadAverage() + loadAverageGradient : Float() from $networkPartitionContext.getLoadAverageGradient() + loadAverageSecondDerivative : Float() from $networkPartitionContext.getLoadAverageSecondDerivative() + laAverageLimit : Float() from $loadThresholds.getLoadAverage().getAverage() + laPredictedValue : Double() from $delegator.getPredictedValueForNextMinute(loadAverageAverage, loadAverageGradient, loadAverageSecondDerivative, 1) + + scaleUpAction : Boolean() from ((rifReset && (rifPredictedValue > rifAverageLimit * $delegator.SCALE_UP_FACTOR)) || (mcReset && (mcPredictedValue > mcAverageLimit * $delegator.SCALE_UP_FACTOR)) || (laReset && (laPredictedValue > laAverageLimit * $delegator.SCALE_UP_FACTOR))) + + eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " RIF predicted value: " + 
rifPredictedValue)) + eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " RIF average limit: " + rifAverageLimit)) + + eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " MC predicted value: " + mcPredictedValue)) + eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " MC average limit: " + mcAverageLimit)) - predictedValue : Double() from $delegator.getPredictedValueForNextMinute(lbStatAverage, lbStatGradient, lbStatSecondDerivative, 1) - scaleUpAction : Boolean() from (predictedValue > averageLimit * $delegator.SCALE_UP_FACTOR) + eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " LA redicted value: " + laPredictedValue)) + eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " LA Average limit: " + laAverageLimit)) - eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " Predicted value: " + predictedValue)) - eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " Average limit: " + averageLimit)) eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " Scale-up factor: " + $delegator.SCALE_UP_FACTOR)) eval(log.debug("[scale-up] [network-partition] " + $networkPartitionContext.getId() + " [partition] " + partition.getId() + " [cluster] " + clusterId + " Scale-up action: " + 
scaleUpAction))
