improvements to autoscaling
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/5d684714 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/5d684714 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/5d684714 Branch: refs/heads/docker-grouping-merge Commit: 5d6847141d84bd0645c2bc8129b6248517391a40 Parents: 4e7e7bb Author: Asiri LIyana Arachchi <[email protected]> Authored: Tue Nov 4 23:27:41 2014 +0530 Committer: Asiri LIyana Arachchi <[email protected]> Committed: Tue Nov 4 23:27:41 2014 +0530 ---------------------------------------------------------------------- .../autoscaler/NetworkPartitionContext.java | 19 +++++ .../AutoscalerHealthStatEventReceiver.java | 28 ++++++- .../monitor/cluster/AbstractClusterMonitor.java | 1 + .../monitor/cluster/VMClusterMonitor.java | 50 ++++++++---- .../monitor/cluster/VMLbClusterMonitor.java | 1 + .../cluster/VMServiceClusterMonitor.java | 2 + .../autoscaler/rule/RuleTasksDelegator.java | 86 ++++++++++++++++++++ .../LoadBalancerStatisticsReader.java | 5 ++ .../LoadBalancerStatisticsNotifier.java | 11 ++- .../WSO2CEPInFlightRequestPublisher.java | 7 +- .../LoadBalancerStatisticsCollector.java | 38 +++++++++ .../stat/AverageRequestsInFlightEvent.java | 11 ++- .../stat/HealthStatMessageProcessorChain.java | 5 ++ .../AverageInFlightRequestsEventFormatter.xml | 2 +- .../AverageInFlightRequestsFinder.xml | 15 +++- .../streamdefinitions/stream-manager-config.xml | 20 ++++- .../extension/HAProxyStatisticsReader.java | 10 +++ .../src/main/conf/drools/scaling.drl | 25 +++++- 18 files changed, 307 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java index 3daf2c1..c2b3bdf 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java @@ -41,6 +41,8 @@ public class NetworkPartitionContext implements Serializable{ private final String id; private int scaleDownWaitCount = 5; //TODO get from a config private int scaleDownRequestsCount = 0; + private float averageRequestsServedPerInstance; + private float requestsServedPerInstance; // private String defaultLbClusterId; // @@ -58,6 +60,8 @@ public class NetworkPartitionContext implements Serializable{ //boolean values to keep whether the load average parameters are reset or not private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false; + //boolean values to keep whether average requests served per instance parameters are reset or not + private boolean averageRequestServedPerInstanceReset= false; //FIXME this should be populated via PartitionGroups a.k.a. NetworkPartitions private int minInstanceCount = 1, maxInstanceCount = 1; @@ -241,6 +245,21 @@ public class NetworkPartitionContext implements Serializable{ this.currentPartitionIndex = currentPartitionIndex; } + public float getAverageRequestsServedPerInstance() { return averageRequestsServedPerInstance;} + + public void setAverageRequestsServedPerInstance(float averageRequestServedPerInstance) { + this.averageRequestsServedPerInstance = averageRequestServedPerInstance; + averageRequestServedPerInstanceReset = true; + + if(log.isDebugEnabled()){ + log.debug(String.format("Average Requesets Served Per Instance stats are reset, ready to do scale check [network partition] %s" + , this.id)); + + } + } + + public float getRequestsServedPerInstance() { return requestsServedPerInstance;} + public float getAverageRequestsInFlight() { return requestsInFlight.getAverage(); } http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java index 718cc16..e2eca17 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; @@ -41,9 +42,11 @@ import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLo import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent; import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent; import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.AverageRequestsServingCapabilityEvent; import org.apache.stratos.messaging.listener.health.stat.AverageLoadAverageEventListener; import org.apache.stratos.messaging.listener.health.stat.AverageMemoryConsumptionEventListener; import org.apache.stratos.messaging.listener.health.stat.AverageRequestsInFlightEventListener; +import org.apache.stratos.messaging.listener.health.stat.AverageRequestsServingCapabilityEventListener; import org.apache.stratos.messaging.listener.health.stat.GradientOfLoadAverageEventListener; import org.apache.stratos.messaging.listener.health.stat.GradientOfMemoryConsumptionEventListener; import org.apache.stratos.messaging.listener.health.stat.GradientOfRequestsInFlightEventListener; @@ -122,6 +125,7 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { } }); + healthStatEventReceiver.addEventListener(new AverageMemoryConsumptionEventListener() { @Override protected void onEvent(org.apache.stratos.messaging.event.Event event) { @@ -152,7 +156,7 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } @@ -160,6 +164,28 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { } }); + healthStatEventReceiver.addEventListener(new AverageRequestsServingCapabilityEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + AverageRequestsServingCapabilityEvent averageRequestsServingCapabilityEvent = (AverageRequestsServingCapabilityEvent) event; + String clusterId = averageRequestsServingCapabilityEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + if(monitor instanceof VMClusterMonitor) { + VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor; + vmClusterMonitor.handleAverageRequestsServingCapabilityEvent(averageRequestsServingCapabilityEvent); + } + } + }); + healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() { @Override protected void onEvent(org.apache.stratos.messaging.event.Event event) { http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java index fa9736e..55748e1 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java @@ -36,6 +36,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent; import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent; import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.AverageRequestsServingCapabilityEvent; import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent; import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent; import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent; http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java index 0990ead..d6dc27c 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.MemberStatsContext; import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.PartitionContext; @@ -37,22 +38,7 @@ import org.apache.stratos.cloud.controller.stub.pojo.Properties; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent; -import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent; -import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; -import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.*; import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; @@ -193,7 +179,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { } else { if (log.isDebugEnabled()) { log.debug(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); + " [network partition] %s", networkPartitionId)); } } } @@ -220,12 +206,42 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { } } + public void handleAverageRequestsServingCapabilityEvent(AverageRequestsServingCapabilityEvent averageRequestsServingCapabilityEvent) { + + String clusterId = averageRequestsServingCapabilityEvent.getClusterId(); + String networkPartitionId = averageRequestsServingCapabilityEvent.getNetworkPartitionId(); + Float floatValue = averageRequestsServingCapabilityEvent.getValue(); + + if (log.isDebugEnabled()) { + log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, floatValue)); + } + + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setAverageRequestsServedPerInstance(floatValue); + + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + + } + @Override public void handleAverageRequestsInFlightEvent( AverageRequestsInFlightEvent averageRequestsInFlightEvent) { String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId(); String clusterId = averageRequestsInFlightEvent.getClusterId(); + Float servedCount = averageRequestsInFlightEvent.getServedCount(); + Float activeInstances = averageRequestsInFlightEvent.getActiveInstances(); + Float requestsServedPerInstance = servedCount/activeInstances; + if(requestsServedPerInstance.isInfinite()){ + requestsServedPerInstance = 0f; + } float value = averageRequestsInFlightEvent.getValue(); if (log.isDebugEnabled()) { log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java index b3dad4e..386197c 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java @@ -36,6 +36,7 @@ import org.apache.stratos.autoscaler.util.AutoScalerConstants; import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.event.health.stat.AverageRequestsServingCapabilityEvent; import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; /** http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java index 0b460ab..e5e8fe4 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy; @@ -41,6 +42,7 @@ import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.applications.ApplicationStatus; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.event.health.stat.AverageRequestsServingCapabilityEvent; /** * Is responsible for monitoring a service cluster. This runs periodically http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index c0f4933..8ab5308 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -36,6 +36,11 @@ import org.apache.stratos.autoscaler.exception.SpawningException; import org.apache.stratos.autoscaler.exception.TerminationException; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.MemberStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; /** * This will have utility methods that need to be executed from rule file... @@ -44,6 +49,7 @@ public class RuleTasksDelegator { public static final double SCALE_UP_FACTOR = 0.8; //get from config public static final double SCALE_DOWN_FACTOR = 0.2; + private static boolean arspiIsSet = false; private static final Log log = LogFactory.getLog(RuleTasksDelegator.class); @@ -59,6 +65,86 @@ public class RuleTasksDelegator { return predictedValue; } + + public int getNumberOfInstancesRequiredBasedOnRif(float rifPredictedValue , float requestsServedPerInstance , float averageRequestsServedPerInstance , boolean arspiReset){ + + float requestsInstanceCanHandle = requestsServedPerInstance; + + if(arspiReset && averageRequestsServedPerInstance != 0){ + requestsInstanceCanHandle = averageRequestsServedPerInstance; + + } + float numberOfInstances = 0; + if(requestsInstanceCanHandle!=0) { + numberOfInstances = rifPredictedValue / requestsInstanceCanHandle; + arspiReset = true; + + }else{ + arspiReset = false; + } + return (int)Math.ceil(numberOfInstances); + } + + public int getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(float upperLimit , float lowerLimit ,double predictedValue , int activeMemberCount ){ + + double numberOfInstances = 0; + if(predictedValue > upperLimit){ + numberOfInstances = (activeMemberCount*predictedValue)/upperLimit; + }else if((upperLimit >= predictedValue) && (predictedValue >= lowerLimit)){ + numberOfInstances = activeMemberCount; + }else{ + numberOfInstances = (activeMemberCount*predictedValue)/lowerLimit; + } + + return (int)Math.ceil(numberOfInstances); + } + + public int getMaxNumberOfInstancesRequired(int numberOfInstancesReuquiredBasedOnRif , int numberOfInstancesReuquiredBasedOnMemoryConsumption , boolean mcReset , int numberOfInstancesReuquiredBasedOnLoadAverage , boolean laReset){ + int numberOfInstances = 0; + + int rifBasedRequiredInstances = 0; + int mcBasedRequiredInstances = 0; + int laBasedRequiredInstances = 0; + if(arspiIsSet){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnRif; + } + if(mcReset){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnMemoryConsumption; + } + if(laReset){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnLoadAverage; + } + numberOfInstances = Math.max(Math.max(numberOfInstancesReuquiredBasedOnMemoryConsumption,numberOfInstancesReuquiredBasedOnLoadAverage),numberOfInstancesReuquiredBasedOnRif); + return numberOfInstances; + } + + public int getMemberCount(String clusterId , int scalingPara ){ + + int activeMemberCount = 0; + int memberCount = 0; + for( Service service : TopologyManager.getTopology().getServices()) { + if(service.clusterExists(clusterId)) { + Cluster cluster = service.getCluster(clusterId); + + for (Member member : cluster.getMembers()) { + if (member.isActive() || member.getStatus() == MemberStatus.Created || member.getStatus() == MemberStatus.Starting ) { + memberCount++; + if(member.isActive()) { + activeMemberCount++; + } + } + } + } + } + if(scalingPara == 1){ + return memberCount; + }else{ + return activeMemberCount; + } + + + } + public AutoscaleAlgorithm getAutoscaleAlgorithm(String partitionAlgorithm){ AutoscaleAlgorithm autoscaleAlgorithm = null; if(log.isDebugEnabled()){ http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java index 4a83aee..79386bd 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java @@ -19,6 +19,8 @@ package org.apache.stratos.load.balancer.common.statistics; +import org.apache.stratos.messaging.domain.topology.Cluster; + /** * Load balancer statistics reader interface. */ @@ -29,4 +31,7 @@ public interface LoadBalancerStatisticsReader { * @param clusterId */ int getInFlightRequestCount(String clusterId); + int getServedRequestCount(String clusterId); + int getActiveInstancesCount(Cluster cluster); + } http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java index 4fe2504..7d123e5 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -74,12 +74,21 @@ public class LoadBalancerStatisticsNotifier implements Runnable { try { TopologyManager.acquireReadLock(); int requestCount; + int servedRequestCount; + int activeInstancesCount; for (Service service : TopologyManager.getTopology().getServices()) { for (Cluster cluster : service.getClusters()) { if (!cluster.isLbCluster()) { // Publish in-flight request count of load balancer's network partition requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); - inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount); + servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId()); + if(requestCount == 0) { + servedRequestCount = 0; + } + activeInstancesCount = statsReader.getActiveInstancesCount(cluster); + inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId,activeInstancesCount, requestCount, servedRequestCount); + log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ", + cluster.getClusterId(), networkPartitionId, servedRequestCount , activeInstancesCount ,requestCount )); if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", cluster.getClusterId(), networkPartitionId, requestCount)); http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java index 519a687..24d5257 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java @@ -47,7 +47,9 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher // Payload definition payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); payloadData.add(new Attribute("network_partition_id", AttributeType.STRING)); + payloadData.add(new Attribute("active_instances_count", AttributeType.DOUBLE)); payloadData.add(new Attribute("in_flight_request_count", AttributeType.DOUBLE)); + payloadData.add(new Attribute("served_request_count", AttributeType.DOUBLE)); streamDefinition.setPayloadData(payloadData); return streamDefinition; } catch (Exception e) { @@ -65,13 +67,16 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher * @param clusterId * @param networkPartitionId * @param inFlightRequestCount + * @param servedRequestCount */ - public void publish(String clusterId, String networkPartitionId, int inFlightRequestCount) { + public void publish(String clusterId, String networkPartitionId,int activeInstancesCount, int inFlightRequestCount, int servedRequestCount) { List<Object> payload = new ArrayList<Object>(); // Payload values payload.add(clusterId); payload.add(networkPartitionId); + payload.add((double)activeInstancesCount); payload.add((double)inFlightRequestCount); + payload.add((double)servedRequestCount); super.publish(payload.toArray()); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index 3557d3a..10e38f9 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -22,6 +22,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -35,9 +37,11 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe private static volatile LoadBalancerStatisticsCollector instance; // Map<ClusterId, Integer> private Map<String, Integer> clusterIdRequestCountMap; + private Map<String, Integer> clusterIdServedRequestCountMap; private LoadBalancerStatisticsCollector() { clusterIdRequestCountMap = new ConcurrentHashMap<String, Integer>(); + clusterIdServedRequestCountMap = new ConcurrentHashMap<String, Integer>(); } public static LoadBalancerStatisticsCollector getInstance() { @@ -75,6 +79,33 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe } } + /** + * Returns the number of requests served since the last time this function was called. + */ + public int getServedRequestCount(String clusterId){ + synchronized (LoadBalancerStatisticsCollector.class) { + if (clusterIdServedRequestCountMap.containsKey(clusterId)) { + Integer servedCount = clusterIdServedRequestCountMap.get(clusterId); + if (servedCount != null) { + clusterIdServedRequestCountMap.put(clusterId, 0); + return servedCount; + } + } + return 0; + } + } + + public int getActiveInstancesCount(Cluster cluster) { + int activeInstances = 0; + for( Member member :cluster.getMembers()){ + if(member.isActive()){ + activeInstances++; + } + + }return activeInstances; + + } + void incrementInFlightRequestCount(String clusterId) { synchronized (LoadBalancerStatisticsCollector.class) { if (StringUtils.isBlank(clusterId)) { @@ -118,6 +149,13 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe } clusterIdRequestCountMap.put(clusterId, count); + Integer servedCount = 0; + if (clusterIdServedRequestCountMap.containsKey(clusterId)) { + servedCount = clusterIdServedRequestCountMap.get(clusterId); + } + servedCount++; + clusterIdServedRequestCountMap.put(clusterId, servedCount); + if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count decremented: [cluster] %s [count] %s ", clusterId, count)); http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java index 143ae03..0e10af9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java @@ -28,11 +28,15 @@ public class AverageRequestsInFlightEvent extends Event { private final String networkPartitionId; private final String clusterId; private final float value; + private final float servedCount; + private final float activeInstances; - public AverageRequestsInFlightEvent(String networkPartitionId, String clusterId, float value) { + public AverageRequestsInFlightEvent(String networkPartitionId, String clusterId,float activeInstances, float value, float servedCount ) { this.networkPartitionId = networkPartitionId; this.clusterId = clusterId; this.value = value; + this.servedCount = servedCount; + this.activeInstances = activeInstances; } @@ -47,4 +51,9 @@ public class AverageRequestsInFlightEvent extends Event { public String getNetworkPartitionId() { return networkPartitionId; } + + public float getServedCount() { return servedCount;} + + public float getActiveInstances() { return activeInstances;} + } http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java index f9861f6..4da35a9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java @@ -44,6 +44,7 @@ public class HealthStatMessageProcessorChain extends MessageProcessorChain { private MemberGradientOfMemoryConsumptionMessageProcessor memberGradientOfMemoryConsumptionMessageProcessor; private MemberSecondDerivativeOfLoadAverageMessageProcessor memberSecondDerivativeOfLoadAverageMessageProcessor; private MemberSecondDerivativeOfMemoryConsumptionMessageProcessor memberSecondDerivativeOfMemoryConsumptionMessageProcessor; + private AverageRequestsServingCapabilityMessageProcessor averageRequestsServingCapabilityMessageProcessor; private MemberFaultMessageProcessor memberFaultMessageProcessor; @@ -66,6 +67,8 @@ public class HealthStatMessageProcessorChain extends MessageProcessorChain { averageRequestsInFlightMessageProcessor = new AverageRequestsInFlightMessageProcessor(); add(averageRequestsInFlightMessageProcessor); + averageRequestsServingCapabilityMessageProcessor = new AverageRequestsServingCapabilityMessageProcessor(); + add(averageRequestsServingCapabilityMessageProcessor); gradientOfRequestsInFlightMessageProcessor = new GradientOfRequestsInFlightMessageProcessor(); add(gradientOfRequestsInFlightMessageProcessor); secondDerivativeOfRequestsInFlightMessageProcessor = new SecondDerivativeOfRequestsInFlightMessageProcessor(); @@ -97,6 +100,8 @@ public class HealthStatMessageProcessorChain extends MessageProcessorChain { averageMemoryConsumptionMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof AverageRequestsInFlightEventListener) { averageRequestsInFlightMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AverageRequestsServingCapabilityEventListener) { + averageRequestsServingCapabilityMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof GradientOfLoadAverageEventListener) { gradientOfLoadAverageMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof GradientOfMemoryConsumptionEventListener) { http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml index 6e6ba96..84e95ae 100644 --- a/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml +++ b/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml @@ -24,7 +24,7 @@ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter"> <from streamName="average_in_flight_requests" version="1.0.0"/> <mapping customMapping="enable" type="json"> - <inline>{"org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent":{"message":{"clusterId":"{{cluster_id}}","networkPartitionId":"{{network_partition_id}}","value":"{{count}}"}}}</inline> + <inline>{"org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent":{"message":{"clusterId":"{{cluster_id}}","networkPartitionId":"{{network_partition_id}}","activeInstances":"{{instances_count}}","value":"{{count}}" ,"servedCount":"{{served_count}}"}}}</inline> </mapping> <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms"> <property name="transport.jms.Destination">summarized-health-stats</property> http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml b/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml index 6826dab..a8e890f 100644 --- a/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml +++ b/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml @@ -32,16 +32,25 @@ </importedStreams> <queryExpressions><![CDATA[ from avg_rif_stat - select cluster_id, network_partition_id, in_flight_request_count, + select cluster_id, network_partition_id,active_instances_count, in_flight_request_count,served_request_count, stratos:concat(cluster_id, '-' , network_partition_id) as avg_rif_cluster_network insert into avg_rif_concat; define partition avg_rif_cluster_partition by avg_rif_concat.avg_rif_cluster_network; from avg_rif_concat#window.timeBatch(1 min) - select cluster_id,network_partition_id, avg(in_flight_request_count) as count + select cluster_id,network_partition_id,avg(active_instances_count) as instances_count, avg(in_flight_request_count) as count , sum(served_request_count) as served_count insert into average_in_flight_requests - partition by avg_rif_cluster_partition;]]></queryExpressions> + partition by avg_rif_cluster_partition; + from average_in_flight_requests + select cluster_id, network_partition_id,instances_count, served_count, + stratos:divider(served_count , instances_count) as requests_per_instance + insert into served_requests_per_instance; + from served_requests_per_instance[requests_per_instance>0]#window.timeBatch(10 min) + select cluster_id,network_partition_id, avg(requests_per_instance) as average_served_count + insert into average_served_request_count;]]></queryExpressions> <exportedStreams> <stream name="average_in_flight_requests" valueOf="average_in_flight_requests" version="1.0.0"/> + <stream name="average_served_request_count" + valueOf="average_served_request_count" version="1.0.0"/> </exportedStreams> </executionPlan> http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml index 64b5d85..a9472ef 100644 --- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml +++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml @@ -32,7 +32,9 @@ <payloadData> <property name="cluster_id" type="String"/> <property name="network_partition_id" type="String"/> + <property name="active_instances_count" type="double"/> <property name="in_flight_request_count" type="double"/> + <property name="served_request_count" type="double"/> </payloadData> </streamDefinition> @@ -60,9 +62,25 @@ <payloadData> <property name="cluster_id" type="String"/> <property name="network_partition_id" type="String"/> + <property name="instances_count" type="double"/> <property name="count" type="double"/> + <property name="served_count" type="double"/> </payloadData> - </streamDefinition> + </streamDefinition> + + <streamDefinition name="average_served_request_count" version="1.0.0"> + <description>average served count per instance</description> + <nickName>average in-flight requests</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="average_served_count" type="double"/> + </payloadData> + </streamDefinition> <streamDefinition name="second_derivative_in_flight_requests" version="1.0.0"> <description>second derivative of in-flight request count</description> http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java index f29e1c6..5dec4d3 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java @@ -96,4 +96,14 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader { } return 0; } + + @Override + public int getServedRequestCount(String clusterId) { + return 0; + } + + @Override + public int getActiveInstancesCount(Cluster cluster) { + return 0; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/5d684714/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl index 3d3bbbf..1d1ba5d 100644 --- a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl @@ -55,6 +55,7 @@ global java.lang.Boolean mcReset; global java.lang.Boolean laReset; global java.lang.Boolean isPrimary; global java.util.List primaryMembers; +global java.lang.Boolean arspiReset; rule "Scaling Rule" dialect "mvel" @@ -94,8 +95,22 @@ dialect "mvel" loadAverageSecondDerivative : Float() from $networkPartitionContext.getLoadAverageSecondDerivative() laPredictedValue : Double() from $delegator.getPredictedValueForNextMinute(loadAverageAverage, loadAverageGradient, loadAverageSecondDerivative, 1) - scaleUp : Boolean() from ((rifReset && (rifPredictedValue > rifUpperLimit)) || (mcReset && (mcPredictedValue > mcUpperLimit)) || (laReset && (laPredictedValue > laUpperLimit))) - scaleDown : Boolean() from ((rifReset && (rifPredictedValue < rifLowerLimit )) && (mcReset && (mcPredictedValue < mcLowerLimit)) && (laReset && (laPredictedValue < laLowerLimit))) + activeInstancesCount : Integer() from $delegator.getMemberCount(clusterId , 0) + instancesCount : Integer() from $delegator.getMemberCount(clusterId , 1) + + requestsServedPerInstance : Float() from $networkPartitionContext.getRequestsServedPerInstance() + averageRequestsServedPerInstance : Float() from $networkPartitionContext.getAverageRequestsServedPerInstance() + + numberOfInstancesReuquiredBasedOnRif : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnRif(rifPredictedValue, requestsServedPerInstance, averageRequestsServedPerInstance, arspiReset) + numberOfInstancesReuquiredBasedOnMemoryConsumption : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(mcUpperLimit , mcLowerLimit, mcPredictedValue ,activeInstancesCount ) + numberOfInstancesReuquiredBasedOnLoadAverage : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(laUpperLimit , laLowerLimit, laPredictedValue ,activeInstancesCount ) + + numberOfRequiredInstances : Integer() from $delegator.getMaxNumberOfInstancesRequired(numberOfInstancesReuquiredBasedOnRif, numberOfInstancesReuquiredBasedOnMemoryConsumption ,mcReset ,numberOfInstancesReuquiredBasedOnLoadAverage, laReset) + + + + scaleUp : Boolean() from (instancesCount < numberOfRequiredInstances ) + scaleDown : Boolean() from (activeInstancesCount > numberOfRequiredInstances ) eval(log.debug("[scaling] " + " [cluster] " + clusterId + " RIF Resetted?: " + rifReset)) eval(log.debug("[scaling] " + " [cluster] " + clusterId + " RIF predicted value: " + rifPredictedValue)) @@ -116,19 +131,23 @@ dialect "mvel" then if(scaleUp){ + int additionalInstances = numberOfRequiredInstances - instancesCount ; $networkPartitionContext.resetScaleDownRequestsCount(); + int count=0; + while(count != additionalInstances){ Partition partition = autoscaleAlgorithm.getNextScaleUpPartition($networkPartitionContext, clusterId); if(partition != null){ log.info("[scale-up] Partition available, hence trying to spawn an instance to scale up!" ); log.debug("[scale-up] " + " [partition] " + partition.getId() + " [cluster] " + clusterId ); $delegator.delegateSpawn($networkPartitionContext.getPartitionCtxt(partition.getId()), clusterId, lbRef, isPrimary); + count++; } + } } else if(scaleDown){ log.debug("[scale-down] Decided to Scale down [cluster] " + clusterId); if($networkPartitionContext.getScaleDownRequestsCount() > 5 ){ log.debug("[scale-down] Reached scale down requests threshold [cluster] " + clusterId + " Count " + $networkPartitionContext.getScaleDownRequestsCount()); - $networkPartitionContext.resetScaleDownRequestsCount(); MemberStatsContext selectedMemberStatsContext = null; double lowestOverallLoad = 0.0; boolean foundAValue = false;
