This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit ad202c6e393403e079508e224e99b9ab8a9d226d Author: jiajunwang <[email protected]> AuthorDate: Tue Oct 1 13:49:33 2019 -0700 Adjust the topology processing logic for instance to ensure backward compatibility. --- .../rebalancer/waged/model/AssignableNode.java | 52 +++++++++------------- .../rebalancer/waged/model/TestAssignableNode.java | 15 ++++--- 2 files changed, 28 insertions(+), 39 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index 2a68e15..3bfd225 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -28,16 +28,14 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.helix.HelixException; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - /** * This class represents a possible allocation of the replication. * Note that any usage updates to the AssignableNode are not thread safe. @@ -138,8 +136,8 @@ public class AssignableNode implements Comparable<AssignableNode> { } Map<String, AssignableReplica> partitionMap = _currentAssignedReplicaMap.get(resourceName); - if (!partitionMap.containsKey(partitionName) - || !partitionMap.get(partitionName).equals(replica)) { + if (!partitionMap.containsKey(partitionName) || !partitionMap.get(partitionName) + .equals(replica)) { LOG.warn("Replica {} is not assigned to node {}. Ignore the release call.", replica.toString(), getInstanceName()); return; @@ -269,10 +267,9 @@ public class AssignableNode implements Comparable<AssignableNode> { * For example, when * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function * returns "2". - * If cannot find the fault zone id, this function leaves the fault zone id as the instance name. - * TODO merge this logic with Topology.java tree building logic. - * For now, the WAGED rebalancer has a more strict topology def requirement. - * Any missing field will cause an invalid topology config exception. + * If cannot find the fault zone type, this function leaves the fault zone id as the instance name. + * Note the WAGED rebalancer does not require full topology tree to be created. So this logic is + * simpler than the CRUSH based rebalancer. */ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) { if (!clusterConfig.isTopologyAwareEnabled()) { @@ -290,36 +287,27 @@ public class AssignableNode implements Comparable<AssignableNode> { return zoneId == null ? instanceConfig.getInstanceName() : zoneId; } else { // Get the fault zone information from the complete topology definition. - String[] topologyDef = topologyStr.trim().split("/"); - if (topologyDef.length == 0 - || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) { + String[] topologyKeys = topologyStr.trim().split("/"); + if (topologyKeys.length == 0 || Arrays.stream(topologyKeys) + .noneMatch(type -> type.equals(faultZoneType))) { throw new HelixException( "The configured topology definition is empty or does not contain the fault zone type."); } Map<String, String> domainAsMap = instanceConfig.getDomainAsMap(); - if (domainAsMap == null) { - throw new HelixException( - String.format("The domain configuration of node %s is not configured", _instanceName)); - } else { - StringBuilder faultZoneStringBuilder = new StringBuilder(); - for (String key : topologyDef) { - if (!key.isEmpty()) { - if (domainAsMap.containsKey(key)) { - faultZoneStringBuilder.append(domainAsMap.get(key)); - faultZoneStringBuilder.append('/'); - } else { - throw new HelixException(String.format( - "The domain configuration of node %s is not complete. Type %s is not found.", - _instanceName, key)); - } - if (key.equals(faultZoneType)) { - break; - } + StringBuilder faultZoneStringBuilder = new StringBuilder(); + for (String key : topologyKeys) { + if (!key.isEmpty()) { + // if a key does not exist in the instance domain config, apply the default domain value. + faultZoneStringBuilder.append(domainAsMap.getOrDefault(key, "Default_" + key)); + if (key.equals(faultZoneType)) { + break; + } else { + faultZoneStringBuilder.append('/'); } } - return faultZoneStringBuilder.toString(); } + return faultZoneStringBuilder.toString(); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java index b48587f..e8b010e 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java @@ -189,24 +189,25 @@ public class TestAssignableNode extends AbstractTestClusterModel { assignableNode.assign(duplicateReplica); } - @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The domain configuration of node testInstanceId is not complete. Type DOES_NOT_EXIST is not found.") + @Test public void testParseFaultZoneNotFound() throws IOException { ResourceControllerDataProvider testCache = setupClusterDataCache(); ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId"); - testClusterConfig.setFaultZoneType("DOES_NOT_EXIST"); + testClusterConfig.setFaultZoneType("zone"); testClusterConfig.setTopologyAwareEnabled(true); - testClusterConfig.setTopology("/DOES_NOT_EXIST/"); + testClusterConfig.setTopology("/zone/"); when(testCache.getClusterConfig()).thenReturn(testClusterConfig); InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId"); - testInstanceConfig.setDomain("zone=2, instance=testInstance"); + testInstanceConfig.setDomain("instance=testInstance"); Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); instanceConfigMap.put(_testInstanceId, testInstanceConfig); when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); - new AssignableNode(testCache.getClusterConfig(), + AssignableNode node = new AssignableNode(testCache.getClusterConfig(), testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); + Assert.assertEquals(node.getFaultZone(), "Default_zone"); } @Test @@ -228,7 +229,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); - Assert.assertEquals(assignableNode.getFaultZone(), "2/"); + Assert.assertEquals(assignableNode.getFaultZone(), "2"); testClusterConfig = new ClusterConfig("testClusterConfigId"); testClusterConfig.setFaultZoneType("instance"); @@ -245,7 +246,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { assignableNode = new AssignableNode(testCache.getClusterConfig(), testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); - Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/"); + Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance"); } @Test
