This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch ApplicationClusterManager in repository https://gitbox.apache.org/repos/asf/helix.git
commit 70b05d57f80db3a7ac4115fff7ca83ae8c0f0aa1 Author: Zachary Pinto <[email protected]> AuthorDate: Wed Dec 6 17:43:43 2023 -0800 Make logic to determine state of replicas on SWAP_IN instance simpler and more predictable during an in-flight node swap. (#2706) --- .../apache/helix/controller/rebalancer/AbstractRebalancer.java | 4 ++-- .../helix/controller/stages/BestPossibleStateCalcStage.java | 10 +++++++--- .../apache/helix/controller/stages/MessageSelectionStage.java | 4 ++-- .../src/main/java/org/apache/helix/examples/Quickstart.java | 2 +- .../main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java | 5 +++-- .../src/main/java/org/apache/helix/model/LeaderStandbySMD.java | 5 +++-- .../src/main/java/org/apache/helix/model/MasterSlaveSMD.java | 5 +++-- .../src/main/java/org/apache/helix/model/OnlineOfflineSMD.java | 5 +++-- .../org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java | 3 ++- .../main/java/org/apache/helix/model/StateModelDefinition.java | 10 ++++++---- .../main/java/org/apache/helix/model/StorageSchemataSMD.java | 5 +++-- .../apache/helix/model/util/StateModelDefinitionValidator.java | 3 ++- .../src/main/java/org/apache/helix/util/RebalanceUtil.java | 5 +++-- helix-core/src/test/java/org/apache/helix/TestHelper.java | 2 +- .../integration/TestPartitionLevelTransitionConstraint.java | 2 +- .../apache/helix/integration/TestPreferenceListAsQueue.java | 3 ++- .../helix/integration/messaging/TestMessageThrottle2.java | 2 +- .../java/org/apache/helix/model/TestStateModelValidity.java | 2 +- 18 files changed, 46 insertions(+), 31 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java index 7a23b8f28..51158cb91 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java @@ -329,9 +329,9 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i int preferenceListSize) { String num = stateModelDef.getNumInstancesPerState(state); int stateCount = -1; - if ("N".equals(num)) { + if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) { stateCount = liveAndEnabledSize; - } else if ("R".equals(num)) { + } else if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) { stateCount = preferenceListSize; } else { try { diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 8ec4b4475..05652e222 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -37,7 +37,6 @@ import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; -import org.apache.helix.controller.rebalancer.AbstractRebalancer; import org.apache.helix.controller.rebalancer.CustomRebalancer; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.MaintenanceRebalancer; @@ -150,8 +149,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { commonInstances.forEach(swapOutInstance -> { if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) { - if (AbstractRebalancer.getStateCount(stateModelDef.getTopState(), stateModelDef, - stateMap.size() + 1, stateMap.size() + 1) > stateMap.size()) { + + String topStateCount = + stateModelDef.getNumInstancesPerState(stateModelDef.getTopState()); + if (topStateCount.equals( + StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES) + || topStateCount.equals( + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) { // If the swap-out instance's replica is a topState and the StateModel allows for // another replica with the topState to be added, set the swap-in instance's replica // to the topState. diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java index 09894263f..2751f1b26 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java @@ -262,9 +262,9 @@ public class MessageSelectionStage extends AbstractBaseStage { for (String state : statePriorityList) { String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state); int max = -1; - if ("N".equals(numInstancesPerState)) { + if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(numInstancesPerState)) { max = cache.getLiveInstances().size(); - } else if ("R".equals(numInstancesPerState)) { + } else if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(numInstancesPerState)) { // idealState is null when resource has been dropped, // R can't be evaluated and ignore state constraints //if (idealState != null) { diff --git a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java index 5d1df0a02..9cc14b603 100644 --- a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java +++ b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java @@ -124,7 +124,7 @@ public class Quickstart { builder.upperBound(LEADER, 1); // dynamic constraint, R means it should be derived based on the replication // factor. - builder.dynamicUpperBound(STANDBY, "R"); + builder.dynamicUpperBound(STANDBY, StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); StateModelDefinition statemodelDefinition = builder.build(); return statemodelDefinition; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 7a0fe6377..8a8d13b7c 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -2103,12 +2103,13 @@ public class ZKHelixAdmin implements HelixAdmin { throw new HelixException("Invalid or unsupported state model definition"); } masterStateValue = state; - } else if (count.equalsIgnoreCase("R")) { + } else if (count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) { if (slaveStateValue != null) { throw new HelixException("Invalid or unsupported state model definition"); } slaveStateValue = state; - } else if (count.equalsIgnoreCase("N")) { + } else if (count.equalsIgnoreCase( + StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) { if (!(masterStateValue == null && slaveStateValue == null)) { throw new HelixException("Invalid or unsupported state model definition"); } diff --git a/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java b/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java index e7c92a9ea..0d400817c 100644 --- a/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java @@ -68,7 +68,8 @@ public final class LeaderStandbySMD extends StateModelDefinition { // bounds builder.upperBound(States.LEADER.name(), 1); - builder.dynamicUpperBound(States.STANDBY.name(), "R"); + builder.dynamicUpperBound(States.STANDBY.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); return builder.build(); } @@ -97,7 +98,7 @@ public final class LeaderStandbySMD extends StateModelDefinition { record.setMapField(key, metadata); } if (state.equals("STANDBY")) { - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); } if (state.equals("OFFLINE")) { diff --git a/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java b/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java index 02900a27a..09b06b27a 100644 --- a/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java @@ -69,7 +69,8 @@ public final class MasterSlaveSMD extends StateModelDefinition { // bounds builder.upperBound(States.MASTER.name(), 1); - builder.dynamicUpperBound(States.SLAVE.name(), "R"); + builder.dynamicUpperBound(States.SLAVE.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); return builder.build(); } @@ -98,7 +99,7 @@ public final class MasterSlaveSMD extends StateModelDefinition { metadata.put("count", "1"); record.setMapField(key, metadata); } else if (state.equals("SLAVE")) { - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); } else if (state.equals("OFFLINE")) { metadata.put("count", "-1"); diff --git a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java index 3f3759d8d..fd97c7ba9 100644 --- a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java @@ -63,7 +63,8 @@ public final class OnlineOfflineSMD extends StateModelDefinition { builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name()); // bounds - builder.dynamicUpperBound(States.ONLINE.name(), "R"); + builder.dynamicUpperBound(States.ONLINE.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); return builder.build(); } @@ -87,7 +88,7 @@ public final class OnlineOfflineSMD extends StateModelDefinition { String key = state + ".meta"; Map<String, String> metadata = new HashMap<String, String>(); if (state.equals("ONLINE")) { - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); } if (state.equals("OFFLINE")) { diff --git a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java index 90ccbde4a..58acf02a2 100644 --- a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java @@ -67,7 +67,8 @@ public final class OnlineOfflineWithBootstrapSMD extends StateModelDefinition { builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name()); // bounds - builder.dynamicUpperBound(States.ONLINE.name(), "R"); + builder.dynamicUpperBound(States.ONLINE.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); return new OnlineOfflineWithBootstrapSMD(builder.build().getRecord()); } diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java index 9570dfeb3..fcf24fb30 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java +++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java @@ -49,6 +49,8 @@ public class StateModelDefinition extends HelixProperty { } public static final int TOP_STATE_PRIORITY = 1; + public static final String STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES = "N"; + public static final String STATE_REPLICA_COUNT_ALL_REPLICAS = "R"; /** * state model's initial state @@ -200,7 +202,7 @@ public class StateModelDefinition extends HelixProperty { /** * Number of instances that can be in each state * @param state the state name - * @return maximum instance count per state, can be "N" or "R" + * @return maximum instance count per state, can be STATE_REPLICA_COUNT_ALL_NODES or STATE_REPLICA_COUNT_ALL_REPLICAS */ public String getNumInstancesPerState(String state) { return _statesCountMap.get(state); @@ -449,11 +451,11 @@ public class StateModelDefinition extends HelixProperty { if (candidateNodeNum <= 0) { break; } - if ("N".equals(num)) { + if (STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) { stateCountMap.put(state, candidateNodeNum); replicas -= candidateNodeNum; break; - } else if ("R".equals(num)) { + } else if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) { // wait until we get the counts for all other states continue; } else { @@ -475,7 +477,7 @@ public class StateModelDefinition extends HelixProperty { // get state count for R for (String state : statesPriorityList) { String num = getNumInstancesPerState(state); - if ("R".equals(num)) { + if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) { if (candidateNodeNum > 0 && replicas > 0) { stateCountMap.put(state, replicas < candidateNodeNum ? replicas : candidateNodeNum); } diff --git a/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java b/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java index ea3fb4d9f..c19e3c44d 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java @@ -63,7 +63,8 @@ public final class StorageSchemataSMD extends StateModelDefinition { builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name()); // bounds - builder.dynamicUpperBound(States.MASTER.name(), "N"); + builder.dynamicUpperBound(States.MASTER.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES); return builder.build(); } @@ -88,7 +89,7 @@ public final class StorageSchemataSMD extends StateModelDefinition { String key = state + ".meta"; Map<String, String> metadata = new HashMap<String, String>(); if (state.equals("MASTER")) { - metadata.put("count", "N"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES); record.setMapField(key, metadata); } else if (state.equals("OFFLINE")) { metadata.put("count", "-1"); diff --git a/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java b/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java index b208efa69..7eb2047cc 100644 --- a/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java +++ b/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java @@ -122,7 +122,8 @@ public class StateModelDefinitionValidator { try { Integer.parseInt(count); } catch (NumberFormatException e) { - if (!count.equals("N") && !count.equals("R")) { + if (!count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES) + && !count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) { _logger.error("State " + state + " has invalid count " + count + ", state model: " + _stateModelDef.getId()); return false; diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java index 868e0cf57..5c7effb6f 100644 --- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java @@ -125,12 +125,13 @@ public class RebalanceUtil { throw new HelixException("Invalid or unsupported state model definition"); } masterStateValue = state; - } else if (count.equalsIgnoreCase("R")) { + } else if (count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) { if (slaveStateValue != null) { throw new HelixException("Invalid or unsupported state model definition"); } slaveStateValue = state; - } else if (count.equalsIgnoreCase("N")) { + } else if (count.equalsIgnoreCase( + StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) { if (!(masterStateValue == null && slaveStateValue == null)) { throw new HelixException("Invalid or unsupported state model definition"); } diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java index 79f238da7..9dbba3476 100644 --- a/helix-core/src/test/java/org/apache/helix/TestHelper.java +++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java @@ -693,7 +693,7 @@ public class TestHelper { String key = state + ".meta"; Map<String, String> metadata = new HashMap<String, String>(); if (state.equals("ONLINE")) { - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); } else if (state.equals("BOOTSTRAP")) { metadata.put("count", "-1"); diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java index c4c37fe38..9805a8c08 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java @@ -230,7 +230,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase { // static constraint builder.upperBound("MASTER", 1); // dynamic constraint, R means it should be derived based on the replication factor. - builder.dynamicUpperBound("SLAVE", "R"); + builder.dynamicUpperBound("SLAVE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); StateModelDefinition statemodelDefinition = builder.build(); diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java index 2b32c219e..178b37a9c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java @@ -266,7 +266,8 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase { .addState("ONLINE", 1).addState("OFFLINE").addState("DROPPED").addState("ERROR") .initialState("OFFLINE").addTransition("ERROR", "OFFLINE", 1) .addTransition("ONLINE", "OFFLINE", 2).addTransition("OFFLINE", "DROPPED", 3) - .addTransition("OFFLINE", "ONLINE", 4).dynamicUpperBound("ONLINE", "R") + .addTransition("OFFLINE", "ONLINE", 4) + .dynamicUpperBound("ONLINE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS) .upperBound("OFFLINE", -1).upperBound("DROPPED", -1).upperBound("ERROR", -1); return builder.build(); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java index b37493101..b11e6350e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java @@ -186,7 +186,7 @@ public class TestMessageThrottle2 extends ZkTestBase { record.setMapField(key, metadata); break; case "SLAVE": - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); break; case "OFFLINE": diff --git a/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java b/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java index f8955abbd..724c3315d 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java @@ -230,7 +230,7 @@ public class TestStateModelValidity { .upperBound("MASTER", 1) // R indicates an upper bound of number of replicas for each partition - .dynamicUpperBound("SLAVE", "R") + .dynamicUpperBound("SLAVE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS) // Add some high-priority transitions .addTransition("SLAVE", "MASTER", 1).addTransition("OFFLINE", "SLAVE", 2)
