Repository: helix Updated Branches: refs/heads/master 1c855ae85 -> 79ebc0469
Add integration test cases to test crush rebalance strategy for non-rackaware clusters. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/de1a27f6 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/de1a27f6 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/de1a27f6 Branch: refs/heads/master Commit: de1a27f6cf13c190275f2544c6c9d4afe78d5a82 Parents: 1c855ae Author: Lei Xia <[email protected]> Authored: Mon Sep 25 17:08:04 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Sep 25 17:08:04 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/helix/HelixAdmin.java | 4 +- .../org/apache/helix/model/InstanceConfig.java | 43 ++++ .../org/apache/helix/tools/ClusterSetup.java | 42 +--- .../TestCrushAutoRebalanceNonRack.java | 216 +++++++++++++++++++ 4 files changed, 263 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/HelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 75cbfcf..8f47ea5 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -445,14 +445,14 @@ public interface HelixAdmin { /** * @param clusterName - * @param instanceNames + * @param instanceName * @param tag */ void addInstanceTag(String clusterName, String instanceName, String tag); /** * @param clusterName - * @param instanceNames + * @param instanceName * @param tag */ void removeInstanceTag(String clusterName, String instanceName, String tag); 
http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index 6002591..1a80e70 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -452,4 +452,47 @@ public class InstanceConfig extends HelixProperty { // HELIX-65: remove check for hostname/port existence return true; } + + /** + * Create an enabled InstanceConfig from an instanceId. For host:port or host_port ids (':' is tried first; the port is the digits after the last delimiter) with a positive port, the config is created with the normalized id host_port and its host name and port are set; any other id is used unchanged, and as the host name if none is set. + * @param instanceId participant id, e.g. host:port, host_port, or any other name + * @return the new InstanceConfig, marked enabled + */ + public static InstanceConfig toInstanceConfig(String instanceId) { + String host = null; + int port = -1; + // to maintain backward compatibility we parse string of format host:port + // and host_port, where host must be a string and port an integer + char[] delims = new char[] { + ':', '_' + }; + for (char delim : delims) { + String regex = String.format("(.*)[%c]([\\d]+)", delim); + if (instanceId.matches(regex)) { + int lastIndexOf = instanceId.lastIndexOf(delim); + try { + port = Integer.parseInt(instanceId.substring(lastIndexOf + 1)); + host = instanceId.substring(0, lastIndexOf); + } catch (Exception e) { /* e.g. NumberFormatException if the digits overflow an int; host stays null, so the id is kept as-is */ + _logger.warn("Unable to extract host and port from instanceId:" + instanceId); + } + break; + } + } + if (host != null && port > 0) { + instanceId = host + "_" + port; /* normalize the id to host_port */ + } + InstanceConfig config = new InstanceConfig(instanceId); + if (host != null && port > 0) { + config.setHostName(host); + config.setPort(String.valueOf(port)); + + } + + config.setInstanceEnabled(true); + if (config.getHostName() == null) { + config.setHostName(instanceId); + } + return config; + } } 
http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index e79410d..03070b2 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -178,46 +178,8 @@ public class ClusterSetup { } } - private InstanceConfig toInstanceConfig(String instanceId) { - String host = null; - int port = -1; - // to maintain backward compatibility we parse string of format host:port - // and host_port, where host port must be of type string and int - char[] delims = new char[] { - ':', '_' - }; - for (char delim : delims) { - String regex = String.format("(.*)[%c]([\\d]+)", delim); - if (instanceId.matches(regex)) { - int lastIndexOf = instanceId.lastIndexOf(delim); - try { - port = Integer.parseInt(instanceId.substring(lastIndexOf + 1)); - host = instanceId.substring(0, lastIndexOf); - } catch (Exception e) { - _logger.warn("Unable to extract host and port from instanceId:" + instanceId); - } - break; - } - } - if (host != null && port > 0) { - instanceId = host + "_" + port; - } - InstanceConfig config = new InstanceConfig(instanceId); - if (host != null && port > 0) { - config.setHostName(host); - config.setPort(String.valueOf(port)); - - } - - config.setInstanceEnabled(true); - if (config.getHostName() == null) { - config.setHostName(instanceId); - } - return config; - } - public void addInstanceToCluster(String clusterName, String instanceId) { - InstanceConfig config = toInstanceConfig(instanceId); + InstanceConfig config = InstanceConfig.toInstanceConfig(instanceId); /* host:port and host_port ids with a positive port are normalized to host_port by InstanceConfig.toInstanceConfig */ _admin.addInstance(clusterName, config); } @@ -238,7 +200,7 @@ public class ClusterSetup { new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient)); Builder keyBuilder = accessor.keyBuilder(); - InstanceConfig instanceConfig = toInstanceConfig(instanceId); + InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceId); instanceId = instanceConfig.getInstanceName(); // ensure node is stopped http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java new file mode 100644 index 0000000..9672483 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java @@ -0,0 +1,216 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import org.apache.helix.ConfigAccessor; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestCrushAutoRebalanceNonRack extends ZkIntegrationTestBase { /* Runs FULL_AUTO resources with the data-provider rebalance strategy on a 6-node cluster whose topology has no rack level */ + final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int _PARTITIONS = 20; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + protected ClusterSetup _setupTool = null; + List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); + Map<String, String> _nodeToTagMap = new HashMap<String, String>(); + List<String> _nodes = new ArrayList<String>(); + List<String> _allDBs = new ArrayList<String>(); /* NOTE(review): filled by both tests but never read in this class */ + int _replica = 3; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(_gZkClient); + _setupTool.addCluster(CLUSTER_NAME, true); + + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + HelixConfigScope clusterScope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(CLUSTER_NAME).build(); + + Map<String, String> configs = new HashMap<String, String>(); + configs.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/instance"); + configs.put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "instance"); /* topology has only an instance level, which is also the fault zone type, so there is no rack level */ + configAccessor.set(clusterScope, configs); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + _nodes.add(storageNodeName); + String tag = "tag-" + i % 2; /* nodes alternate between tag-0 and tag-1 */ + _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + _nodeToTagMap.put(storageNodeName, tag); + HelixConfigScope instanceScope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT) + .forCluster(CLUSTER_NAME).forParticipant(storageNodeName).build(); + configAccessor + .set(instanceScope, InstanceConfig.InstanceConfigProperty.DOMAIN.name(), "instance=" + storageNodeName); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + } + + @DataProvider(name = "rebalanceStrategies") + public static String [][] rebalanceStrategies() { + return new String[][] { 
{"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}}; + } + + @Test(dataProvider = "rebalanceStrategies", enabled=true) + public void test(String rebalanceStrategyName, String rebalanceStrategyClass) + throws Exception { + System.out.println("Test " + rebalanceStrategyName); + List<String> testDBs = new ArrayList<String>(); + String[] testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + int i = 0; + for (String stateModel : testModels) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + testDBs.add(db); + _allDBs.add(db); + } + Thread.sleep(300); /* NOTE(review): fixed sleep before verification; may make the test timing-sensitive */ + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + for (String db : testDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateIsolation(is, ev); + } + } + + @Test(dataProvider = "rebalanceStrategies", enabled=true) + public void testWithInstanceTag( + String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { + List<String> testDBs = new ArrayList<String>(); + Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); /* one MasterSlave resource is created per distinct tag */ + int i = 0; + for (String tag : tags) { + String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, + BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "", + rebalanceStrategyClass); + IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + is.setInstanceGroupTag(tag); /* validateIsolation later asserts that every hosting instance carries this tag */ + _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + testDBs.add(db); + _allDBs.add(db); + } + Thread.sleep(300); /* NOTE(review): fixed sleep before verification; may make the test timing-sensitive */ + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + for (String db : testDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateIsolation(is, ev); + } + } + + /** + * Validate that, in the external view, every partition of the ideal state is hosted on exactly as many (necessarily distinct) instances as the ideal state's replica count, and that each of those instances carries the resource's instance group tag when one is set. + */ + private void validateIsolation(IdealState is, ExternalView ev) { + int replica = Integer.valueOf(is.getReplicas()); + String tag = is.getInstanceGroupTag(); + + for (String partition : is.getPartitionSet()) { + Map<String, String> assignmentMap = ev.getRecord().getMapField(partition); + Set<String> instancesInEV = assignmentMap.keySet(); /* NOTE(review): assumes the partition is present in the external view; a null map would throw here */ + Assert.assertEquals(instancesInEV.size(), replica); + for (String instance : instancesInEV) { + if (tag != null) { + InstanceConfig config = + _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertTrue(config.containsTag(tag)); + } + } + } + } + + @AfterClass + public void afterClass() throws Exception { + /** + * shutdown order: 1) disconnect the controller 2) disconnect participants 3) delete the cluster + */ + _controller.syncStop(); + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + _setupTool.deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } +}
