This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit b0f61507915f5052f53854ddbefcacac86f9c8d4 Author: Junkai Xue <[email protected]> AuthorDate: Fri Feb 1 13:23:35 2019 -0800 Add ERROR mapping for display in BestPossible and Intermediate assignments. Adding this ERROR replica mapping should not break any assignment rules; it serves purely to show the host targeted by the ERROR partition. --- .../rebalancer/DelayedAutoRebalancer.java | 21 ++- .../java/org/apache/helix/common/ZkTestBase.java | 9 ++ .../helix/integration/TestErrorReplicaPersist.java | 151 +++++++++++++++++++++ .../TestDelayedAutoRebalancer.OnlineOffline.json | 2 +- 4 files changed, 178 insertions(+), 5 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index 7dd9128..62ad37f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -529,24 +529,37 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController // we should drop all partitions from previous assigned instances. 
if (!currentMapWithPreferenceList.values().contains(HelixDefinedState.ERROR.name()) && bestPossibleStateMap.size() > numReplicas && readyToDrop(currentStateMap, - bestPossibleStateMap, numReplicas, combinedPreferenceList)) { + bestPossibleStateMap, preferenceList, combinedPreferenceList)) { for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) { String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1); bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name()); } } + // Adding ERROR replica mapping to best possible + // ERROR assignment should be mutual excluded from DROPPED assignment because + // once there is an ERROR replica in the mapping, bestPossibleStateMap.size() > numReplicas prevents + // code entering the DROPPING stage. + for (String instance : combinedPreferenceList) { + if (currentStateMap.containsKey(instance) && currentStateMap.get(instance) + .equals(HelixDefinedState.ERROR.name())) { + bestPossibleStateMap.put(instance, HelixDefinedState.ERROR.name()); + } + } + return bestPossibleStateMap; } private boolean readyToDrop(Map<String, String> currentStateMap, - Map<String, String> bestPossibleMap, int numReplicas, List<String> combinedPreferenceList) { + Map<String, String> bestPossibleMap, List<String> preferenceList, + List<String> combinedPreferenceList) { if (currentStateMap.size() != bestPossibleMap.size()) { return false; } + Set<String> tmpPreferenceSet = new HashSet<>(preferenceList); + tmpPreferenceSet.retainAll(combinedPreferenceList); - for (int i = 0; i < numReplicas; i++) { - String instance = combinedPreferenceList.get(i); + for (String instance : tmpPreferenceSet) { if (!currentStateMap.containsKey(instance) || !currentStateMap.get(instance) .equals(bestPossibleMap.get(instance))) { return false; diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index 700d204..721cd33 100644 
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -244,6 +244,15 @@ public class ZkTestBase { configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig); } + protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName, + boolean enabled, long delay) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setDelayRebalaceEnabled(enabled); + clusterConfig.setRebalanceDelayTime(delay); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + protected void enableP2PInCluster(String clusterName, ConfigAccessor configAccessor, boolean enable) { // enable p2p message in cluster. diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java new file mode 100644 index 0000000..126175a --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java @@ -0,0 +1,151 @@ +package org.apache.helix.integration; + +import java.util.Date; +import org.apache.helix.HelixRollbackException; +import org.apache.helix.NotificationContext; +import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.integration.common.ZkStandAloneCMTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.rebalancer.TestAutoRebalance; +import org.apache.helix.mock.participant.MockDelayMSStateModel; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.model.Message; +import 
org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestErrorReplicaPersist extends ZkStandAloneCMTestBase { + @BeforeClass + public void beforeClass() throws Exception { + // Logger.getRootLogger().setLevel(Level.INFO); + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + int numNode = NODE_NR + 1; + _participants = new MockParticipantManager[numNode]; + // setup storage cluster + _gSetupTool.addCluster(CLUSTER_NAME, true); + createResourceWithDelayedRebalance(CLUSTER_NAME, TEST_DB, MasterSlaveSMD.name, _PARTITIONS, + _replica, _replica - 1, 1800000, CrushEdRebalanceStrategy.class.getName()); + for (int i = 0; i < numNode; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica); + + // start dummy participants + for (int i = 0; i < numNode; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + participant.syncStart(); + _participants[i] = participant; + } + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + 
enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true, 1800000); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new TestAutoRebalance.ExternalViewBalancedVerifier(_gZkClient, CLUSTER_NAME, TEST_DB)); + + Assert.assertTrue(result); + } + + @AfterClass + public void afterClass() throws Exception { + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + super.afterClass(); + } + + @Test + public void testErrorReplicaPersist() throws InterruptedException { + for (int i = 0; i < (NODE_NR + 1) / 2; i++) { + _participants[i].syncStop(); + Thread.sleep(2000); + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + StateMachineEngine stateMachineEngine = participant.getStateMachineEngine(); + stateMachineEngine + .registerStateModelFactory(MasterSlaveSMD.name, new MockFailedMSStateModelFactory()); + participant.syncStart(); + _participants[i] = participant; + } + HelixClusterVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build(); + Assert.assertTrue(((BestPossibleExternalViewVerifier) verifier).verifyByPolling()); + for (int i = 0; i < (NODE_NR + 1) / 2; i++) { + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, _participants[i].getInstanceName(), false); + } + + Assert.assertTrue(((BestPossibleExternalViewVerifier) verifier).verifyByPolling()); + } + + + class MockFailedMSStateModelFactory + extends StateModelFactory<MockFailedMSStateModel> { + + @Override + public MockFailedMSStateModel createNewStateModel(String resourceName, + String partitionKey) { + MockFailedMSStateModel model = new MockFailedMSStateModel(); + return model; + 
} + } + + @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" + }) + public static class MockFailedMSStateModel extends StateModel { + private static Logger LOG = LoggerFactory.getLogger(MockFailedMSStateModel.class); + + public MockFailedMSStateModel() { + } + + @Transition(to = "SLAVE", from = "OFFLINE") public void onBecomeSlaveFromOffline( + Message message, NotificationContext context) throws IllegalAccessException { + throw new IllegalAccessException("Failed!"); + } + + @Transition(to = "MASTER", from = "SLAVE") public void onBecomeMasterFromSlave(Message message, + NotificationContext context) throws InterruptedException, HelixRollbackException { + LOG.error("Become MASTER from SLAVE"); + } + + @Transition(to = "SLAVE", from = "MASTER") public void onBecomeSlaveFromMaster(Message message, + NotificationContext context) { + LOG.info("Become Slave from Master"); + } + + @Transition(to = "OFFLINE", from = "SLAVE") public void onBecomeOfflineFromSlave( + Message message, NotificationContext context) { + LOG.info("Become OFFLINE from SLAVE"); + } + + @Transition(to = "DROPPED", from = "OFFLINE") public void onBecomeDroppedFromOffline( + Message message, NotificationContext context) { + LOG.info("Become DROPPED FROM OFFLINE"); + } + } + +} diff --git a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json index 9ae6b24..55f5b3c 100644 --- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json +++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json @@ -53,7 +53,7 @@ "bestPossibleStates": { "localhost_2": "ONLINE", "localhost_3": "ONLINE", - "localhost_0": "DROPPED", + "localhost_0": "ERROR", "localhost_1": "ONLINE" } },
