Refactor and fix more integration tests.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/de38a7db Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/de38a7db Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/de38a7db Branch: refs/heads/master Commit: de38a7dbde05c22b12829389c674ff22bdffc289 Parents: 874f9e6 Author: Lei Xia <l...@linkedin.com> Authored: Thu Sep 28 10:25:26 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:08:54 2017 -0800 ---------------------------------------------------------------------- .../ZkHelixClusterVerifier.java | 37 +- .../TestCrushAutoRebalance.java | 316 +++++++++++++++++ .../TestCrushAutoRebalanceNonRack.java | 278 +++++++++++++++ ...rushAutoRebalanceTopoplogyAwareDisabled.java | 93 +++++ .../TestDelayedAutoRebalance.java | 342 +++++++++++++++++++ ...elayedAutoRebalanceWithDisabledInstance.java | 317 +++++++++++++++++ .../TestDelayedAutoRebalanceWithRackaware.java | 124 +++++++ .../rebalancer/TestCrushAutoRebalance.java | 316 ----------------- .../TestCrushAutoRebalanceNonRack.java | 278 --------------- ...rushAutoRebalanceTopoplogyAwareDisabled.java | 64 ---- .../rebalancer/TestDelayedAutoRebalance.java | 342 ------------------- ...elayedAutoRebalanceWithDisabledInstance.java | 317 ----------------- .../TestDelayedAutoRebalanceWithRackaware.java | 76 ----- 13 files changed, 1497 insertions(+), 1403 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java index ad5cda2..472157f 100644 --- 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java @@ -19,6 +19,9 @@ package org.apache.helix.tools.ClusterVerifiers; * under the License. */ +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.apache.helix.HelixDataAccessor; @@ -47,6 +50,13 @@ public abstract class ZkHelixClusterVerifier protected final PropertyKey.Builder _keyBuilder; private CountDownLatch _countdown; + /* Single-thread executor: the ZK listener callbacks below submit verifyState() checks to it instead of running verifyState() inline in the callback, and the submitted checks run one at a time. */ private ExecutorService _verifyTaskThreadPool = + Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override public Thread newThread(Runnable r) { + return new Thread(r, "ZkHelixClusterVerifier-verify_thread"); + } + }); + protected static class ClusterVerifyTrigger { final PropertyKey _triggerKey; final boolean _triggerOnDataChange; @@ -190,7 +200,6 @@ public abstract class ZkHelixClusterVerifier try { success = verifyState(); if (!success) { - success = _countdown.await(timeout, TimeUnit.MILLISECONDS); if (!success) { // make a final try if timeout @@ -203,6 +212,7 @@ public abstract class ZkHelixClusterVerifier // clean up _zkClient.unsubscribeAll(); + /* NOTE(review): the executor is created once per verifier instance (field initializer), so if the same instance is verified again, submit() calls from the listener callbacks would be rejected (RejectedExecutionException) and the latch could only time out - confirm each verifier is used for a single verification. */ _verifyTaskThreadPool.shutdownNow(); return success; } @@ -233,17 +243,28 @@ public abstract class ZkHelixClusterVerifier */ protected abstract boolean verifyState() throws Exception; + /* Runs one verifyState() check and releases _countdown when it succeeds; exceptions are logged and swallowed, so a failing check just leaves the waiter to the next callback or to its timeout. */ class VerifyStateCallbackTask implements Runnable { + @Override public void run() { + try { + boolean success = verifyState(); + if (success) { + _countdown.countDown(); + } + } catch (Exception ex) { + /* NOTE(review): only ex.toString() reaches the log, so the stack trace is lost; consider logging with ex as the throwable argument. */ LOG.info("verifyState() throws exception: " + ex); + } + } + } + @Override public void handleDataChange(String dataPath, Object data) throws Exception { - boolean success = verifyState(); - if (success) { - _countdown.countDown(); - } + 
_verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); } @Override public void handleDataDeleted(String dataPath) throws Exception { _zkClient.unsubscribeDataChanges(dataPath, this); + /* re-check state after a watched node is deleted as well, not only after data changes */ _verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); } @Override @@ -252,11 +273,7 @@ public abstract class ZkHelixClusterVerifier String childPath = String.format("%s/%s", parentPath, child); _zkClient.subscribeDataChanges(childPath, this); } - - boolean success = verifyState(); - if (success) { - _countdown.countDown(); - } + _verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); } public ZkClient getZkClient() { http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java new file mode 100644 index 0000000..967175f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java @@ -0,0 +1,316 @@ +package org.apache.helix.integration.rebalancer.CrushRebalancers; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** Integration tests for FULL_AUTO resources placed by CRUSH-based rebalance strategies on 6 participants spread over 3 zones (zone-i%3) and tagged with 2 instance tags (tag-i%2). */ public class TestCrushAutoRebalance extends ZkIntegrationTestBase { + final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int _PARTITIONS = 20; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" +
CLASS_NAME; + protected ClusterControllerManager _controller; + + protected ClusterSetup _setupTool = null; + List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); + Map<String, String> _nodeToZoneMap = new HashMap<String, String>(); + Map<String, String> _nodeToTagMap = new HashMap<String, String>(); + List<String> _nodes = new ArrayList<String>(); + Set<String> _allDBs = new HashSet<String>(); + int _replica = 3; + + String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(_gZkClient); + _setupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String zone = "zone-" + i % 3; + String tag = "tag-" + i % 2; + _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone); + _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + _nodeToZoneMap.put(storageNodeName, zone); + _nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + 
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); + } + + @DataProvider(name = "rebalanceStrategies") + public static String [][] rebalanceStrategies() { + return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}, + {"MultiRoundCrushRebalanceStrategy", MultiRoundCrushRebalanceStrategy.class.getName()} + }; + } + + @Test(dataProvider = "rebalanceStrategies", enabled=true) + public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass) + throws Exception { + System.out.println("testZoneIsolation " + rebalanceStrategyName); + + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + for (String db : _allDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev, _replica); + } + } + + @Test(dataProvider = "rebalanceStrategies", enabled=true) + public void testZoneIsolationWithInstanceTag( + String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { + Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); + int i = 0; + for (String tag : tags) { + String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, +
BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "", + rebalanceStrategyClass); + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + is.setInstanceGroupTag(tag); + _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + for (String db : _allDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev, _replica); + } + } + + @Test (dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag"}) + public void testLackEnoughLiveRacks() throws Exception { /* Stops every participant in one zone, leaving two live zones, so each partition is expected to span only 2 zones. */ + System.out.println("TestLackEnoughInstances"); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + + // shutdown participants within one zone + String zone = _nodeToZoneMap.values().iterator().next(); + for (int i = 0; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)){ + p.syncStop(); + } + } + + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-CrushRebalanceStrategy-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) +
.setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + for (String db : _allDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev, 2); + } + } + + @Test (dependsOnMethods = { "testLackEnoughLiveRacks"}) + public void testLackEnoughRacks() throws Exception { /* Stops, disables and drops the participants of the same zone as testLackEnoughLiveRacks (the unchanged map yields the same first value). NOTE(review): those participants were already stopped by testLackEnoughLiveRacks, so syncStop() runs on them again here (and once more in afterClass) - confirm syncStop() tolerates repeated calls. The printed label also says TestLackEnoughInstances. */ + System.out.println("TestLackEnoughInstances "); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + + // shutdown participants within one zone + String zone = _nodeToZoneMap.values().iterator().next(); + for (int i = 0; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)){ + p.syncStop(); + _setupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, p.getInstanceName(), false); + Thread.sleep(50); + _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName()); + } + } + + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-CrushRebalanceStrategy-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + for (String db : _allDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev, 2); + } + } + + @AfterMethod + public void afterMethod() throws
Exception { + for (String db : _allDBs) { + _setupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + _allDBs.clear(); + // wait for all DBs to be dropped. + Thread.sleep(100); + } + + /** + * Validates that each partition's external-view instances fall into exactly expectedReplica distinct zones and, when the ideal state has an instance group tag, that every one of them carries that tag. + */ + private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int expectedReplica) { + String tag = is.getInstanceGroupTag(); + for (String partition : is.getPartitionSet()) { + Set<String> assignedZones = new HashSet<String>(); + + Map<String, String> assignmentMap = ev.getRecord().getMapField(partition); + Set<String> instancesInEV = assignmentMap.keySet(); + // TODO: preference List is not persisted in IS. + //Assert.assertEquals(instancesInEV, instancesInIs); + for (String instance : instancesInEV) { + assignedZones.add(_nodeToZoneMap.get(instance)); + if (tag != null) { + InstanceConfig config = + _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertTrue(config.containsTag(tag)); + } + } + Assert.assertEquals(assignedZones.size(), expectedReplica); + } + } + + @Test() + public void testAddZone() throws Exception { /* NOTE(review): empty placeholder; it always passes. */ + //TODO + } + + @Test() + public void testAddNodes() throws Exception { /* NOTE(review): empty placeholder; it always passes. */ + //TODO + } + + @Test() + public void testNodeFailure() throws Exception { /* NOTE(review): empty placeholder; it always passes. */ + //TODO + } + + @AfterClass + public void afterClass() throws Exception { + /** + * shutdown order: 1) disconnect the controller 2) disconnect participants + */ + _controller.syncStop(); + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + _setupTool.deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java ---------------------------------------------------------------------- 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java new file mode 100644 index 0000000..c11ee87 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java @@ -0,0 +1,278 @@ +package org.apache.helix.integration.rebalancer.CrushRebalancers; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.integration.common.ZkStandAloneCMTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** CRUSH rebalance tests on a cluster whose topology is "/instance" with fault zone type "instance" and whose instances each get domain "instance=<name>", so every instance is its own fault zone. NOTE(review): no @AfterClass is declared in this class, so stopping _controller/_participants relies on the base class - confirm. */ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { + final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int _PARTITIONS = 20; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + protected ClusterSetup _setupTool = null; + List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); + Map<String, String> _nodeToTagMap = new HashMap<String, String>(); + List<String> _nodes = new ArrayList<String>(); + Set<String> _allDBs = new HashSet<String>(); + int _replica = 3; + + private static String[] _testModels = {
BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(_gZkClient); + _setupTool.addCluster(CLUSTER_NAME, true); + + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTopology("/instance"); + clusterConfig.setFaultZoneType("instance"); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + _nodes.add(storageNodeName); + String tag = "tag-" + i % 2; + _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + _nodeToTagMap.put(storageNodeName, tag); + InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName); + instanceConfig.setDomain("instance=" + storageNodeName); + configAccessor.setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + //enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME,
true); + } + + @DataProvider(name = "rebalanceStrategies") public static String[][] rebalanceStrategies() { + return new String[][] { { "CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName() } }; + } + + @Test(dataProvider = "rebalanceStrategies", enabled = true) + public void test(String rebalanceStrategyName, String rebalanceStrategyClass) + throws Exception { + System.out.println("Test " + rebalanceStrategyName); + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify(5000)); + + for (String db : _allDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateIsolation(is, ev, _replica); + } + } + + @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = "test") + public void testWithInstanceTag(String rebalanceStrategyName, String rebalanceStrategyClass) + throws Exception { + Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); + int i = 3; + for (String tag : tags) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, + BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "", + rebalanceStrategyClass); + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + is.setInstanceGroupTag(tag); +
_setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify(5000)); + for (String db : _allDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateIsolation(is, ev, _replica); + } + } + + @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { "test", + "testWithInstanceTag"}) + public void testLackEnoughLiveInstances(String rebalanceStrategyName, + String rebalanceStrategyClass) throws Exception { /* Stops participants 2..5, leaving two live instances, so each partition is expected to have 2 replicas in the external view. NOTE(review): this test and testLackEnoughInstances depend only on test and testWithInstanceTag, so their relative order is not fixed; whichever runs second calls syncStop() on participants that are already stopped. */ + System.out.println("TestLackEnoughInstances " + rebalanceStrategyName); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + + // shutdown participants, keep only two left + for (int i = 2; i < _participants.size(); i++) { + _participants.get(i).syncStop(); + } + + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify(5000)); + + for (String db : _allDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = +
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateIsolation(is, ev, 2); + } + } + + @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { "test", + "testWithInstanceTag"}) + public void testLackEnoughInstances(String rebalanceStrategyName, + String rebalanceStrategyClass) throws Exception { /* Stops, disables and drops participants 2..5, leaving two instances in the cluster. */ + System.out.println("TestLackEnoughInstances " + rebalanceStrategyName); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + + // shutdown participants, keep only two left + for (int i = 2; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + p.syncStop(); + _setupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, p.getInstanceName(), false); + Thread.sleep(50); + _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName()); + } + + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + for (String db : _allDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateIsolation(is, ev, 2); + } + } + + /** + * Validates that each partition has exactly expectedReplica instances in the external view and, when the ideal state has an instance group tag, that every one of them carries that tag.
+ */ + private void validateIsolation(IdealState is, ExternalView ev, int expectedReplica) { + String tag = is.getInstanceGroupTag(); + for (String partition : is.getPartitionSet()) { + Map<String, String> assignmentMap = ev.getRecord().getMapField(partition); + Set<String> instancesInEV = assignmentMap.keySet(); + Assert.assertEquals(instancesInEV.size(), expectedReplica); + for (String instance : instancesInEV) { + if (tag != null) { + InstanceConfig config = + _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertTrue(config.containsTag(tag)); + } + } + } + } + + @AfterMethod public void afterMethod() throws Exception { + for (String db : _allDBs) { + _setupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + _allDBs.clear(); + // wait for all DBs to be dropped. + Thread.sleep(200); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceTopoplogyAwareDisabled.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceTopoplogyAwareDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceTopoplogyAwareDisabled.java new file mode 100644 index 0000000..55245e7 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceTopoplogyAwareDisabled.java @@ -0,0 +1,93 @@ +package org.apache.helix.integration.rebalancer.CrushRebalancers; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.tools.ClusterSetup; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** Re-runs the TestCrushAutoRebalanceNonRack scenarios on a cluster set up without the parent's topology/fault-zone ClusterConfig settings and per-instance domains. NOTE(review): "Topoplogy" in the class name looks like a typo of "Topology". */ public class TestCrushAutoRebalanceTopoplogyAwareDisabled extends TestCrushAutoRebalanceNonRack { + + @BeforeClass + public void beforeClass() throws Exception { /* Overrides the parent's beforeClass(): same instances, tags, participants and controller, but no ConfigAccessor topology/domain setup. */ + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (ZkIntegrationTestBase._gZkClient.exists(namespace)) { + ZkIntegrationTestBase._gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(ZkIntegrationTestBase._gZkClient); + _setupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (TestCrushAutoRebalanceNonRack.START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + _nodes.add(storageNodeName); + String tag = "tag-" + i % 2; + _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + _nodeToTagMap.put(storageNodeName, tag); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager
participant = + new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZkIntegrationTestBase.ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(ZkIntegrationTestBase._gZkClient, CLUSTER_NAME, true); + } + + @Test(dataProvider = "rebalanceStrategies") + public void test(String rebalanceStrategyName, + String rebalanceStrategyClass) throws Exception { /* This and the overrides below only re-declare the @Test metadata on this class and delegate to the parent. */ + super.test(rebalanceStrategyName, rebalanceStrategyClass); + } + + @Test(dataProvider = "rebalanceStrategies", dependsOnMethods = "test") + public void testWithInstanceTag( + String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { + super.testWithInstanceTag(rebalanceStrategyName, rebalanceStrategyClass); + } + + @Test(dataProvider = "rebalanceStrategies", dependsOnMethods = { "test", "testWithInstanceTag" + }) + public void testLackEnoughLiveInstances(String rebalanceStrategyName, + String rebalanceStrategyClass) throws Exception { + super.testLackEnoughLiveInstances(rebalanceStrategyName, rebalanceStrategyClass); + } + + @Test(dataProvider = "rebalanceStrategies", dependsOnMethods = { "test", "testWithInstanceTag" + }) + public void testLackEnoughInstances(String rebalanceStrategyName, + String rebalanceStrategyClass) throws Exception { + super.testLackEnoughInstances(rebalanceStrategyName, rebalanceStrategyClass); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
new file mode 100644
index 0000000..817c207
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -0,0 +1,342 @@
+package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for delayed auto-rebalance. When a participant goes offline, its partitions
+ * should not be moved until the configured delay expires. The minimal active replica count
+ * should be maintained in the meantime.
+ */
+public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
+  final int NUM_NODE = 5;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 5;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  int _replica = 3;
+  int _minActiveReplica = _replica - 1;
+  HelixClusterVerifier _clusterVerifier;
+  List<String> _testDBs = new ArrayList<String>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  protected String[] TestStateModels = {
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  /**
+   * The partition movement should be delayed (not happen immediately) after one single node goes offline.
+   * Delay is enabled by default, delay time is set in IdealState.
+   * @throws Exception
+   */
+  @Test
+  public void testDelayedPartitionMovement() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    validateDelayedMovements(externalViewsBefore);
+  }
+
+  /**
+   * Same as {@link #testDelayedPartitionMovement()}, except that the DBs are created with an
+   * IdealState delay of -1 and the delay is configured at cluster level instead.
+   */
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+      validateDelayedMovements(externalViewsBefore);
+    } finally {
+      // Restore the cluster-level delay even if an assertion failed, so later tests start clean.
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * Test when two nodes go offline, the minimal active replica should be maintained.
+   * @throws Exception
+   */
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+      validateDelayedMovements(externalViewsBefore);
+
+      // bring down another node, the minimal active replica for each partition should be maintained.
+      _participants.get(3).syncStop();
+      Thread.sleep(500);
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      }
+    } finally {
+      // Restore the cluster-level delay even if an assertion failed, so later tests start clean.
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * The partition should be moved to other nodes after the delay time.
+   */
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    long delay = 4000;
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(delay);
+    validateDelayedMovements(externalViewsBefore);
+
+    Thread.sleep(delay + 200);
+    Assert.assertTrue(_clusterVerifier.verify());
+    // after delay time, it should maintain required number of replicas.
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+    }
+  }
+
+  /**
+   * Disabling delayed rebalance on one DB should restore its full replica count, while the other
+   * DBs keep their partitions where they were.
+   */
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testDisableDelayRebalanceInResource() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    validateDelayedMovements(externalViewsBefore);
+
+    // disable delay rebalance for one db, partition should be moved immediately
+    String testDb = _testDBs.get(0);
+    IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, testDb);
+    idealState.setDelayRebalanceEnabled(false);
+    _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
+    Thread.sleep(1000);
+
+    // once delay rebalance is disabled, it should maintain required number of replicas for that db.
+    // replica for other dbs should not be moved.
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+
+      if (db.equals(testDb)) {
+        validateMinActiveAndTopStateReplica(idealState, ev, _replica, NUM_NODE);
+      } else {
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+            _participants.get(0).getInstanceName(), false);
+      }
+    }
+  }
+
+  /**
+   * Disabling delayed rebalance for the whole cluster should restore the full replica count of
+   * every DB.
+   */
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+      validateDelayedMovements(externalViewsBefore);
+
+      // disable delay rebalance for the entire cluster.
+      enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
+      // TODO: remove this once controller is listening on cluster config change.
+      RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(), _testDBs.get(0));
+      Thread.sleep(500);
+      Assert.assertTrue(_clusterVerifier.verify());
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+      }
+    } finally {
+      // Re-enable cluster-wide delay even if an assertion failed, so later tests start clean.
+      enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    }
+  }
+
+  /**
+   * Disabling delayed rebalance for one (stopped) instance should drop it from every preference
+   * list immediately.
+   */
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    validateDelayedMovements(externalViewsBefore);
+
+    String disabledInstanceName = _participants.get(0).getInstanceName();
+    enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, false);
+    try {
+      Thread.sleep(1000);
+
+      for (String db : _testDBs) {
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        Map<String, List<String>> preferenceLists = is.getPreferenceLists();
+        for (List<String> instances : preferenceLists.values()) {
+          Assert.assertFalse(instances.contains(disabledInstanceName));
+        }
+      }
+    } finally {
+      // Re-enable delay on the instance even if an assertion failed, so later tests start clean.
+      enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, true);
+    }
+  }
+
+  /**
+   * Drops every DB created by the previous test.
+   */
+  @AfterMethod
+  public void afterTest() throws InterruptedException {
+    // delete all DBs created in the last test
+    for (String db : _testDBs) {
+      _setupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    _testDBs.clear();
+    Thread.sleep(50);
+  }
+
+  /**
+   * Restarts any participant that was disconnected by the previous test.
+   */
+  @BeforeMethod
+  public void beforeTest() {
+    for (int i = 0; i < _participants.size(); i++) {
+      if (!_participants.get(i).isConnected()) {
+        // A stopped manager is replaced by a fresh one for the same instance name.
+        _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+            _participants.get(i).getInstanceName()));
+        _participants.get(i).syncStart();
+      }
+    }
+  }
+
+  /**
+   * Creates one test DB per state model in {@link #TestStateModels}, waits until the cluster
+   * verifier passes and returns each DB's external view keyed by DB name.
+   *
+   * @param delayTime rebalance delay written into each IdealState; callers in this class pass -1
+   *                  when the delay is configured at cluster level instead
+   */
+  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+    Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _minActiveReplica, delayTime);
+      _testDBs.add(db);
+    }
+    Thread.sleep(800);
+    Assert.assertTrue(_clusterVerifier.verify());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  /**
+   * Asserts that no partition of {@code is} was reassigned between {@code evBefore} and
+   * {@code evAfter}.
+   *
+   * @param offlineInstance the instance that was stopped or disabled
+   * @param disabled true if {@code offlineInstance} was disabled: it may stay in the EV, but only
+   *                 as OFFLINE. False if it went offline: it must be gone from the EV.
+   */
+  protected void validateNoPartitionMove(IdealState is, ExternalView evBefore, ExternalView evAfter,
+      String offlineInstance, boolean disabled) {
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentsBefore = evBefore.getRecord().getMapField(partition);
+      Map<String, String> assignmentsAfter = evAfter.getRecord().getMapField(partition);
+
+      Set<String> instancesBefore = new HashSet<String>(assignmentsBefore.keySet());
+      Set<String> instancesAfter = new HashSet<String>(assignmentsAfter.keySet());
+
+      if (disabled) {
+        // the offline node is disabled
+        Assert.assertEquals(instancesBefore, instancesAfter, String
+            .format("%s has been moved to new instances, before: %s, after: %s, disabled instance: %s",
+                partition, assignmentsBefore.toString(), assignmentsAfter.toString(),
+                offlineInstance));
+
+        if (instancesAfter.contains(offlineInstance)) {
+          Assert.assertEquals(assignmentsAfter.get(offlineInstance), "OFFLINE");
+        }
+      } else {
+        // the offline node actually went offline.
+        instancesBefore.remove(offlineInstance);
+        Assert.assertEquals(instancesBefore, instancesAfter, String
+            .format("%s has been moved to new instances, before: %s, after: %s, offline instance: %s",
+                partition, assignmentsBefore.toString(), assignmentsAfter.toString(),
+                offlineInstance));
+      }
+    }
+  }
+
+  /**
+   * Stops participant 0. Then asserts that every test DB keeps at least
+   * {@code _minActiveReplica} replicas and that no partition moved to a new instance.
+   */
+  private void validateDelayedMovements(Map<String, ExternalView> externalViewsBefore)
+      throws InterruptedException {
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(500);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName(), false);
+    }
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /**
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
new file mode 100644
index 0000000..330f962
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -0,0 +1,317 @@
+package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Variant of {@link TestDelayedAutoRebalance} in which most scenarios take a node out of
+ * service by disabling it through the admin API instead of stopping its participant.
+ */
+public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAutoRebalance {
+  private ConfigAccessor _configAccessor;
+
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+
+
+  /**
+   * The partition movement should be delayed (not happen immediately) after one single node is disabled.
+   * Delay is enabled by default, delay time is set in IdealState.
+   * @throws Exception
+   */
+  @Test
+  @Override
+  public void testDelayedPartitionMovement() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+
+    // Disable one node, no partition should be moved.
+    String instance = _participants.get(0).getInstanceName();
+    enableInstance(instance, false);
+
+    Thread.sleep(300);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev, instance, true);
+    }
+  }
+
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  @Override
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+
+      // Disable one node, no partition should be moved.
+      String instance = _participants.get(0).getInstanceName();
+      enableInstance(instance, false);
+
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev, instance, true);
+      }
+    } finally {
+      // Restore the cluster-level delay even if an assertion failed, so later tests start clean.
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * Test when two nodes were disabled, the minimal active replica should be maintained.
+   * @throws Exception
+   */
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  @Override
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+
+      // disable one node, no partition should be moved.
+      enableInstance(_participants.get(0).getInstanceName(), false);
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+            _participants.get(0).getInstanceName(), true);
+      }
+
+      // disable another node, the minimal active replica for each partition should be maintained.
+      enableInstance(_participants.get(3).getInstanceName(), false);
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      }
+    } finally {
+      // Restore the cluster-level delay even if an assertion failed, so later tests start clean.
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * Test when one node is disabled while another node is offline, the minimal active replica
+   * should be maintained.
+   * @throws Exception
+   */
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testMinimalActiveReplicaMaintainWithOneOffline() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+
+      // disable one node, no partition should be moved.
+      enableInstance(_participants.get(0).getInstanceName(), false);
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+            _participants.get(0).getInstanceName(), true);
+      }
+
+      // bring down another node, the minimal active replica for each partition should be maintained.
+      _participants.get(3).syncStop();
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      }
+    } finally {
+      // Restore the cluster-level delay even if an assertion failed, so later tests start clean.
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * The partition should be moved to other nodes after the delay time.
+   */
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  @Override
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    long delay = 10000;
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(delay);
+
+    // disable one node, no partition should be moved.
+    enableInstance(_participants.get(0).getInstanceName(), false);
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verify());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName(), true);
+    }
+
+    Thread.sleep(delay + 200);
+    // Same convergence check the base test performs before inspecting the external views.
+    Assert.assertTrue(_clusterVerifier.verify());
+    // after delay time, it should maintain required number of replicas.
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+    }
+  }
+
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  @Override
+  public void testDisableDelayRebalanceInResource() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+
+    // disable one node, no partition should be moved.
+    enableInstance(_participants.get(0).getInstanceName(), false);
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName(), true);
+    }
+
+    // disable delay rebalance for one db, partition should be moved immediately
+    String testDb = _testDBs.get(0);
+    IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, testDb);
+    idealState.setDelayRebalanceEnabled(false);
+    _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
+    Thread.sleep(2000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    // once delay rebalance is disabled, it should maintain required number of replicas for that db.
+    // replica for other dbs should not be moved.
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+
+      if (db.equals(testDb)) {
+        validateMinActiveAndTopStateReplica(idealState, ev, _replica, NUM_NODE);
+      } else {
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+            _participants.get(0).getInstanceName(), true);
+      }
+    }
+  }
+
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+  @Override
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+
+      // disable one node, no partition should be moved.
+      enableInstance(_participants.get(0).getInstanceName(), false);
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+            _participants.get(0).getInstanceName(), true);
+      }
+
+      // disable delay rebalance for the entire cluster.
+      enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
+      // TODO: remove this once controller is listening on cluster config change.
+      RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(), _testDBs.get(0));
+      Thread.sleep(500);
+      Assert.assertTrue(_clusterVerifier.verify());
+      for (String db : _testDBs) {
+        ExternalView ev =
+            _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+            CLUSTER_NAME, db);
+        validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+      }
+    } finally {
+      // Re-enable cluster-wide delay even if an assertion failed, so later tests start clean.
+      enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    }
+  }
+
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+  @Override
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+
+  /**
+   * Restarts any participant disconnected by the previous test and re-enables every instance.
+   */
+  @BeforeMethod
+  @Override
+  public void beforeTest() {
+    for (int i = 0; i < _participants.size(); i++) {
+      if (!_participants.get(i).isConnected()) {
+        _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+            _participants.get(i).getInstanceName()));
+        _participants.get(i).syncStart();
+      }
+      enableInstance(_participants.get(i).getInstanceName(), true);
+    }
+  }
+
+  /**
+   * Enables or disables {@code instance} through the admin API. Then checks that its
+   * InstanceConfig reflects the new state with a timestamp taken during the call.
+   */
+  private void enableInstance(String instance, boolean enabled) {
+    // Bracket the admin call with wall-clock reads instead of assuming it finishes within a
+    // fixed window; a hard-coded bound is flaky on slow machines.
+    long timeBefore = System.currentTimeMillis();
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled);
+    long timeAfter = System.currentTimeMillis();
+    InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+    Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled);
+    Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= timeBefore);
+    Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= timeAfter);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
new file mode 100644
index 0000000..7d98f27
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
@@ -0,0 +1,124 @@
+package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Reruns the {@link TestDelayedAutoRebalance} scenarios on 9 nodes spread over three zones.
+ * Topology-aware rebalancing is enabled, and every test resource uses the CRUSH rebalance
+ * strategy.
+ */
+public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebalance {
+  // NOTE(review): this field hides TestDelayedAutoRebalance.NUM_NODE rather than overriding it.
+  // Only code in this class sees 9; the inherited test bodies still pass the parent's value (5).
+  final int NUM_NODE = 9;
+
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      // Round-robin the nodes over three zones: zone-0, zone-1, zone-2.
+      String zone = "zone-" + i % 3;
+      _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Same as the inherited helper, but pins {@link CrushRebalanceStrategy} on every resource.
+   */
+  @Override
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, CrushRebalanceStrategy.class.getName());
+  }
+
+  @Override
+  @Test
+  public void testDelayedPartitionMovement() throws Exception {
+    super.testDelayedPartitionMovement();
+  }
+
+  @Override
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    super.testDelayedPartitionMovementWithClusterConfigedDelay();
+  }
+
+  /**
+   * Test when two nodes go offline, the minimal active replica should be maintained.
+   * @throws Exception
+   */
+  @Override
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    super.testMinimalActiveReplicaMaintain();
+  }
+
+  /**
+   * The partition should be moved to other nodes after the delay time.
+   */
+  @Override
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    super.testPartitionMovementAfterDelayTime();
+  }
+
+  @Override
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testDisableDelayRebalanceInResource() throws Exception {
+    super.testDisableDelayRebalanceInResource();
+  }
+
+  @Override
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    super.testDisableDelayRebalanceInCluster();
+  }
+
+  @Override
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalance.java
deleted file mode 100644
index 6551d1e..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalance.java
+++ /dev/null
@@ -1,316 +0,0 @@
-package org.apache.helix.integration.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy; -import org.apache.helix.integration.common.ZkIntegrationTestBase; -import org.apache.helix.integration.manager.ClusterControllerManager; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.model.BuiltInStateModelDefinitions; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; -import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -public class TestCrushAutoRebalance extends ZkIntegrationTestBase { - final int NUM_NODE = 6; - protected static final int START_PORT = 12918; - protected 
static final int _PARTITIONS = 20; - - protected final String CLASS_NAME = getShortClassName(); - protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; - protected ClusterControllerManager _controller; - - protected ClusterSetup _setupTool = null; - List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); - Map<String, String> _nodeToZoneMap = new HashMap<String, String>(); - Map<String, String> _nodeToTagMap = new HashMap<String, String>(); - List<String> _nodes = new ArrayList<String>(); - Set<String> _allDBs = new HashSet<String>(); - int _replica = 3; - - String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), - BuiltInStateModelDefinitions.MasterSlave.name(), - BuiltInStateModelDefinitions.LeaderStandby.name() - }; - - @BeforeClass - public void beforeClass() throws Exception { - System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursive(namespace); - } - _setupTool = new ClusterSetup(_gZkClient); - _setupTool.addCluster(CLUSTER_NAME, true); - - for (int i = 0; i < NUM_NODE; i++) { - String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); - _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); - String zone = "zone-" + i % 3; - String tag = "tag-" + i % 2; - _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone); - _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); - _nodeToZoneMap.put(storageNodeName, zone); - _nodeToTagMap.put(storageNodeName, tag); - _nodes.add(storageNodeName); - } - - // start dummy participants - for (String node : _nodes) { - MockParticipantManager participant = - new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); - participant.syncStart(); - _participants.add(participant); - } - - // start controller - String controllerName 
= CONTROLLER_PREFIX + "_0"; - _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); - _controller.syncStart(); - - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); - } - - @DataProvider(name = "rebalanceStrategies") - public static String [][] rebalanceStrategies() { - return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}, - {"MultiRoundCrushRebalanceStrategy", MultiRoundCrushRebalanceStrategy.class.getName()} - }; - } - - @Test(dataProvider = "rebalanceStrategies", enabled=true) - public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass) - throws Exception { - System.out.println("testZoneIsolation " + rebalanceStrategyName); - - int i = 0; - for (String stateModel : _testModels) { - String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; - _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, - RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); - _allDBs.add(db); - } - Thread.sleep(300); - - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(_allDBs).build(); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _allDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateZoneAndTagIsolation(is, ev, _replica); - } - } - - @Test(dataProvider = "rebalanceStrategies", enabled=true) - public void testZoneIsolationWithInstanceTag( - String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { - Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); - int i = 0; - for (String tag : tags) { - String db = 
"Test-DB-Tag-" + rebalanceStrategyName + "-" + i++; - _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, - BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "", - rebalanceStrategyClass); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - is.setInstanceGroupTag(tag); - _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); - _allDBs.add(db); - } - Thread.sleep(300); - - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(_allDBs).build(); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _allDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateZoneAndTagIsolation(is, ev, _replica); - } - } - - @Test (dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag"}) - public void testLackEnoughLiveRacks() throws Exception { - System.out.println("TestLackEnoughInstances"); - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - - // shutdown participants within one zone - String zone = _nodeToZoneMap.values().iterator().next(); - for (int i = 0; i < _participants.size(); i++) { - MockParticipantManager p = _participants.get(i); - if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)){ - p.syncStop(); - } - } - - int i = 0; - for (String stateModel : _testModels) { - String db = "Test-DB-CrushRebalanceStrategy-" + i++; - _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, - RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); - _allDBs.add(db); - } - Thread.sleep(300); - - HelixClusterVerifier 
_clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(_allDBs).build(); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _allDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateZoneAndTagIsolation(is, ev, 2); - } - } - - @Test (dependsOnMethods = { "testLackEnoughLiveRacks"}) - public void testLackEnoughRacks() throws Exception { - System.out.println("TestLackEnoughInstances "); - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - - // shutdown participants within one zone - String zone = _nodeToZoneMap.values().iterator().next(); - for (int i = 0; i < _participants.size(); i++) { - MockParticipantManager p = _participants.get(i); - if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)){ - p.syncStop(); - _setupTool.getClusterManagementTool() - .enableInstance(CLUSTER_NAME, p.getInstanceName(), false); - Thread.sleep(50); - _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName()); - } - } - - int i = 0; - for (String stateModel : _testModels) { - String db = "Test-DB-CrushRebalanceStrategy-" + i++; - _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, - RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); - _allDBs.add(db); - } - Thread.sleep(300); - - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(_allDBs).build(); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _allDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - 
validateZoneAndTagIsolation(is, ev, 2); - } - } - - @AfterMethod - public void afterMethod() throws Exception { - for (String db : _allDBs) { - _setupTool.dropResourceFromCluster(CLUSTER_NAME, db); - } - _allDBs.clear(); - // waiting for all DB be dropped. - Thread.sleep(100); - } - - /** - * Validate instances for each partition is on different zone and with necessary tagged instances. - */ - private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int expectedReplica) { - String tag = is.getInstanceGroupTag(); - for (String partition : is.getPartitionSet()) { - Set<String> assignedZones = new HashSet<String>(); - - Map<String, String> assignmentMap = ev.getRecord().getMapField(partition); - Set<String> instancesInEV = assignmentMap.keySet(); - // TODO: preference List is not persisted in IS. - //Assert.assertEquals(instancesInEV, instancesInIs); - for (String instance : instancesInEV) { - assignedZones.add(_nodeToZoneMap.get(instance)); - if (tag != null) { - InstanceConfig config = - _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); - Assert.assertTrue(config.containsTag(tag)); - } - } - Assert.assertEquals(assignedZones.size(), expectedReplica); - } - } - - @Test() - public void testAddZone() throws Exception { - //TODO - } - - @Test() - public void testAddNodes() throws Exception { - //TODO - } - - @Test() - public void testNodeFailure() throws Exception { - //TODO - } - - @AfterClass - public void afterClass() throws Exception { - /** - * shutdown order: 1) disconnect the controller 2) disconnect participants - */ - _controller.syncStop(); - for (MockParticipantManager participant : _participants) { - participant.syncStop(); - } - _setupTool.deleteCluster(CLUSTER_NAME); - System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - } -}