This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit e893a38651785d4202cd647f8e1c67c0fc29b597
Author: Jiajun Wang <[email protected]>
AuthorDate: Tue Sep 17 13:41:56 2019 -0700

    Adjust the expected replica count according to fault zone count. (#476)
    
    The rebalancer should determine the expected replica count according to the
    fault zone count rather than the node count alone.
---
 .../rebalancer/waged/model/AssignableNode.java     | 56 ++++++++++------------
 .../waged/model/ClusterModelProvider.java          | 28 ++++++-----
 .../waged/model/ClusterModelTestHelper.java        |  3 +-
 .../rebalancer/waged/model/TestAssignableNode.java | 24 ++++------
 .../rebalancer/waged/model/TestClusterModel.java   |  3 +-
 .../waged/model/TestClusterModelProvider.java      | 33 ++++++++-----
 6 files changed, 76 insertions(+), 71 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 6966353..a3460fb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -64,12 +64,10 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    * @param clusterConfig
    * @param instanceConfig
    * @param instanceName
-   * @param existingAssignment A collection of replicas that have been 
pre-allocated to the node.
    */
-  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, 
String instanceName,
-      Collection<AssignableReplica> existingAssignment) {
+  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, 
String instanceName) {
     _instanceName = instanceName;
-    refresh(clusterConfig, instanceConfig, existingAssignment);
+    refresh(clusterConfig, instanceConfig);
   }
 
   private void reset() {
@@ -88,10 +86,8 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    * subject to change. If the assumption is no longer true, this function 
should become private.
    * @param clusterConfig - the Cluster Config of the cluster where the node 
is located
    * @param instanceConfig - the Instance Config of the node
-   * @param existingAssignment - all the existing replicas that are current 
assigned to the node
    */
-  private void refresh(ClusterConfig clusterConfig, InstanceConfig 
instanceConfig,
-      Collection<AssignableReplica> existingAssignment) {
+  private void refresh(ClusterConfig clusterConfig, InstanceConfig 
instanceConfig) {
     reset();
 
     Map<String, Integer> instanceCapacity = 
fetchInstanceCapacity(clusterConfig, instanceConfig);
@@ -101,8 +97,29 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
     _maxCapacity = instanceCapacity;
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+  }
+
+  /**
+   * This function should only be used to assign a set of new partitions that 
are not allocated on
+   * this node.
+   * Using this function avoids the overhead of updating capacity repeatedly.
+   */
+  void assignNewBatch(Collection<AssignableReplica> replicas) {
+    Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+    for (AssignableReplica replica : replicas) {
+      addToAssignmentRecord(replica);
+      // increment the capacity requirement according to partition's capacity 
configuration.
+      for (Map.Entry<String, Integer> capacity : 
replica.getCapacity().entrySet()) {
+        totalPartitionCapacity.compute(capacity.getKey(),
+            (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                : totalValue + capacity.getValue());
+      }
+    }
 
-    assignNewBatch(existingAssignment);
+    // Update the global state after all single replications' calculation is 
done.
+    for (String key : totalPartitionCapacity.keySet()) {
+      updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
+    }
   }
 
   /**
@@ -315,29 +332,6 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   }
 
   /**
-   * This function should only be used to assign a set of new partitions that 
are not allocated on
-   * this node.
-   * Using this function avoids the overhead of updating capacity repeatedly.
-   */
-  private void assignNewBatch(Collection<AssignableReplica> replicas) {
-    Map<String, Integer> totalPartitionCapacity = new HashMap<>();
-    for (AssignableReplica replica : replicas) {
-      addToAssignmentRecord(replica);
-      // increment the capacity requirement according to partition's capacity 
configuration.
-      for (Map.Entry<String, Integer> capacity : 
replica.getCapacity().entrySet()) {
-        totalPartitionCapacity.compute(capacity.getKey(),
-            (key, totalValue) -> (totalValue == null) ? capacity.getValue()
-                : totalValue + capacity.getValue());
-      }
-    }
-
-    // Update the global state after all single replications' calculation is 
done.
-    for (String key : totalPartitionCapacity.keySet()) {
-      updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
-    }
-  }
-
-  /**
    * @throws HelixException if the replica has already been assigned to the 
node.
    */
   private void addToAssignmentRecord(AssignableReplica replica) {
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 3570164..20024c7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -62,10 +62,15 @@ public class ClusterModelProvider {
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
       Map<String, ResourceAssignment> baselineAssignment,
       Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // Construct all the assignable nodes and initialize with the allocated 
replicas.
+    Set<AssignableNode> assignableNodes =
+        parseAllNodes(dataProvider.getClusterConfig(), 
dataProvider.getInstanceConfigMap(),
+            activeInstances);
+
     // Generate replica objects for all the resource partitions.
     // <resource, replica set>
     Map<String, Set<AssignableReplica>> replicaMap =
-        parseAllReplicas(dataProvider, resourceMap, activeInstances.size());
+        parseAllReplicas(dataProvider, resourceMap, assignableNodes);
 
     // Check if the replicas need to be reassigned.
     Map<String, Set<AssignableReplica>> allocatedReplicas =
@@ -74,10 +79,9 @@ public class ClusterModelProvider {
         findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
             bestPossibleAssignment, allocatedReplicas);
 
-    // Construct all the assignable nodes and initialize with the allocated 
replicas.
-    Set<AssignableNode> assignableNodes =
-        parseAllNodes(dataProvider.getClusterConfig(), 
dataProvider.getInstanceConfigMap(),
-            activeInstances, allocatedReplicas);
+    // Update the allocated replicas to the assignable nodes.
+    assignableNodes.stream().forEach(node -> node.assignNewBatch(
+        allocatedReplicas.getOrDefault(node.getInstanceName(), 
Collections.emptySet())));
 
     // Construct and initialize cluster context.
     ClusterContext context = new ClusterContext(
@@ -171,15 +175,13 @@ public class ClusterModelProvider {
    * @param clusterConfig     The cluster configuration.
    * @param instanceConfigMap A map of all the instance configuration.
    * @param activeInstances   All the instances that are online and enabled.
-   * @param allocatedReplicas A map of all the assigned replicas, which will 
not be reassigned during the rebalance.
    * @return A map of assignable node set, <InstanceName, node set>.
    */
   private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
-      Map<String, InstanceConfig> instanceConfigMap, Set<String> 
activeInstances,
-      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+      Map<String, InstanceConfig> instanceConfigMap, Set<String> 
activeInstances) {
     return activeInstances.stream().map(
         instanceName -> new AssignableNode(clusterConfig, 
instanceConfigMap.get(instanceName),
-            instanceName, allocatedReplicas.getOrDefault(instanceName, 
Collections.emptySet())))
+            instanceName))
         .collect(Collectors.toSet());
   }
 
@@ -188,11 +190,12 @@ public class ClusterModelProvider {
    *
    * @param dataProvider The cluster status cache that contains the current 
cluster status.
    * @param resourceMap  All the valid resources that are managed by the 
rebalancer.
+   * @param assignableNodes All the active assignable nodes.
    * @return A map of assignable replica set, <ResourceName, replica set>.
    */
   private static Map<String, Set<AssignableReplica>> parseAllReplicas(
       ResourceControllerDataProvider dataProvider, Map<String, Resource> 
resourceMap,
-      int instanceCount) {
+      Set<AssignableNode> assignableNodes) {
     Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
     ClusterConfig clusterConfig = dataProvider.getClusterConfig();
 
@@ -211,8 +214,11 @@ public class ClusterModelProvider {
                 is.getStateModelDefRef(), resourceName));
       }
 
+      int activeFaultZoneCount =
+          assignableNodes.stream().map(node -> 
node.getFaultZone()).collect(Collectors.toSet())
+              .size();
       Map<String, Integer> stateCountMap =
-          def.getStateCountMap(instanceCount, 
is.getReplicaCount(instanceCount));
+          def.getStateCountMap(activeFaultZoneCount, 
is.getReplicaCount(assignableNodes.size()));
 
       for (String partition : is.getPartitionSet()) {
         for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 76f1141..08143c6 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -43,8 +43,7 @@ public class ClusterModelTestHelper extends 
AbstractTestClusterModel {
     Set<AssignableNode> nodeSet = new HashSet<>();
     testCache.getInstanceConfigMap().values().stream()
             .forEach(config -> nodeSet.add(new 
AssignableNode(testCache.getClusterConfig(),
-                    testCache.getInstanceConfigMap().get(_testInstanceId), 
config.getInstanceName(),
-                    Collections.emptyList())));
+                    testCache.getInstanceConfigMap().get(_testInstanceId), 
config.getInstanceName())));
     return nodeSet;
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 92a6998..6975901 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -64,7 +64,8 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     expectedCapacityMap.put("item3", 30);
 
     AssignableNode assignableNode = new 
AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId, assignmentSet);
+        testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId);
+    assignableNode.assignNewBatch(assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), 
expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 
20.0, 0.005);
@@ -167,8 +168,7 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
 
     AssignableNode assignableNode = new 
AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
-        Collections.emptyList());
+        testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId);
     AssignableReplica removingReplica = new 
AssignableReplica(testCache.getClusterConfig(),
         testCache.getResourceConfig(_resourceNames.get(1)), 
_partitionNames.get(2) + "non-exist",
         "MASTER", 1);
@@ -183,7 +183,8 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
 
     AssignableNode assignableNode = new 
AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId, assignmentSet);
+        testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId);
+    assignableNode.assignNewBatch(assignmentSet);
     AssignableReplica duplicateReplica = new 
AssignableReplica(testCache.getClusterConfig(),
         testCache.getResourceConfig(_resourceNames.get(0)), 
_partitionNames.get(0), "SLAVE", 2);
     assignableNode.assign(duplicateReplica);
@@ -206,8 +207,7 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
-        Collections.emptyList());
+        testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId);
   }
 
   @Test
@@ -227,8 +227,7 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     AssignableNode assignableNode = new 
AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
-        Collections.emptyList());
+        testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId);
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2/");
 
@@ -245,8 +244,7 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
-        Collections.emptyList());
+        testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId);
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
   }
@@ -259,8 +257,7 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     InstanceConfig testInstanceConfig = new 
InstanceConfig("testInstanceConfigId");
 
     AssignableNode assignableNode =
-        new AssignableNode(testClusterConfig, testInstanceConfig, 
_testInstanceId,
-            Collections.emptyList());
+        new AssignableNode(testClusterConfig, testInstanceConfig, 
_testInstanceId);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
   }
 
@@ -274,7 +271,6 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     InstanceConfig testInstanceConfig = new 
InstanceConfig("testInstanceConfigId");
     testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
 
-    new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
-        Collections.emptyList());
+    new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
index a45b729..5112413 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -43,8 +43,7 @@ public class TestClusterModel extends 
AbstractTestClusterModel {
     Set<AssignableNode> nodeSet = new HashSet<>();
     testCache.getInstanceConfigMap().values().stream().forEach(config -> 
nodeSet.add(
         new AssignableNode(testCache.getClusterConfig(),
-            testCache.getInstanceConfigMap().get(_testInstanceId), 
config.getInstanceName(),
-            Collections.emptyList())));
+            testCache.getInstanceConfigMap().get(_testInstanceId), 
config.getInstanceName())));
     return nodeSet;
   }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 1ec92a9..ad608b6 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -19,6 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.helix.HelixConstants;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -34,14 +42,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.when;
 
@@ -111,7 +111,18 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
     Assert.assertEquals(
         
clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
             .collect(Collectors.toSet()), _instances);
-    // Shall have 2 resources and 12 replicas
+    // Shall have 2 resources and 4 replicas, since all nodes are in the same 
fault zone.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 4));
+
+    // Adjust instance fault zone, so they have different fault zones.
+    testCache.getInstanceConfigMap().values().stream()
+        .forEach(config -> config.setZoneId(config.getInstanceName()));
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, 
_resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new 
Resource(resource))),
+        _instances, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap());
+    // Shall have 2 resources and 12 replicas after fault zone adjusted.
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
         .allMatch(replicaSet -> replicaSet.size() == 12));
@@ -197,10 +208,10 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
         _instances, 
Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
             Collections.singleton(changedResourceName)), 
Collections.emptyMap(),
         bestPossibleAssignment);
-    // There should be no existing assignment for all the resource except for 
resource2.
+    // There should be no existing assignment for all the resource except for 
resource2
     
Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(),
 1);
     Map<String, Set<String>> resourceAssignmentMap =
-        
clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testFaultZoneId);
+        
clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testInstanceId);
     // Should be only resource2 in the map
     Assert.assertEquals(resourceAssignmentMap.size(), 1);
     for (String resource : _resourceNames) {

Reply via email to