Repository: helix Updated Branches: refs/heads/master 7753b602c -> 94ac4253b
Add integration tests to test Helix's partition migration strategy during cluster expansion and idealstate rebalance strategy change. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/94ac4253 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/94ac4253 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/94ac4253 Branch: refs/heads/master Commit: 94ac4253bb6a35f8ad895200ba6b050a576d4198 Parents: 7753b60 Author: Lei Xia <[email protected]> Authored: Mon Apr 16 18:32:58 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Tue Jun 26 17:52:19 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/DummyProcessThread.java | 5 +- .../manager/MockParticipantManager.java | 20 +- .../PartitionMigration/TestExpandCluster.java | 124 ++++++++++ .../TestFullAutoMigration.java | 150 ++++++++++++ .../TestPartitionMigrationBase.java | 241 +++++++++++++++++++ .../rebalancer/TestMixedModeAutoRebalance.java | 13 - .../handling/TestResourceThreadpoolSize.java | 2 +- .../helix/mock/participant/DummyProcess.java | 14 +- 8 files changed, 540 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java index 995b123..84bcae4 100644 --- a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java +++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java @@ -19,9 +19,10 @@ package org.apache.helix; * under the License. */ +import org.apache.helix.mock.participant.DummyProcess; import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory; import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory; -import org.apache.helix.mock.participant.DummyProcess.DummyStateModelFactory; +import org.apache.helix.mock.participant.DummyProcess.DummyMasterSlaveStateModelFactory; import org.apache.helix.participant.StateMachineEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,7 @@ public class DummyProcessThread implements Runnable { @Override public void run() { try { - DummyStateModelFactory stateModelFactory = new DummyStateModelFactory(0); + DummyMasterSlaveStateModelFactory stateModelFactory = new DummyMasterSlaveStateModelFactory(0); StateMachineEngine stateMach = _manager.getStateMachineEngine(); stateMach.registerStateModelFactory("MasterSlave", stateModelFactory); http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index cc168e9..55316ac 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -33,7 +33,6 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory; import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.participant.StateMachineEngine; -import org.apache.helix.participant.statemachine.StateModelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,14 +43,23 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, protected CountDownLatch _stopCountDown = new CountDownLatch(1); protected CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1); - protected MockMSModelFactory _msModelFactory = new MockMSModelFactory(null); - protected DummyLeaderStandbyStateModelFactory _lsModelFactory = - new DummyLeaderStandbyStateModelFactory(10); - protected DummyOnlineOfflineStateModelFactory _ofModelFactory = - new DummyOnlineOfflineStateModelFactory(10); + protected int _transDelay = 10; + + protected MockMSModelFactory _msModelFactory; + protected DummyLeaderStandbyStateModelFactory _lsModelFactory; + protected DummyOnlineOfflineStateModelFactory _ofModelFactory; public MockParticipantManager(String zkAddr, String clusterName, String instanceName) { + this(zkAddr, clusterName, instanceName, 10); + } + + public MockParticipantManager(String zkAddr, String clusterName, String instanceName, + int transDelay) { super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr); + _transDelay = transDelay; + _msModelFactory = new MockMSModelFactory(null); + _lsModelFactory = new DummyLeaderStandbyStateModelFactory(_transDelay); + _ofModelFactory = new DummyOnlineOfflineStateModelFactory(_transDelay); } public void setTransition(MockTransition transition) { http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java new file mode 100644 index 0000000..799d750 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java @@ -0,0 +1,124 @@ +package org.apache.helix.integration.rebalancer.PartitionMigration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestExpandCluster extends TestPartitionMigrationBase { + + Map<String, IdealState> _resourceMap; + + + @BeforeClass + public void beforeClass() throws Exception { + super.beforeClass(); + _resourceMap = createTestDBs(1000000); + _migrationVerifier = new MigrationStateVerifier(_resourceMap, _manager); + } + + @Test + public void testClusterExpansion() throws Exception { + Assert.assertTrue(_clusterVerifier.verify()); + + _migrationVerifier.start(); + + // expand cluster by adding instance one by one + int numNodes = _participants.size(); + for (int i = numNodes; i < numNodes + NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + MockParticipantManager participant = createAndStartParticipant(storageNodeName); + _participants.add(participant); + Thread.sleep(50); + } + + Assert.assertTrue(_clusterVerifier.verify()); + Assert.assertFalse(_migrationVerifier.hasLessReplica()); + Assert.assertFalse(_migrationVerifier.hasMoreReplica()); + + _migrationVerifier.stop(); + } + + + @Test (dependsOnMethods = {"testClusterExpansion"}) + public void testClusterExpansionByEnableInstance() throws Exception { + Assert.assertTrue(_clusterVerifier.verify()); + + _migrationVerifier.reset(); + _migrationVerifier.start(); + + int numNodes = _participants.size(); + // add new instances with all disabled + for (int i = numNodes; i < numNodes + NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + InstanceConfig config = InstanceConfig.toInstanceConfig(storageNodeName); + config.setInstanceEnabled(false); + + _setupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config); + + // start dummy participants + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); + participant.syncStart(); + _participants.add(participant); + } + + // enable new instance one by one + for (int i = numNodes; i < numNodes + NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, storageNodeName, true); + Thread.sleep(100); + } + + Assert.assertTrue(_clusterVerifier.verify()); + Assert.assertFalse(_migrationVerifier.hasLessReplica()); + Assert.assertFalse(_migrationVerifier.hasMoreReplica()); + + _migrationVerifier.stop(); + } + + @Test(dependsOnMethods = {"testClusterExpansion", "testClusterExpansionByEnableInstance"}) + public void testClusterShrink() throws Exception { + Assert.assertTrue(_clusterVerifier.verify()); + + _migrationVerifier.reset(); + _migrationVerifier.start(); + + // remove instance one by one + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + MockParticipantManager participant = _participants.get(i); + participant.syncStop(); + _setupTool.dropInstanceFromCluster(CLUSTER_NAME, storageNodeName); + } + + Assert.assertTrue(_clusterVerifier.verify()); + Assert.assertFalse(_migrationVerifier.hasLessReplica()); + Assert.assertFalse(_migrationVerifier.hasMoreReplica()); + + _migrationVerifier.stop(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java new file mode 100644 index 0000000..e93445d --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java @@ -0,0 +1,150 @@ +package org.apache.helix.integration.rebalancer.PartitionMigration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceConfig; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + + +public class TestFullAutoMigration extends TestPartitionMigrationBase { + ConfigAccessor _configAccessor; + + @BeforeClass + public void beforeClass() throws Exception { + super.beforeClass(); + _configAccessor = new ConfigAccessor(_gZkClient); + } + + @DataProvider(name = "stateModels") + public static Object [][] stateModels() { + return new Object[][] { { BuiltInStateModelDefinitions.MasterSlave.name(), true}, + {BuiltInStateModelDefinitions.OnlineOffline.name(), true}, + {BuiltInStateModelDefinitions.LeaderStandby.name(), true}, + {BuiltInStateModelDefinitions.MasterSlave.name(), false}, + {BuiltInStateModelDefinitions.OnlineOffline.name(), false}, + {BuiltInStateModelDefinitions.LeaderStandby.name(), false}, + }; + } + + @Test(dataProvider = "stateModels") + public void testMigrateToFullAutoWhileExpandCluster( + String stateModel, boolean delayEnabled) throws Exception { + String db = "Test-DB-" + stateModel; + if (delayEnabled) { + createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, + _replica - 1, 200000, CrushRebalanceStrategy.class.getName()); + } else { + createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, + _replica, 0, CrushRebalanceStrategy.class.getName()); + } + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists(); + List<String> userDefinedPartitions = new ArrayList<>(); + for (String partition : userDefinedPreferenceLists.keySet()) { + List<String> preferenceList = new ArrayList<>(); + for (int k = _replica; k > 0; k--) { + String instance = _participants.get(k).getInstanceName(); + preferenceList.add(instance); + } + userDefinedPreferenceLists.put(partition, preferenceList); + userDefinedPartitions.add(partition); + } + + ResourceConfig resourceConfig = + new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build(); + _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig); + + // add new instance to the cluster + int numNodes = _participants.size(); + for (int i = numNodes; i < numNodes + NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + MockParticipantManager participant = createAndStartParticipant(storageNodeName); + _participants.add(participant); + Thread.sleep(50); + } + + Assert.assertTrue(_clusterVerifier.verify()); + + _migrationVerifier = + new MigrationStateVerifier(Collections.singletonMap(db, idealState), _manager); + + _migrationVerifier.reset(); + _migrationVerifier.start(); + + while (userDefinedPartitions.size() > 0) { + removePartitionFromUserDefinedList(db, userDefinedPartitions); + Thread.sleep(50); + } + + + Assert.assertTrue(_clusterVerifier.verify()); + Assert.assertFalse(_migrationVerifier.hasLessReplica()); + Assert.assertFalse(_migrationVerifier.hasMoreReplica()); + + _migrationVerifier.stop(); + } + + private void removePartitionFromUserDefinedList(String db, List<String> userDefinedPartitions) { + ResourceConfig resourceConfig = _configAccessor.getResourceConfig(CLUSTER_NAME, db); + Map<String, List<String>> lists = resourceConfig.getPreferenceLists(); + lists.remove(userDefinedPartitions.get(0)); + resourceConfig.setPreferenceLists(lists); + userDefinedPartitions.remove(0); + _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig); + } + + // create test DBs, wait it converged and return externalviews + protected Map<String, IdealState> createTestDBs(long delayTime) throws InterruptedException { + Map<String, IdealState> idealStateMap = new HashMap<>(); + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, _minActiveReplica, + delayTime); + _testDBs.add(db); + } + Thread.sleep(800); + Assert.assertTrue(_clusterVerifier.verify()); + for (String db : _testDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + idealStateMap.put(db, is); + } + return idealStateMap; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java new file mode 100644 index 0000000..cbd1c24 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java @@ -0,0 +1,241 @@ +package org.apache.helix.integration.rebalancer.PartitionMigration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; +import org.apache.helix.api.listeners.ExternalViewChangeListener; +import org.apache.helix.api.listeners.IdealStateChangeListener; +import org.apache.helix.integration.DelayedTransitionBase; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + + +public class TestPartitionMigrationBase extends ZkIntegrationTestBase { + final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int _PARTITIONS = 50; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + protected ClusterSetup _setupTool = null; + List<MockParticipantManager> _participants = new ArrayList<>(); + int _replica = 3; + int _minActiveReplica = _replica - 1; + HelixClusterVerifier _clusterVerifier; + List<String> _testDBs = new ArrayList<>(); + + MigrationStateVerifier _migrationVerifier; + HelixManager _manager; + + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursively(namespace); + } + _setupTool = new ClusterSetup(_gZkClient); + _setupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + MockParticipantManager participant = createAndStartParticipant(storageNodeName); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + + enablePersistIntermediateAssignment(_gZkClient, CLUSTER_NAME, true); + + _manager = + HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + } + + protected MockParticipantManager createAndStartParticipant(String instancename) { + _setupTool.addInstanceToCluster(CLUSTER_NAME, instancename); + + // start dummy participants + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instancename, 50); + participant.setTransition(new DelayedTransitionBase(50)); + participant.syncStart(); + return participant; + } + + protected String[] TestStateModels = { + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + + protected Map<String, IdealState> createTestDBs(long delayTime) throws InterruptedException { + Map<String, IdealState> idealStateMap = new HashMap<>(); + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, _minActiveReplica, + delayTime); + _testDBs.add(db); + } + for (String db : _testDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + idealStateMap.put(db, is); + } + return idealStateMap; + } + + class MigrationStateVerifier implements IdealStateChangeListener, ExternalViewChangeListener { + static final int EXTRA_REPLICA = 10; + + boolean _hasMoreReplica = false; + boolean _hasLessReplica = false; + HelixManager _manager; + boolean trackEnabled = false; + Map<String, IdealState> _resourceMap; + + + public MigrationStateVerifier(Map<String, IdealState> resourceMap, HelixManager manager) { + _resourceMap = resourceMap; + _manager = manager; + } + + // start tracking changes + public void start() throws Exception { + trackEnabled = true; + _manager.addIdealStateChangeListener(this); + _manager.addExternalViewChangeListener(this); + } + + // stop tracking changes + public void stop() { + trackEnabled = false; + PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder(); + _manager.removeListener(keyBuilder.idealStates(), this); + _manager.removeListener(keyBuilder.externalViews(), this); + } + + @Override + public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) + throws InterruptedException { + if (!trackEnabled) { + return; + } + for (IdealState is : idealStates) { + int replica = is.getReplicaCount(NUM_NODE); + for (String p : is.getPartitionSet()) { + Map<String, String> stateMap = is.getRecord().getMapField(p); + verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "IS"); + } + } + } + + @Override + public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) { + if (!trackEnabled) { + return; + } + for (ExternalView ev : externalViewList) { + IdealState is = _resourceMap.get(ev.getResourceName()); + if (is == null) { + continue; + } + int replica = is.getReplicaCount(NUM_NODE); + for (String p : is.getPartitionSet()) { + Map<String, String> stateMap = ev.getStateMap(p); + verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "EV"); + } + } + } + + private void verifyPartitionCount(String resource, String partition, + Map<String, String> stateMap, int replica, String warningPrefix) { + if (stateMap.size() < replica) { + System.out.println( + "resource " + resource + ", partition " + partition + " has " + stateMap.size() + + " replicas in " + warningPrefix); + _hasLessReplica = true; + } + + if (stateMap.size() > replica + EXTRA_REPLICA) { + System.out.println( + "resource " + resource + ", partition " + partition + " has " + stateMap.size() + + " replicas in " + warningPrefix); + _hasMoreReplica = true; + } + } + + public boolean hasMoreReplica() { + return _hasMoreReplica; + } + + public boolean hasLessReplica() { + return _hasLessReplica; + } + + public void reset() { + _hasMoreReplica = false; + _hasLessReplica = false; + } + } + + + @AfterClass + public void afterClass() throws Exception { + /** + * shutdown order: 1) disconnect the controller 2) disconnect participants + */ + _controller.syncStop(); + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + _manager.disconnect(); + _setupTool.deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java index dcbd7e1..77340af 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java @@ -21,40 +21,27 @@ package org.apache.helix.integration.rebalancer; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.integration.common.ZkIntegrationTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.manager.zk.ZKHelixManager; -import org.apache.helix.mock.participant.DummyProcess; import org.apache.helix.mock.participant.MockMSModelFactory; import org.apache.helix.mock.participant.MockMSStateModel; -import org.apache.helix.mock.participant.MockSchemataModelFactory; import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.Message; import org.apache.helix.model.ResourceConfig; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.participant.StateMachineEngine; -import org.apache.helix.participant.statemachine.StateModel; -import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.participant.statemachine.StateModelInfo; import org.apache.helix.participant.statemachine.Transition; -import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; import org.testng.Assert; http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java index 424c71f..3b8e8f0 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java @@ -226,7 +226,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase { } public static class TestMasterSlaveStateModelFactory - extends DummyProcess.DummyStateModelFactory { + extends DummyProcess.DummyMasterSlaveStateModelFactory { int _startThreadPoolSize; Map<String, ExecutorService> _threadPoolExecutorMap; http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java index b3eea72..9c87dda 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java +++ b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java @@ -54,7 +54,7 @@ public class DummyProcess { private final String _zkConnectString; private final String _clusterName; private final String _instanceName; - private DummyStateModelFactory stateModelFactory; + private DummyMasterSlaveStateModelFactory stateModelFactory; // private StateMachineEngine genericStateMachineHandler; private int _transDelayInMs = 0; @@ -91,7 +91,7 @@ public class DummyProcess { throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType); } - stateModelFactory = new DummyStateModelFactory(_transDelayInMs); + stateModelFactory = new DummyMasterSlaveStateModelFactory(_transDelayInMs); DummyLeaderStandbyStateModelFactory stateModelFactory1 = new DummyLeaderStandbyStateModelFactory(_transDelayInMs); DummyOnlineOfflineStateModelFactory stateModelFactory2 = @@ -108,16 +108,16 @@ public class DummyProcess { return manager; } - public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel> { + public static class DummyMasterSlaveStateModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> { int _delay; - public DummyStateModelFactory(int delay) { + public DummyMasterSlaveStateModelFactory(int delay) { _delay = delay; } @Override - public DummyStateModel createNewStateModel(String resourceName, String stateUnitKey) { - DummyStateModel model = new DummyStateModel(); + public DummyMasterSlaveStateModel createNewStateModel(String resourceName, String stateUnitKey) { + DummyMasterSlaveStateModel model = new DummyMasterSlaveStateModel(); model.setDelay(_delay); return model; } @@ -155,7 +155,7 @@ public class DummyProcess { } } - public static class DummyStateModel extends StateModel { + public static class DummyMasterSlaveStateModel extends StateModel { int _transDelay = 0; public void setDelay(int delay) {
