This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 99adc5f Fix error partition blocks load rebalance (#1867)
99adc5f is described below
commit 99adc5fbfd6f887229fb70e60b31c03f189b7704
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Sep 13 19:43:24 2021 -0700
Fix error partition blocks load rebalance (#1867)
There are four things fixed:
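1. A smaller state priority number means a higher-priority state, so a transition is downward when the from-state's priority number is smaller than the to-state's.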
2. When only downward load balance is allowed, any non-downward state transition (ST) messages must be removed from the message map and throttled.
3. Downward STs must still respect throttling, to preserve backward-compatible behavior.
4. Fix TestErrorReplicaPersist.
---
.../stages/IntermediateStateCalcStage.java | 11 +-
.../stages/TestReplicaLevelThrottling.java | 14 ++-
.../helix/integration/TestErrorReplicaPersist.java | 7 ++
.../TestReplicaLevelThrottling.SingleTopState.json | 115 +++++++++++++++++++++
4 files changed, 141 insertions(+), 6 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 1d2ed02..558a61f 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -437,7 +437,7 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// If the state is not found in statePriorityMap, consider it not strictly
downward by
// default because we can't determine whether it is downward
if (statePriorityMap.containsKey(message.getFromState()) &&
statePriorityMap.containsKey(message.getToState())
- && statePriorityMap.get(message.getFromState()) >
statePriorityMap.get(message.getToState())) {
+ && statePriorityMap.get(message.getFromState()) <
statePriorityMap.get(message.getToState())) {
return true;
}
return false;
@@ -520,8 +520,13 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
ResourceControllerDataProvider cache,
boolean onlyDownwardLoadBalance, StateModelDefinition
stateModelDefinition, Set<String> messagesThrottled,
Map<Partition, List<Message>> resourceMessageMap) {
- if (onlyDownwardLoadBalance &&
isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) {
- // Remove the message already allowed for downward state transitions.
+ // TODO: refactor the logic into throttling to let throttling logic to
handle only downward including recovery rebalance
+ // If only downward allowed: 1) any non-downward ST messages will be
throttled and removed.
+ // 2) any downward ST messages will respect the
throttling.
+ // If not only downward allowed, all ST messages should respect the
throttling.
+ if (onlyDownwardLoadBalance &&
!isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition))
{
+ resourceMessageMap.get(partition).remove(messageToThrottle);
+ messagesThrottled.add(messageToThrottle.getId());
return;
}
throttleStateTransitionsForReplica(throttleController,
resource.getResourceName(), partition, messageToThrottle,
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
index 0c1a705..af47542 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -54,6 +54,7 @@ public class TestReplicaLevelThrottling extends BaseStageTest
{
static final String CLUSTER_NAME = "TestCluster";
static final String RESOURCE_NAME = "TestResource";
static final String NOT_SET = "-1";
+ static final String DEFAULT_ERROR_THRESHOLD =
String.valueOf(Integer.MAX_VALUE);
@Test(dataProvider = "replicaLevelThrottlingInput")
public void testPerReplicaThrottling(ClusterEvent event, Map<String,
Map<String, String>> expectedOutput,
@@ -128,11 +129,17 @@ public class TestReplicaLevelThrottling extends
BaseStageTest {
instanceThrottleRecovery,
currentStates,
pendingMessages,
- expectedOutput
+ expectedOutput,
+ errorThreshold
}
enum CacheKeys {
- clusterConfig, stateModelName, stateModelDef, minActiveReplica,
numReplica, preferenceList
+ clusterConfig,
+ stateModelName,
+ stateModelDef,
+ minActiveReplica,
+ numReplica,
+ preferenceList
}
public List<Object[]> loadTestInputs(String fileName) {
@@ -206,8 +213,9 @@ public class TestReplicaLevelThrottling extends
BaseStageTest {
getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.INSTANCE,
Entry.instanceThrottleRecovery.name(),
throttleConfigs, inMap);
-
clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+
clusterConfig.setErrorPartitionThresholdForLoadBalance(Integer.parseInt(
+ (String) inMap.getOrDefault(Entry.errorThreshold.name(),
DEFAULT_ERROR_THRESHOLD)));
Map<String, Object> cacheMap = new HashMap<>();
cacheMap.put(CacheKeys.clusterConfig.name(), clusterConfig);
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
index ff47a71..5c7fb86 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration;
import java.util.Date;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
@@ -29,6 +30,7 @@ import
org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.rebalancer.TestAutoRebalance;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Message;
import org.apache.helix.participant.StateMachineEngine;
@@ -97,6 +99,11 @@ public class TestErrorReplicaPersist extends
ZkStandAloneCMTestBase {
@Test
public void testErrorReplicaPersist() throws InterruptedException {
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ ClusterConfig clusterConfig =
configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setErrorPartitionThresholdForLoadBalance(100000);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
for (int i = 0; i < (NODE_NR + 1) / 2; i++) {
_participants[i].syncStop();
Thread.sleep(2000);
diff --git
a/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
index 9706d16..5d4cdaf 100644
---
a/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
+++
b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
@@ -275,6 +275,121 @@
"localhost_12915": "OFFLINE"
}
}
+ },
+ {
+ "description": "Set error threshold, only downward allowed",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "-1",
+ "instanceThrottleLoad": "10",
+ "instanceThrottleRecovery": "-1",
+ "errorThreshold" : "2",
+ "partitionNames": [
+ "partition_0",
+ "partition_1",
+ "partition_2",
+ "partition_3"
+ ],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12916": "OFFLINE"
+ },
+ "partition_1": {
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_2": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_3": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "STANDBY",
+ "localhost_12914": "ERROR",
+ "localhost_12915": "STANDBY",
+ "localhost_12916": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE"
+ },
+ "partition_2": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "ERROR",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_3": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "ERROR",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_2": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_3": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "pendingMessages": {
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "ERROR",
+ "localhost_12915": "STANDBY",
+ "localhost_12916": "OFFLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE"
+ },
+ "partition_2": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "ERROR",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_3": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "ERROR",
+ "localhost_12915": "STANDBY"
+ }
+ }
}
]
}
\ No newline at end of file