This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 99adc5f  Fix error partition blocks load rebalance (#1867)
99adc5f is described below

commit 99adc5fbfd6f887229fb70e60b31c03f189b7704
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Sep 13 19:43:24 2021 -0700

    Fix error partition blocks load rebalance (#1867)
    
    There are four things fixed:
    1. State priority: a smaller number means a higher priority.
    2. When only downward is allowed, any non-downward STs must be removed from 
the message list and marked as throttled.
    3. Even downward STs should respect the throttling, to keep the behavior 
backward compatible.
    4. Fix test for TestErrorReplicaPersist
---
 .../stages/IntermediateStateCalcStage.java         |  11 +-
 .../stages/TestReplicaLevelThrottling.java         |  14 ++-
 .../helix/integration/TestErrorReplicaPersist.java |   7 ++
 .../TestReplicaLevelThrottling.SingleTopState.json | 115 +++++++++++++++++++++
 4 files changed, 141 insertions(+), 6 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 1d2ed02..558a61f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -437,7 +437,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     // If the state is not found in statePriorityMap, consider it not strictly 
downward by
     // default because we can't determine whether it is downward
     if (statePriorityMap.containsKey(message.getFromState()) && 
statePriorityMap.containsKey(message.getToState())
-        && statePriorityMap.get(message.getFromState()) > 
statePriorityMap.get(message.getToState())) {
+        && statePriorityMap.get(message.getFromState()) < 
statePriorityMap.get(message.getToState())) {
       return true;
     }
     return false;
@@ -520,8 +520,13 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
        ResourceControllerDataProvider cache,
       boolean onlyDownwardLoadBalance, StateModelDefinition 
stateModelDefinition, Set<String> messagesThrottled,
       Map<Partition, List<Message>> resourceMessageMap) {
-    if (onlyDownwardLoadBalance && 
isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) {
-      // Remove the message already allowed for downward state transitions.
+    // TODO: refactor the logic into throttling to let throttling logic to 
handle only downward including recovery rebalance
+    // If only downward allowed: 1) any non-downward ST messages will be 
throttled and removed.
+    //                           2) any downward ST messages will respect the 
throttling.
+    // If not only downward allowed, all ST messages should respect the 
throttling.
+    if (onlyDownwardLoadBalance && 
!isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) 
{
+      resourceMessageMap.get(partition).remove(messageToThrottle);
+      messagesThrottled.add(messageToThrottle.getId());
       return;
     }
     throttleStateTransitionsForReplica(throttleController, 
resource.getResourceName(), partition, messageToThrottle,
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
index 0c1a705..af47542 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -54,6 +54,7 @@ public class TestReplicaLevelThrottling extends BaseStageTest 
{
   static final String CLUSTER_NAME = "TestCluster";
   static final String RESOURCE_NAME = "TestResource";
   static final String NOT_SET = "-1";
+  static final String DEFAULT_ERROR_THRESHOLD = 
String.valueOf(Integer.MAX_VALUE);
 
   @Test(dataProvider = "replicaLevelThrottlingInput")
   public void testPerReplicaThrottling(ClusterEvent event, Map<String, 
Map<String, String>> expectedOutput,
@@ -128,11 +129,17 @@ public class TestReplicaLevelThrottling extends 
BaseStageTest {
     instanceThrottleRecovery,
     currentStates,
     pendingMessages,
-    expectedOutput
+    expectedOutput,
+    errorThreshold
   }
 
   enum CacheKeys {
-    clusterConfig, stateModelName, stateModelDef, minActiveReplica, 
numReplica, preferenceList
+    clusterConfig,
+    stateModelName,
+    stateModelDef,
+    minActiveReplica,
+    numReplica,
+    preferenceList
   }
 
   public List<Object[]> loadTestInputs(String fileName) {
@@ -206,8 +213,9 @@ public class TestReplicaLevelThrottling extends 
BaseStageTest {
         
getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
             StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 
Entry.instanceThrottleRecovery.name(),
             throttleConfigs, inMap);
-
         clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+        
clusterConfig.setErrorPartitionThresholdForLoadBalance(Integer.parseInt(
+            (String) inMap.getOrDefault(Entry.errorThreshold.name(), 
DEFAULT_ERROR_THRESHOLD)));
 
         Map<String, Object> cacheMap = new HashMap<>();
         cacheMap.put(CacheKeys.clusterConfig.name(), clusterConfig);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
index ff47a71..5c7fb86 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixRollbackException;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
@@ -29,6 +30,7 @@ import 
org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.rebalancer.TestAutoRebalance;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.StateMachineEngine;
@@ -97,6 +99,11 @@ public class TestErrorReplicaPersist extends 
ZkStandAloneCMTestBase {
 
   @Test
   public void testErrorReplicaPersist() throws InterruptedException {
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setErrorPartitionThresholdForLoadBalance(100000);
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    
     for (int i = 0; i < (NODE_NR + 1) / 2; i++) {
       _participants[i].syncStop();
       Thread.sleep(2000);
diff --git 
a/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json 
b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
index 9706d16..5d4cdaf 100644
--- 
a/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
+++ 
b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
@@ -275,6 +275,121 @@
           "localhost_12915": "OFFLINE"
         }
       }
+    },
+    {
+      "description": "Set error threshold, only downward allowed",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "-1",
+      "instanceThrottleLoad": "10",
+      "instanceThrottleRecovery": "-1",
+      "errorThreshold" : "2",
+      "partitionNames": [
+        "partition_0",
+        "partition_1",
+        "partition_2",
+        "partition_3"
+      ],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12916": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_2": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_3": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "STANDBY",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY",
+          "localhost_12916": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_2": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_3": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_2": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_3": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "pendingMessages": {
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY",
+          "localhost_12916": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_2": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_3": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY"
+        }
+      }
     }
   ]
 }
\ No newline at end of file

Reply via email to