Repository: helix
Updated Branches:
  refs/heads/master 4c3ad2aec -> dd3be71c9


[HELIX-716] [HELIX] Make downward load balance also be subject to 
StateTransitionThrottleConfig

Previously, when only downward load-balance transitions were allowed, those 
transitions were not subject to any throttling constraints. With this change, 
downward load-balance transitions are subject to the same throttling 
constraints as other load-balance transitions.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dd3be71c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dd3be71c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dd3be71c

Branch: refs/heads/master
Commit: dd3be71c9423eea8283bda6819beb76edecf1fb2
Parents: 4c3ad2a
Author: Hunter Lee <[email protected]>
Authored: Mon Jul 9 13:14:24 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Mon Jul 9 16:42:14 2018 -0700

----------------------------------------------------------------------
 .../stages/IntermediateStateCalcStage.java      | 65 +++++++++++---------
 .../TestPartitionMovementThrottle.java          | 18 +++---
 2 files changed, 44 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/dd3be71c/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index e70e420..0f11ecd 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -45,6 +45,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     CurrentStateOutput currentStateOutput = 
event.getAttribute(AttributeName.CURRENT_STATE.name());
+
     BestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
@@ -53,8 +54,8 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     if (currentStateOutput == null || bestPossibleStateOutput == null || 
resourceMap == null
         || cache == null) {
       throw new StageException(String.format("Missing attributes in event: %s. 
"
-              + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) 
|RESOURCES (%s) |DataCache (%s)", event,
-          currentStateOutput, bestPossibleStateOutput, resourceMap, cache));
+              + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) 
|RESOURCES (%s) |DataCache (%s)",
+          event, currentStateOutput, bestPossibleStateOutput, resourceMap, 
cache));
     }
 
     IntermediateStateOutput intermediateStateOutput =
@@ -320,7 +321,8 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     if (clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance() != 
-1) {
       // ErrorOrRecovery is set
       threshold = 
clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance();
-      partitionCount += partitionsNeedRecovery.size(); // Only add this count 
when the threshold is set
+      partitionCount += partitionsNeedRecovery.size(); // Only add this count 
when the threshold is
+      // set
     } else {
       if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) {
         // 0 is the default value so the old threshold has been set
@@ -328,30 +330,14 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       }
     }
 
-    // Perform load balance only if the number of partitions in recovery and 
in error is less than
-    // the threshold
-    if (partitionCount < threshold) {
-      loadbalanceThrottledPartitions = loadRebalance(resource, 
currentStateOutput,
-          bestPossiblePartitionStateMap, throttleController, 
intermediatePartitionStateMap,
-          partitionsNeedLoadBalance, 
currentStateOutput.getCurrentStateMap(resourceName));
-    } else {
-      // Only allow dropping of replicas to happen (dropping does NOT need to 
be throttled) and skip
-      // load balance for this cycle
-      for (Partition partition : partitionsNeedLoadBalance) {
-        Map<String, String> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-        Map<String, String> bestPossibleMap =
-            bestPossiblePartitionStateMap.getPartitionMap(partition);
-        // Skip load balance by passing current state to intermediate state
-        intermediatePartitionStateMap.setState(partition, currentStateMap);
+    // Perform regular load balance only if the number of partitions in 
recovery and in error is
+    // less than the threshold. Otherwise, only allow downward-transition load 
balance
+    boolean onlyDownwardLoadBalance = partitionCount >= threshold;
 
-        // Check if this partition only has downward state transitions; if so, 
allow state
-        // transitions by setting it at bestPossibleState
-        if (isLoadBalanceDownwardForAllReplicas(currentStateMap, 
bestPossibleMap, stateModelDef)) {
-          intermediatePartitionStateMap.setState(partition, bestPossibleMap);
-        }
-      }
-    }
+    loadbalanceThrottledPartitions = loadRebalance(resource, 
currentStateOutput,
+        bestPossiblePartitionStateMap, throttleController, 
intermediatePartitionStateMap,
+        partitionsNeedLoadBalance, 
currentStateOutput.getCurrentStateMap(resourceName),
+        onlyDownwardLoadBalance, stateModelDef);
 
     if (clusterStatusMonitor != null) {
       clusterStatusMonitor.updateRebalancerStats(resourceName, 
partitionsNeedRecovery.size(),
@@ -507,13 +493,16 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
    * @param intermediatePartitionStateMap
    * @param partitionsNeedLoadbalance
    * @param currentStateMap
-   * @return a set of partitions that need to be load-balanced but did not due 
to throttling
+   * @param onlyDownwardLoadBalance true when only allowing downward 
transitions
+   * @param stateModelDef for determining whether a partition's transitions 
are strictly downward
+   * @return partitions that need load balance but were held back by throttling
    */
   private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput 
currentStateOutput,
       PartitionStateMap bestPossiblePartitionStateMap,
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> 
partitionsNeedLoadbalance,
-      Map<Partition, Map<String, String>> currentStateMap) {
+      Map<Partition, Map<String, String>> currentStateMap, boolean 
onlyDownwardLoadBalance,
+      StateModelDefinition stateModelDef) {
     String resourceName = resource.getResourceName();
     Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
 
@@ -532,7 +521,23 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     });
     Collections.sort(partitionsNeedLoadRebalancePrioritized, new 
PartitionPriorityComparator(
         bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", 
false));
+
     for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
+      // If this is a downward load balance, check if the partition's 
transition is strictly
+      // downward
+      if (onlyDownwardLoadBalance) {
+        Map<String, String> currentStateMapForPartition =
+            currentStateOutput.getCurrentStateMap(resourceName, partition);
+        Map<String, String> bestPossibleMapForPartition =
+            bestPossiblePartitionStateMap.getPartitionMap(partition);
+        if (!isLoadBalanceDownwardForAllReplicas(currentStateMapForPartition,
+            bestPossibleMapForPartition, stateModelDef)) {
+          // For downward load balance, if a partition's transitions are not 
strictly downward,
+          // set intermediateState to currentState (no load balance this cycle)
+          intermediatePartitionStateMap.setState(partition, 
currentStateMapForPartition);
+          continue;
+        }
+      }
       throttleStateTransitionsForPartition(throttleController, resourceName, 
partition,
           currentStateOutput, bestPossiblePartitionStateMap, 
partitionsLoadbalanceThrottled,
           intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE);
@@ -586,7 +591,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
             hasReachedThrottlingLimit = true;
             if (logger.isDebugEnabled()) {
               logger.debug(
-                  "Throttled because of instance: {} for partition: {} in 
resource: {}" + instance,
+                  "Throttled because of instance: {} for partition: {} in 
resource: {}", instance,
                   partition.getPartitionName(), resourceName);
             }
             break;
@@ -834,4 +839,4 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       return matchedState;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/dd3be71c/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
index d37f9cf..49f110e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
@@ -109,17 +109,17 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
             StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 100);
 
 
-        StateTransitionThrottleConfig resourceRecoveryThrottle = new 
StateTransitionThrottleConfig(
-            StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
-            StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 3);
+    StateTransitionThrottleConfig resourceRecoveryThrottle = new 
StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 3);
 
-        StateTransitionThrottleConfig clusterRecoveryThrottle = new 
StateTransitionThrottleConfig(
-            StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
-            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 100);
+    StateTransitionThrottleConfig clusterRecoveryThrottle = new 
StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 100);
 
     clusterConfig.setStateTransitionThrottleConfigs(Arrays
         .asList(resourceLoadThrottle, instanceLoadThrottle, 
clusterLoadThrottle,
-    resourceRecoveryThrottle, clusterRecoveryThrottle));
+            resourceRecoveryThrottle, clusterRecoveryThrottle));
 
 
     clusterConfig.setPersistIntermediateAssignment(true);
@@ -362,7 +362,6 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
         "Throttle condition does not meet for " + throttledItemName);
   }
 
-
   private int size(List<PartitionTransitionTime> timeList) {
     Set<String> partitions = new HashSet<String>();
     for (PartitionTransitionTime p : timeList) {
@@ -382,7 +381,8 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
       this.end = end;
     }
 
-    @Override public String toString() {
+    @Override
+    public String toString() {
       return "[" +
           "partition='" + partition + '\'' +
           ", start=" + start +

Reply via email to