This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by
this push:
new 683afab80 Make logic to determine state of replicas on SWAP_IN
instance simpler and more predictable during an in-flight node swap. (#2706)
683afab80 is described below
commit 683afab8094304f08c0fbe6ad6e9d25f265e9684
Author: Zachary Pinto <[email protected]>
AuthorDate: Wed Dec 6 17:43:43 2023 -0800
Make logic to determine state of replicas on SWAP_IN instance simpler and
more predictable during an in-flight node swap. (#2706)
---
.../apache/helix/controller/rebalancer/AbstractRebalancer.java | 4 ++--
.../helix/controller/stages/BestPossibleStateCalcStage.java | 10 +++++++---
.../apache/helix/controller/stages/MessageSelectionStage.java | 4 ++--
.../src/main/java/org/apache/helix/examples/Quickstart.java | 2 +-
.../main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java | 5 +++--
.../src/main/java/org/apache/helix/model/LeaderStandbySMD.java | 5 +++--
.../src/main/java/org/apache/helix/model/MasterSlaveSMD.java | 5 +++--
.../src/main/java/org/apache/helix/model/OnlineOfflineSMD.java | 5 +++--
.../org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java | 3 ++-
.../main/java/org/apache/helix/model/StateModelDefinition.java | 10 ++++++----
.../main/java/org/apache/helix/model/StorageSchemataSMD.java | 5 +++--
.../apache/helix/model/util/StateModelDefinitionValidator.java | 3 ++-
.../src/main/java/org/apache/helix/util/RebalanceUtil.java | 5 +++--
helix-core/src/test/java/org/apache/helix/TestHelper.java | 2 +-
.../integration/TestPartitionLevelTransitionConstraint.java | 2 +-
.../apache/helix/integration/TestPreferenceListAsQueue.java | 3 ++-
.../helix/integration/messaging/TestMessageThrottle2.java | 2 +-
.../java/org/apache/helix/model/TestStateModelValidity.java | 2 +-
18 files changed, 46 insertions(+), 31 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 7a23b8f28..51158cb91 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -329,9 +329,9 @@ public abstract class AbstractRebalancer<T extends
BaseControllerDataProvider> i
int preferenceListSize) {
String num = stateModelDef.getNumInstancesPerState(state);
int stateCount = -1;
- if ("N".equals(num)) {
+ if
(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) {
stateCount = liveAndEnabledSize;
- } else if ("R".equals(num)) {
+ } else if
(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
stateCount = preferenceListSize;
} else {
try {
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 8ec4b4475..05652e222 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -37,7 +37,6 @@ import org.apache.helix.controller.LogUtil;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.rebalancer.CustomRebalancer;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
@@ -150,8 +149,13 @@ public class BestPossibleStateCalcStage extends
AbstractBaseStage {
commonInstances.forEach(swapOutInstance -> {
if
(stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) {
- if
(AbstractRebalancer.getStateCount(stateModelDef.getTopState(), stateModelDef,
- stateMap.size() + 1, stateMap.size() + 1) >
stateMap.size()) {
+
+ String topStateCount =
+
stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
+ if (topStateCount.equals(
+
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
+ || topStateCount.equals(
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
// If the swap-out instance's replica is a topState and
the StateModel allows for
// another replica with the topState to be added, set the
swap-in instance's replica
// to the topState.
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 09894263f..2751f1b26 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -262,9 +262,9 @@ public class MessageSelectionStage extends
AbstractBaseStage {
for (String state : statePriorityList) {
String numInstancesPerState =
stateModelDefinition.getNumInstancesPerState(state);
int max = -1;
- if ("N".equals(numInstancesPerState)) {
+ if
(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(numInstancesPerState))
{
max = cache.getLiveInstances().size();
- } else if ("R".equals(numInstancesPerState)) {
+ } else if
(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(numInstancesPerState))
{
// idealState is null when resource has been dropped,
// R can't be evaluated and ignore state constraints
//if (idealState != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
index 5d1df0a02..9cc14b603 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
@@ -124,7 +124,7 @@ public class Quickstart {
builder.upperBound(LEADER, 1);
// dynamic constraint, R means it should be derived based on the
replication
// factor.
- builder.dynamicUpperBound(STANDBY, "R");
+ builder.dynamicUpperBound(STANDBY,
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
StateModelDefinition statemodelDefinition = builder.build();
return statemodelDefinition;
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 7a0fe6377..8a8d13b7c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -2103,12 +2103,13 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException("Invalid or unsupported state model
definition");
}
masterStateValue = state;
- } else if (count.equalsIgnoreCase("R")) {
+ } else if
(count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS))
{
if (slaveStateValue != null) {
throw new HelixException("Invalid or unsupported state model
definition");
}
slaveStateValue = state;
- } else if (count.equalsIgnoreCase("N")) {
+ } else if (count.equalsIgnoreCase(
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) {
if (!(masterStateValue == null && slaveStateValue == null)) {
throw new HelixException("Invalid or unsupported state model
definition");
}
diff --git
a/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java
b/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java
index e7c92a9ea..0d400817c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java
@@ -68,7 +68,8 @@ public final class LeaderStandbySMD extends
StateModelDefinition {
// bounds
builder.upperBound(States.LEADER.name(), 1);
- builder.dynamicUpperBound(States.STANDBY.name(), "R");
+ builder.dynamicUpperBound(States.STANDBY.name(),
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
return builder.build();
}
@@ -97,7 +98,7 @@ public final class LeaderStandbySMD extends
StateModelDefinition {
record.setMapField(key, metadata);
}
if (state.equals("STANDBY")) {
- metadata.put("count", "R");
+ metadata.put("count",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
}
if (state.equals("OFFLINE")) {
diff --git
a/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java
b/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java
index 02900a27a..09b06b27a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java
+++ b/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java
@@ -69,7 +69,8 @@ public final class MasterSlaveSMD extends
StateModelDefinition {
// bounds
builder.upperBound(States.MASTER.name(), 1);
- builder.dynamicUpperBound(States.SLAVE.name(), "R");
+ builder.dynamicUpperBound(States.SLAVE.name(),
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
return builder.build();
}
@@ -98,7 +99,7 @@ public final class MasterSlaveSMD extends
StateModelDefinition {
metadata.put("count", "1");
record.setMapField(key, metadata);
} else if (state.equals("SLAVE")) {
- metadata.put("count", "R");
+ metadata.put("count",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
} else if (state.equals("OFFLINE")) {
metadata.put("count", "-1");
diff --git
a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java
b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java
index 3f3759d8d..fd97c7ba9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java
+++ b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java
@@ -63,7 +63,8 @@ public final class OnlineOfflineSMD extends
StateModelDefinition {
builder.addTransition(States.OFFLINE.name(),
HelixDefinedState.DROPPED.name());
// bounds
- builder.dynamicUpperBound(States.ONLINE.name(), "R");
+ builder.dynamicUpperBound(States.ONLINE.name(),
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
return builder.build();
}
@@ -87,7 +88,7 @@ public final class OnlineOfflineSMD extends
StateModelDefinition {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("ONLINE")) {
- metadata.put("count", "R");
+ metadata.put("count",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
}
if (state.equals("OFFLINE")) {
diff --git
a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java
b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java
index 90ccbde4a..58acf02a2 100644
---
a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java
+++
b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java
@@ -67,7 +67,8 @@ public final class OnlineOfflineWithBootstrapSMD extends
StateModelDefinition {
builder.addTransition(States.OFFLINE.name(),
HelixDefinedState.DROPPED.name());
// bounds
- builder.dynamicUpperBound(States.ONLINE.name(), "R");
+ builder.dynamicUpperBound(States.ONLINE.name(),
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
return new OnlineOfflineWithBootstrapSMD(builder.build().getRecord());
}
diff --git
a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index 9570dfeb3..fcf24fb30 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -49,6 +49,8 @@ public class StateModelDefinition extends HelixProperty {
}
public static final int TOP_STATE_PRIORITY = 1;
+ public static final String STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES = "N";
+ public static final String STATE_REPLICA_COUNT_ALL_REPLICAS = "R";
/**
* state model's initial state
@@ -200,7 +202,7 @@ public class StateModelDefinition extends HelixProperty {
/**
* Number of instances that can be in each state
* @param state the state name
- * @return maximum instance count per state, can be "N" or "R"
+ * @return maximum instance count per state, can be
STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES or STATE_REPLICA_COUNT_ALL_REPLICAS
*/
public String getNumInstancesPerState(String state) {
return _statesCountMap.get(state);
@@ -449,11 +451,11 @@ public class StateModelDefinition extends HelixProperty {
if (candidateNodeNum <= 0) {
break;
}
- if ("N".equals(num)) {
+ if (STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) {
stateCountMap.put(state, candidateNodeNum);
replicas -= candidateNodeNum;
break;
- } else if ("R".equals(num)) {
+ } else if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
// wait until we get the counts for all other states
continue;
} else {
@@ -475,7 +477,7 @@ public class StateModelDefinition extends HelixProperty {
// get state count for R
for (String state : statesPriorityList) {
String num = getNumInstancesPerState(state);
- if ("R".equals(num)) {
+ if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) {
if (candidateNodeNum > 0 && replicas > 0) {
stateCountMap.put(state, replicas < candidateNodeNum ? replicas :
candidateNodeNum);
}
diff --git
a/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java
b/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java
index ea3fb4d9f..c19e3c44d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java
@@ -63,7 +63,8 @@ public final class StorageSchemataSMD extends
StateModelDefinition {
builder.addTransition(States.OFFLINE.name(),
HelixDefinedState.DROPPED.name());
// bounds
- builder.dynamicUpperBound(States.MASTER.name(), "N");
+ builder.dynamicUpperBound(States.MASTER.name(),
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES);
return builder.build();
}
@@ -88,7 +89,7 @@ public final class StorageSchemataSMD extends
StateModelDefinition {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("MASTER")) {
- metadata.put("count", "N");
+ metadata.put("count",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES);
record.setMapField(key, metadata);
} else if (state.equals("OFFLINE")) {
metadata.put("count", "-1");
diff --git
a/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java
b/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java
index b208efa69..7eb2047cc 100644
---
a/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java
+++
b/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java
@@ -122,7 +122,8 @@ public class StateModelDefinitionValidator {
try {
Integer.parseInt(count);
} catch (NumberFormatException e) {
- if (!count.equals("N") && !count.equals("R")) {
+ if
(!count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
+ &&
!count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
_logger.error("State " + state + " has invalid count " + count + ",
state model: "
+ _stateModelDef.getId());
return false;
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 868e0cf57..5c7effb6f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -125,12 +125,13 @@ public class RebalanceUtil {
throw new HelixException("Invalid or unsupported state model
definition");
}
masterStateValue = state;
- } else if (count.equalsIgnoreCase("R")) {
+ } else if
(count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS))
{
if (slaveStateValue != null) {
throw new HelixException("Invalid or unsupported state model
definition");
}
slaveStateValue = state;
- } else if (count.equalsIgnoreCase("N")) {
+ } else if (count.equalsIgnoreCase(
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) {
if (!(masterStateValue == null && slaveStateValue == null)) {
throw new HelixException("Invalid or unsupported state model
definition");
}
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java
b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 79f238da7..9dbba3476 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -693,7 +693,7 @@ public class TestHelper {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("ONLINE")) {
- metadata.put("count", "R");
+ metadata.put("count",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
} else if (state.equals("BOOTSTRAP")) {
metadata.put("count", "-1");
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
index c4c37fe38..9805a8c08 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -230,7 +230,7 @@ public class TestPartitionLevelTransitionConstraint extends
ZkTestBase {
// static constraint
builder.upperBound("MASTER", 1);
// dynamic constraint, R means it should be derived based on the
replication factor.
- builder.dynamicUpperBound("SLAVE", "R");
+ builder.dynamicUpperBound("SLAVE",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
StateModelDefinition statemodelDefinition = builder.build();
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
index 2b32c219e..178b37a9c 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
@@ -266,7 +266,8 @@ public class TestPreferenceListAsQueue extends
ZkUnitTestBase {
.addState("ONLINE",
1).addState("OFFLINE").addState("DROPPED").addState("ERROR")
.initialState("OFFLINE").addTransition("ERROR", "OFFLINE", 1)
.addTransition("ONLINE", "OFFLINE", 2).addTransition("OFFLINE",
"DROPPED", 3)
- .addTransition("OFFLINE", "ONLINE", 4).dynamicUpperBound("ONLINE", "R")
+ .addTransition("OFFLINE", "ONLINE", 4)
+ .dynamicUpperBound("ONLINE",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)
.upperBound("OFFLINE", -1).upperBound("DROPPED",
-1).upperBound("ERROR", -1);
return builder.build();
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
index b37493101..b11e6350e 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
@@ -186,7 +186,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
record.setMapField(key, metadata);
break;
case "SLAVE":
- metadata.put("count", "R");
+ metadata.put("count",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
break;
case "OFFLINE":
diff --git
a/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java
b/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java
index f8955abbd..724c3315d 100644
---
a/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java
+++
b/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java
@@ -230,7 +230,7 @@ public class TestStateModelValidity {
.upperBound("MASTER", 1)
// R indicates an upper bound of number of replicas for each partition
- .dynamicUpperBound("SLAVE", "R")
+ .dynamicUpperBound("SLAVE",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)
// Add some high-priority transitions
.addTransition("SLAVE", "MASTER", 1).addTransition("OFFLINE", "SLAVE",
2)