This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 47ee384e1e6ae7d75e6a0736b30aa1b1b7b64d96 Author: Qi (Quincy) Qu <[email protected]> AuthorDate: Fri Jan 28 16:14:09 2022 -0800 Fix #1946 -- Refactor and move ClusterTopologyConfig Move ClusterTopologyConfig from nested to a standalone class in helix/model and to be used by virtual topology group logic. --- .../controller/rebalancer/topology/Topology.java | 120 ++++++--------------- .../apache/helix/model/ClusterTopologyConfig.java | 101 +++++++++++++++++ .../helix/model/TestClusterTopologyConfig.java | 84 +++++++++++++++ 3 files changed, 216 insertions(+), 89 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index 3d2a878..f35b637 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -30,6 +30,7 @@ import java.util.Set; import org.apache.helix.HelixException; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.InstanceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,15 +54,7 @@ public class Topology { private final List<String> _allInstances; private final List<String> _liveInstances; private final Map<String, InstanceConfig> _instanceConfigMap; - private final ClusterConfig _clusterConfig; - private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_"; - - static class ClusterTopologyConfig { - String endNodeType; - String faultZoneType; - LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>(); - } - private ClusterTopologyConfig _clusterTopologyConfig; + private final ClusterTopologyConfig _clusterTopologyConfig; public Topology(final List<String> allNodes, final List<String> liveNodes, final Map<String, InstanceConfig> instanceConfigMap, 
ClusterConfig clusterConfig) { @@ -78,18 +71,16 @@ public class Topology { throw new HelixException(String.format("Config for instances %s is not found!", _allInstances.removeAll(_instanceConfigMap.keySet()))); } - _clusterConfig = clusterConfig; - _clusterTopologyConfig = getClusterTopologySetting(clusterConfig); - - _root = createClusterTree(); + _clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig); + _root = createClusterTree(clusterConfig); } public String getEndNodeType() { - return _clusterTopologyConfig.endNodeType; + return _clusterTopologyConfig.getEndNodeType(); } public String getFaultZoneType() { - return _clusterTopologyConfig.faultZoneType; + return _clusterTopologyConfig.getFaultZoneType(); } public Node getRootNode() { @@ -158,7 +149,7 @@ public class Topology { return newRoot; } - private Node createClusterTree() { + private Node createClusterTree(ClusterConfig clusterConfig) { // root Node root = new Node(); root.setName("root"); @@ -171,94 +162,46 @@ public class Topology { InstanceConfig insConfig = _instanceConfigMap.get(instanceName); try { LinkedHashMap<String, String> instanceTopologyMap = - computeInstanceTopologyMapHelper(_clusterConfig.isTopologyAwareEnabled(), instanceName, - insConfig, _clusterTopologyConfig.topologyKeyDefaultValue, - null /*faultZoneForEarlyQuit*/); + computeInstanceTopologyMapHelper(_clusterTopologyConfig, instanceName, insConfig, null); int weight = insConfig.getWeight(); if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) { weight = DEFAULT_NODE_WEIGHT; } addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances); } catch (IllegalArgumentException e) { - if (isInstanceEnabled(_clusterConfig, instanceName, insConfig)) { + if (isInstanceEnabled(clusterConfig, instanceName, insConfig)) { throw e; } else { - logger - .warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!", - insConfig.getDomainAsString(), instanceName); + 
logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!", + insConfig.getDomainAsString(), instanceName); } } } return root; } - private boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName, + private static boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName, InstanceConfig instanceConfig) { return (instanceConfig.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null || !clusterConfig.getDisabledInstances().containsKey(instanceName))); } /** - * Populate faultZone, endNodetype and and a LinkedHashMap containing pathKeys default values for - * clusterConfig.Topology. The LinkedHashMap will be empty if clusterConfig.Topology is unset. - * - * @return an Instance of Topology.ClusterTopologyConfig. - */ - private static ClusterTopologyConfig getClusterTopologySetting(ClusterConfig clusterConfig) { - - ClusterTopologyConfig clusterTopologyConfig = new ClusterTopologyConfig(); - if (clusterConfig.isTopologyAwareEnabled()) { - String topologyDef = clusterConfig.getTopology(); - if (topologyDef != null) { - String[] topologyKeys = topologyDef.trim().split("/"); - int lastValidTypeIdx = 0; - for (int i = 0; i < topologyKeys.length; i++) { - if (topologyKeys[i].length() != 0) { - clusterTopologyConfig.topologyKeyDefaultValue - .put(topologyKeys[i], DEFAULT_DOMAIN_PREFIX + topologyKeys[i]); - lastValidTypeIdx = i; - } - } - if (clusterTopologyConfig.topologyKeyDefaultValue.size() == 0) { - throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef); - } - clusterTopologyConfig.endNodeType = topologyKeys[lastValidTypeIdx]; - String faultZoneType = clusterConfig.getFaultZoneType(); - if (faultZoneType == null) { - clusterTopologyConfig.faultZoneType = clusterTopologyConfig.endNodeType; - } else if (!clusterTopologyConfig.topologyKeyDefaultValue.containsKey(faultZoneType)) { - throw new HelixException(String - .format("Invalid fault zone type %s, not 
present in topology definition %s.", - faultZoneType, clusterConfig.getTopology())); - } else { - clusterTopologyConfig.faultZoneType = faultZoneType; - } - } else { - // Use default cluster topology definition, i,e. /root/zone/instance - clusterTopologyConfig.endNodeType = Types.INSTANCE.name(); - clusterTopologyConfig.faultZoneType = Types.ZONE.name(); - } - } else { - clusterTopologyConfig.endNodeType = Types.INSTANCE.name(); - clusterTopologyConfig.faultZoneType = Types.INSTANCE.name(); - } - return clusterTopologyConfig; - } - - /** - * @param clusterTopologyKeyDefaultValue a LinkedHashMap where keys are cluster topology path and - * values are their corresponding default value. The entries - * are ordered by ClusterConfig.topology setting. - * @param faultZoneForEarlyQuit this flag is set to true only if caller wants the path - * to faultZone instead the whole path for the instance. + * Construct the instance topology map for an instance. + * The mapping is the cluster topology path name to its corresponding value. + * @param clusterTopologyConfig + * @param instanceName + * @param instanceConfig + * @param faultZoneForEarlyQuit Nullable, if set to non-null value, the faultZone path will stop at the matched + * faultZone value instead of constructing the whole path for the instance. 
*/ private static LinkedHashMap<String, String> computeInstanceTopologyMapHelper( - boolean isTopologyAwareEnabled, String instanceName, InstanceConfig instanceConfig, - LinkedHashMap<String, String> clusterTopologyKeyDefaultValue, String faultZoneForEarlyQuit) + ClusterTopologyConfig clusterTopologyConfig, String instanceName, InstanceConfig instanceConfig, + String faultZoneForEarlyQuit) throws IllegalArgumentException { LinkedHashMap<String, String> instanceTopologyMap = new LinkedHashMap<>(); - if (isTopologyAwareEnabled) { - if (clusterTopologyKeyDefaultValue.size() == 0) { + if (clusterTopologyConfig.isTopologyAwareEnabled()) { + if (clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty()) { // Return a ordered map using default cluster topology definition, i,e. /root/zone/instance String zone = instanceConfig.getZoneId(); if (zone == null) { @@ -283,11 +226,11 @@ public class Topology { instanceName)); } int numOfMatchedKeys = 0; - for (String key : clusterTopologyKeyDefaultValue.keySet()) { + for (String key : clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()) { // if a key does not exist in the instance domain config, using the default domain value. 
String value = domainAsMap.get(key); if (value == null || value.length() == 0) { - value = clusterTopologyKeyDefaultValue.get(key); + value = clusterTopologyConfig.getTopologyKeyDefaultValue().get(key); } else { numOfMatchedKeys++; } @@ -300,7 +243,7 @@ public class Topology { logger.warn( "Key-value pairs in InstanceConfig.Domain {} do not align with keys in ClusterConfig.Topology " + "{}, using default domain value instead", instanceConfig.getDomainAsString(), - clusterTopologyKeyDefaultValue.keySet()); + clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()); } } } else { @@ -327,11 +270,10 @@ public class Topology { public static LinkedHashMap<String, String> computeInstanceTopologyMap( ClusterConfig clusterConfig, String instanceName, InstanceConfig instanceConfig, boolean earlyQuitForFaultZone) { - ClusterTopologyConfig clusterTopologyConfig = getClusterTopologySetting(clusterConfig); + ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig); String faultZoneForEarlyQuit = - earlyQuitForFaultZone ? clusterTopologyConfig.faultZoneType : null; - return computeInstanceTopologyMapHelper(clusterConfig.isTopologyAwareEnabled(), instanceName, - instanceConfig, clusterTopologyConfig.topologyKeyDefaultValue, faultZoneForEarlyQuit); + earlyQuitForFaultZone ? clusterTopologyConfig.getFaultZoneType() : null; + return computeInstanceTopologyMapHelper(clusterTopologyConfig, instanceName, instanceConfig, faultZoneForEarlyQuit); } /** @@ -349,7 +291,7 @@ public class Topology { if (!current.hasChild(pathValue)) { buildNewNode(pathValue, path, current, instanceName, instanceWeight, liveInstances.contains(instanceName), pathNodes); - } else if (path.equals(_clusterTopologyConfig.endNodeType)) { + } else if (path.equals(_clusterTopologyConfig.getEndNodeType())) { throw new HelixException( "Failed to add topology node because duplicate leaf nodes are not allowed. 
Duplicate node name: " + pathValue); @@ -366,7 +308,7 @@ public class Topology { n.setType(type); n.setParent(parent); // if it is leaf node, create an InstanceNode instead - if (type.equals(_clusterTopologyConfig.endNodeType)) { + if (type.equals(_clusterTopologyConfig.getEndNodeType())) { n = new InstanceNode(n, instanceName); if (isLiveInstance) { // node is alive diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java new file mode 100644 index 0000000..f7ae740 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java @@ -0,0 +1,101 @@ +package org.apache.helix.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.LinkedHashMap; +import org.apache.helix.HelixException; +import org.apache.helix.controller.rebalancer.topology.Topology; + + +public class ClusterTopologyConfig { + private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_"; + private static final String TOPOLOGY_SPLITTER = "/"; + + private final boolean _topologyAwareEnabled; + private final String _endNodeType; + private final String _faultZoneType; + private final LinkedHashMap<String, String> _topologyKeyDefaultValue; + + private ClusterTopologyConfig(boolean topologyAwareEnabled, String endNodeType, String faultZoneType, + LinkedHashMap<String, String> topologyKeyDefaultValue) { + _topologyAwareEnabled = topologyAwareEnabled; + _endNodeType = endNodeType; + _faultZoneType = faultZoneType; + _topologyKeyDefaultValue = topologyKeyDefaultValue; + } + + /** + * Populate faultZone, endNodetype and a LinkedHashMap containing pathKeys default values for + * clusterConfig.Topology. The LinkedHashMap will be empty if clusterConfig.Topology is unset. + * + * @return an instance of {@link ClusterTopologyConfig} + */ + public static ClusterTopologyConfig createFromClusterConfig(ClusterConfig clusterConfig) { + if (!clusterConfig.isTopologyAwareEnabled()) { + return new ClusterTopologyConfig( + false, + Topology.Types.INSTANCE.name(), + Topology.Types.INSTANCE.name(), + new LinkedHashMap<>()); + } + // Assign default cluster topology definition, i.e.
/root/zone/instance + String endNodeType = Topology.Types.INSTANCE.name(); + String faultZoneType = Topology.Types.ZONE.name(); + LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>(); + + String topologyDef = clusterConfig.getTopology(); + if (topologyDef != null) { + for (String topologyKey : topologyDef.trim().split(TOPOLOGY_SPLITTER)) { + if (!topologyKey.isEmpty()) { + topologyKeyDefaultValue.put(topologyKey, DEFAULT_DOMAIN_PREFIX + topologyKey); + endNodeType = topologyKey; + } + } + if (topologyKeyDefaultValue.isEmpty()) { + throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef); + } + faultZoneType = clusterConfig.getFaultZoneType(); + if (faultZoneType == null) { + faultZoneType = endNodeType; + } else if (!topologyKeyDefaultValue.containsKey(faultZoneType)) { + throw new HelixException( + String.format("Invalid fault zone type %s, not present in topology definition %s.", + faultZoneType, clusterConfig.getTopology())); + } + } + return new ClusterTopologyConfig(true, endNodeType, faultZoneType, topologyKeyDefaultValue); + } + + public boolean isTopologyAwareEnabled() { + return _topologyAwareEnabled; + } + + public String getEndNodeType() { + return _endNodeType; + } + + public String getFaultZoneType() { + return _faultZoneType; + } + + public LinkedHashMap<String, String> getTopologyKeyDefaultValue() { + return _topologyKeyDefaultValue; + } +} diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java new file mode 100644 index 0000000..8235312 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java @@ -0,0 +1,84 @@ +package org.apache.helix.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Iterator; +import org.apache.helix.HelixException; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestClusterTopologyConfig { + + @Test + public void testClusterNonTopologyAware() { + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setTopologyAwareEnabled(false); + ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig); + Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), Topology.Types.INSTANCE.name()); + Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), Topology.Types.INSTANCE.name()); + Assert.assertTrue(clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty()); + } + + @Test + public void testClusterValidTopology() { + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setTopologyAwareEnabled(true); + testConfig.setTopology("/zone/instance"); + // no fault zone setup + ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig); + Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance"); + Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "instance"); + 
Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 2); + // with fault zone + testConfig.setFaultZoneType("zone"); + testConfig.setTopology(" /zone/instance "); + clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig); + Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance"); + Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "zone"); + Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 2); + String[] keys = new String[] {"zone", "instance"}; + Iterator<String> itr = clusterTopologyConfig.getTopologyKeyDefaultValue().keySet().iterator(); + for (String k : keys) { + Assert.assertEquals(k, itr.next()); + } + + testConfig.setTopology("/rack/zone/instance"); + clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig); + Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance"); + Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "zone"); + Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 3); + keys = new String[] {"rack", "zone", "instance"}; + itr = clusterTopologyConfig.getTopologyKeyDefaultValue().keySet().iterator(); + for (String k : keys) { + Assert.assertEquals(k, itr.next()); + } + } + + @Test(expectedExceptions = HelixException.class) + public void testClusterInvalidTopology() { + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setTopologyAwareEnabled(true); + testConfig.setTopology("/zone/instance"); + testConfig.setFaultZoneType("rack"); + ClusterTopologyConfig.createFromClusterConfig(testConfig); + } +}
