This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 9f385bff8c2237459e492048d9924204cbaf8899
Author: Jiajun Wang <[email protected]>
AuthorDate: Tue Aug 13 11:25:23 2019 -0700

    Implement Cluster Model Provider. (#392)
    
    * Implement Cluster Model Provider.
    
    The model provider is called in the WAGED rebalancer to generate the Cluster 
Model based on the current cluster status.
    The major responsibility of the provider is to parse all the assignable 
replicas and identify which replicas need to be reassigned. Note that if the 
current best possible assignment is still valid, the rebalancer won't need to 
calculate the partition assignment.
    
    Also, add unit tests to verify the main logic.
---
 .../rebalancer/waged/ClusterDataDetector.java      |   3 +-
 .../rebalancer/waged/model/AssignableNode.java     |   2 +-
 .../rebalancer/waged/model/ClusterModel.java       |   1 -
 .../waged/model/ClusterModelProvider.java          | 247 +++++++++++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |  29 ++-
 .../waged/model/TestClusterModelProvider.java      | 247 +++++++++++++++++++++
 6 files changed, 517 insertions(+), 12 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
index 07f16dd..0423edf 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
@@ -35,11 +35,12 @@ public class ClusterDataDetector<T extends 
BaseControllerDataProvider> {
    * All the cluster change type that may trigger a WAGED rebalancer 
re-calculation.
    */
   public enum ChangeType {
+    BaselineAssignmentChange,
     InstanceConfigChange,
     ClusterConfigChange,
     ResourceConfigChange,
-    InstanceStateChange,
     ResourceIdealStatesChange,
+    InstanceStateChange,
     OtherChange
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 989323e..5fc04d7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -52,7 +52,7 @@ public class AssignableNode {
   private int _maxPartition; // maximum number of the partitions that can be 
assigned to the node.
 
   // proposed assignment tracking
-  // <resource name, partition name>
+  // <resource name, partition name set>
   private Map<String, Set<String>> _currentAssignments;
   // <resource name, top state partition name>
   private Map<String, Set<String>> _currentTopStateAssignments;
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 1be527a..6c4e67b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
  */
 
 import org.apache.helix.HelixException;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 
 import java.util.Collections;
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
new file mode 100644
index 0000000..9de023b
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -0,0 +1,247 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This util class generates Cluster Model object based on the controller's 
data cache.
+ */
+public class ClusterModelProvider {
+
+  /**
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be 
rebalanced. Note that any
+   *                               resources that are not in this list will be 
removed from the
+   *                               final assignment.
+   * @param activeInstances        The active instances that will be used in 
the calculation.
+   *                               Note this list can be different from the 
real active node list
+   *                               according to the rebalancer logic.
+   * @param clusterChanges         All the cluster changes that happened after 
the previous rebalance.
+   * @param baselineAssignment     The persisted Baseline assignment.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that 
was generated in the
+   *                               previous rebalance.
+   * @return A new Cluster Model object generated according to the current 
cluster status.
+   */
+  public static ClusterModel 
generateClusterModel(ResourceControllerDataProvider dataProvider,
+      Map<String, Resource> resourceMap, Set<String> activeInstances,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
+      Map<String, ResourceAssignment> baselineAssignment,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // Generate replica objects for all the resource partitions.
+    // <resource, replica set>
+    Map<String, Set<AssignableReplica>> replicaMap =
+        parseAllReplicas(dataProvider, resourceMap, activeInstances.size());
+
+    // Check if the replicas need to be reassigned.
+    Map<String, Set<AssignableReplica>> allocatedReplicas =
+        new HashMap<>(); // <instanceName, replica set>
+    Set<AssignableReplica> toBeAssignedReplicas =
+        findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
+            bestPossibleAssignment, allocatedReplicas);
+
+    // Construct all the assignable nodes and initialize with the allocated 
replicas.
+    Set<AssignableNode> assignableNodes =
+        parseAllNodes(dataProvider.getClusterConfig(), 
dataProvider.getInstanceConfigMap(),
+            activeInstances, allocatedReplicas);
+
+    // Construct and initialize cluster context.
+    ClusterContext context = new ClusterContext(
+        
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
+        activeInstances.size());
+    // Initialize the cluster context with the allocated assignments.
+    
context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
+
+    return new ClusterModel(context, toBeAssignedReplicas, assignableNodes, 
baselineAssignment,
+        bestPossibleAssignment);
+  }
+
+  /**
+   * Find the minimum set of replicas that need to be reassigned.
+   * A replica needs to be reassigned if one of the following conditions is 
true:
+   * 1. Cluster topology (the cluster config / any instance config) has been 
updated.
+   * 2. The baseline assignment has been updated.
+   * 3. The resource config has been updated.
+   * 4. The resource idealstate has been updated. TODO remove this condition 
when all resource configurations are migrated to resource config.
+   * 5. If the current best possible assignment does not contain the 
partition's valid assignment.
+   *
+   * @param replicaMap             A map that contains all the replicas grouped by 
resource name.
+   * @param clusterChanges         A map that contains all the important metadata 
updates that happened after the previous rebalance.
+   * @param activeInstances        All the instances that are alive and 
enabled.
+   * @param bestPossibleAssignment The current best possible assignment.
+   * @param allocatedReplicas      Output parameter: populated with the allocated replicas grouped by 
the target instance name.
+   * @return The replicas that need to be reassigned.
+   */
+  private static Set<AssignableReplica> findToBeAssignedReplicas(
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges, 
Set<String> activeInstances,
+      Map<String, ResourceAssignment> bestPossibleAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+    if 
(clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange)
+        || 
clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange)
+        || 
clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange))
 {
+      // If the cluster topology or baseline assignment has been modified, 
need to reassign all replicas
+      toBeAssignedReplicas
+          
.addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
+    } else {
+      // check each resource to identify the allocated replicas and 
to-be-assigned replicas.
+      for (String resourceName : replicaMap.keySet()) {
+        Set<AssignableReplica> replicas = replicaMap.get(resourceName);
+        // 1. if the resource config/idealstate is changed, need to reassign.
+        // 2. if the resource does not appear in the best possible assignment, 
need to reassign.
+        if 
(clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange,
+            Collections.emptySet()).contains(resourceName) || clusterChanges
+            
.getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange,
+                Collections.emptySet()).contains(resourceName) || 
!bestPossibleAssignment
+            .containsKey(resourceName)) {
+          toBeAssignedReplicas.addAll(replicas);
+          continue; // go to check next resource
+        } else {
+          // check every best possible assignment to identify if the 
related replicas need to reassign.
+          ResourceAssignment assignment = 
bestPossibleAssignment.get(resourceName);
+          // <partition, <instance, state>>
+          Map<String, Map<String, String>> stateMap = 
assignment.getMappedPartitions().stream()
+              .collect(Collectors.toMap(partition -> 
partition.getPartitionName(),
+                  partition -> new 
HashMap<>(assignment.getReplicaMap(partition))));
+          for (AssignableReplica replica : replicas) {
+            // Find any ACTIVE instance allocation that has the same state 
with the replica
+            Optional<Map.Entry<String, String>> instanceNameOptional =
+                stateMap.getOrDefault(replica.getPartitionName(), 
Collections.emptyMap()).entrySet()
+                    .stream().filter(instanceStateMap ->
+                    
instanceStateMap.getValue().equals(replica.getReplicaState()) && activeInstances
+                        .contains(instanceStateMap.getKey())).findAny();
+            // 3. if there is no such instance in the bestPossible assignment, need 
to reassign the replica
+            if (!instanceNameOptional.isPresent()) {
+              toBeAssignedReplicas.add(replica);
+              continue; // go to check the next replica
+            } else {
+              String instanceName = instanceNameOptional.get().getKey();
+              // * cleanup the best possible state map record,
+              // * so the selected instance won't be picked up again for 
another replica check
+              stateMap.getOrDefault(replica.getPartitionName(), 
Collections.emptyMap())
+                  .remove(instanceName);
+              // the current best possible assignment for this replica is 
valid,
+              // add to the allocated replica list.
+              allocatedReplicas.computeIfAbsent(instanceName, key -> new 
HashSet<>()).add(replica);
+            }
+          }
+        }
+      }
+    }
+    return toBeAssignedReplicas;
+  }
+
+  /**
+   * Parse all the nodes that can be assigned replicas based on the 
configurations.
+   *
+   * @param clusterConfig     The cluster configuration.
+   * @param instanceConfigMap A map of all the instance configurations.
+   * @param activeInstances   All the instances that are online and enabled.
+   * @param allocatedReplicas A map of all the assigned replicas, which will 
not be reassigned during the rebalance.
+   * @return A set of assignable nodes created from the active instances.
+   */
+  private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
+      Map<String, InstanceConfig> instanceConfigMap, Set<String> 
activeInstances,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    return activeInstances.stream().map(
+        instanceName -> new AssignableNode(clusterConfig, 
instanceConfigMap.get(instanceName),
+            instanceName, allocatedReplicas.getOrDefault(instanceName, 
Collections.emptySet())))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Parse all the assignable replicas of the given resources from the cluster data 
cache. The instanceCount is used when calculating the per-state replica counts.
+   *
+   * @param dataProvider The cluster status cache that contains the current 
cluster status.
+   * @param resourceMap  All the valid resources that are managed by the 
rebalancer.
+   * @return A map of assignable replica set, <ResourceName, replica set>.
+   */
+  private static Map<String, Set<AssignableReplica>> parseAllReplicas(
+      ResourceControllerDataProvider dataProvider, Map<String, Resource> 
resourceMap,
+      int instanceCount) {
+    Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
+
+    for (String resourceName : resourceMap.keySet()) {
+      ResourceConfig config = dataProvider.getResourceConfig(resourceName);
+      IdealState is = dataProvider.getIdealState(resourceName);
+      if (is == null) {
+        throw new HelixException(
+            "Cannot find the resource ideal state for resource: " + 
resourceName);
+      }
+      String defName = is.getStateModelDefRef();
+      StateModelDefinition def = dataProvider.getStateModelDef(defName);
+      if (def == null) {
+        throw new IllegalArgumentException(String
+            .format("Cannot find state model definition %s for resource %s.",
+                is.getStateModelDefRef(), resourceName));
+      }
+
+      Map<String, Integer> stateCountMap =
+          def.getStateCountMap(instanceCount, 
is.getReplicaCount(instanceCount));
+
+      for (String partition : is.getPartitionSet()) {
+        for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
+          String state = entry.getKey();
+          for (int i = 0; i < entry.getValue(); i++) {
+            totalReplicaMap.computeIfAbsent(resourceName, key -> new 
HashSet<>()).add(
+                new AssignableReplica(config, partition, state,
+                    def.getStatePriorityMap().get(state)));
+          }
+        }
+      }
+    }
+    return totalReplicaMap;
+  }
+
+  /**
+   * @return A map that contains the assignments for each fault zone. <fault zone, 
<resource, set of partitions>>
+   */
+  private static Map<String, Map<String, Set<String>>> 
mapAssignmentToFaultZone(
+      Set<AssignableNode> assignableNodes) {
+    Map<String, Map<String, Set<String>>> faultZoneAssignmentMap = new 
HashMap<>();
+    assignableNodes.stream().forEach(node -> {
+      for (Map.Entry<String, Set<String>> resourceMap : 
node.getCurrentAssignmentsMap()
+          .entrySet()) {
+        faultZoneAssignmentMap.computeIfAbsent(node.getFaultZone(), k -> new 
HashMap<>())
+            .computeIfAbsent(resourceMap.getKey(), k -> new HashSet<>())
+            .addAll(resourceMap.getValue());
+      }
+    });
+    return faultZoneAssignmentMap;
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 0e2b43a..d99a3fb 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -41,6 +41,7 @@ import java.util.Set;
 import static org.mockito.Mockito.when;
 
 public abstract class AbstractTestClusterModel {
+  protected static String _sessionId = "testSessionId";
   protected String _testInstanceId;
   protected List<String> _resourceNames;
   protected List<String> _partitionNames;
@@ -73,16 +74,27 @@ public abstract class AbstractTestClusterModel {
     _testFaultZoneId = "testZone";
   }
 
+  InstanceConfig createMockInstanceConfig(String instanceId) {
+    InstanceConfig testInstanceConfig = new InstanceConfig(instanceId);
+    testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
+    testInstanceConfig.addTag(_testInstanceTags.get(0));
+    testInstanceConfig.setInstanceEnabled(true);
+    testInstanceConfig.setZoneId(_testFaultZoneId);
+    return testInstanceConfig;
+  }
+
+  LiveInstance createMockLiveInstance(String instanceId) {
+    LiveInstance testLiveInstance = new LiveInstance(instanceId);
+    testLiveInstance.setSessionId(_sessionId);
+    return testLiveInstance;
+  }
+
   protected ResourceControllerDataProvider setupClusterDataCache() throws 
IOException {
     ResourceControllerDataProvider testCache = 
Mockito.mock(ResourceControllerDataProvider.class);
 
     // 1. Set up the default instance information with capacity configuration.
-    InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceId");
-    testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
-    testInstanceConfig.addTag(_testInstanceTags.get(0));
+    InstanceConfig testInstanceConfig = 
createMockInstanceConfig(_testInstanceId);
     testInstanceConfig.setInstanceEnabledForPartition("TestResource", 
"TestPartition", false);
-    testInstanceConfig.setInstanceEnabled(true);
-    testInstanceConfig.setZoneId(_testFaultZoneId);
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(_testInstanceId, testInstanceConfig);
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
@@ -95,8 +107,7 @@ public abstract class AbstractTestClusterModel {
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
 
     // 3. Mock the live instance node for the default instance.
-    LiveInstance testLiveInstance = new LiveInstance(_testInstanceId);
-    testLiveInstance.setSessionId("testSessionId");
+    LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
     Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
     liveInstanceMap.put(_testInstanceId, testLiveInstance);
     when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
@@ -130,7 +141,7 @@ public abstract class AbstractTestClusterModel {
     Map<String, CurrentState> currentStatemap = new HashMap<>();
     currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
     currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);
-    when(testCache.getCurrentState(_testInstanceId, 
"testSessionId")).thenReturn(currentStatemap);
+    when(testCache.getCurrentState(_testInstanceId, 
_sessionId)).thenReturn(currentStatemap);
 
     // 5. Set up the resource config for the two resources with the partition 
weight.
     Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
@@ -162,7 +173,7 @@ public abstract class AbstractTestClusterModel {
   protected Set<AssignableReplica> 
generateReplicas(ResourceControllerDataProvider dataProvider) {
     // Create assignable replica based on the current state.
     Map<String, CurrentState> currentStatemap =
-        dataProvider.getCurrentState(_testInstanceId, "testSessionId");
+        dataProvider.getCurrentState(_testInstanceId, _sessionId);
     Set<AssignableReplica> assignmentSet = new HashSet<>();
     for (CurrentState cs : currentStatemap.values()) {
       ResourceConfig resourceConfig = 
dataProvider.getResourceConfig(cs.getResourceName());
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
new file mode 100644
index 0000000..f92a66c
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -0,0 +1,247 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+public class TestClusterModelProvider extends AbstractTestClusterModel {
+  Set<String> _instances;
+
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+    _instances = new HashSet<>();
+    _instances.add(_testInstanceId);
+  }
+
+  @Override
+  protected ResourceControllerDataProvider setupClusterDataCache() throws 
IOException {
+    ResourceControllerDataProvider testCache = super.setupClusterDataCache();
+
+    // Set up mock idealstate
+    Map<String, IdealState> isMap = new HashMap<>();
+    for (String resource : _resourceNames) {
+      IdealState is = new IdealState(resource);
+      is.setNumPartitions(_partitionNames.size());
+      is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+      is.setStateModelDefRef("MasterSlave");
+      is.setReplicas("3");
+      is.setRebalancerClassName(WagedRebalancer.class.getName());
+      _partitionNames.stream()
+          .forEach(partition -> is.setPreferenceList(partition, 
Collections.emptyList()));
+      isMap.put(resource, is);
+    }
+    when(testCache.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> 
isMap.get(invocationOnMock.getArguments()[0]));
+
+    // Set up 2 more instances
+    for (int i = 1; i < 3; i++) {
+      String instanceName = _testInstanceId + i;
+      _instances.add(instanceName);
+      // 1. Set up the additional instance's information with capacity 
configuration.
+      InstanceConfig testInstanceConfig = 
createMockInstanceConfig(instanceName);
+      Map<String, InstanceConfig> instanceConfigMap = 
testCache.getInstanceConfigMap();
+      instanceConfigMap.put(instanceName, testInstanceConfig);
+      when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+      // 2. Mock the live instance node for the additional instance.
+      LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
+      Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+      liveInstanceMap.put(instanceName, testLiveInstance);
+      when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+    }
+
+    return testCache;
+  }
+
+  @Test
+  public void testGenerateClusterModel() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    // 1. test generating a cluster model with empty assignment
+    ClusterModel clusterModel = 
ClusterModelProvider.generateClusterModel(testCache,
+        _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new 
Resource(resource))),
+        _instances, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap());
+    // There should be no existing assignment.
+    
Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .anyMatch(resourceMap -> !resourceMap.isEmpty()));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+    // Have all 3 instances
+    Assert.assertEquals(
+        
clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
+            .collect(Collectors.toSet()), _instances);
+    // Shall have 2 resources and 12 replicas
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 12));
+
+    // 2. test with only one active node
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, 
_resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new 
Resource(resource))),
+        Collections.singleton(_testInstanceId), Collections.emptyMap(), 
Collections.emptyMap(),
+        Collections.emptyMap());
+    // Have only one instance
+    Assert.assertEquals(
+        
clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
+            .collect(Collectors.toSet()), 
Collections.singleton(_testInstanceId));
+    // Shall have 4 assignable replicas because there is only one valid node.
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 4));
+
+    // 3. test with no active instance
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, 
_resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new 
Resource(resource))),
+        Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
+        Collections.emptyMap());
+    // Have only one instance
+    Assert.assertEquals(clusterModel.getAssignableNodes().size(), 0);
+    // Shall have 0 assignable replicas because there is only n0 valid node.
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.isEmpty()));
+
+    // 4. test with best possible assignment
+    // Mock a best possible assignment based on the current states.
+    Map<String, ResourceAssignment> bestPossibleAssignment = new HashMap<>();
+    for (String resource : _resourceNames) {
+      // <partition, <instance, state>>
+      Map<String, Map<String, String>> assignmentMap = new HashMap<>();
+      CurrentState cs = testCache.getCurrentState(_testInstanceId, 
_sessionId).get(resource);
+      if (cs != null) {
+        for (Map.Entry<String, String> stateEntry : 
cs.getPartitionStateMap().entrySet()) {
+          assignmentMap.computeIfAbsent(stateEntry.getKey(), k -> new 
HashMap<>())
+              .put(_testInstanceId, stateEntry.getValue());
+        }
+        ResourceAssignment assignment = new ResourceAssignment(resource);
+        assignmentMap.keySet().stream().forEach(partition -> assignment
+            .addReplicaMap(new Partition(partition), 
assignmentMap.get(partition)));
+        bestPossibleAssignment.put(resource, assignment);
+      }
+    }
+
+    // Generate a cluster model based on the best possible assignment
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, 
_resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new 
Resource(resource))),
+        _instances, Collections.emptyMap(), Collections.emptyMap(), 
bestPossibleAssignment);
+    // There should be 4 existing assignments in total (each resource has 2) 
in the specified instance
+    
Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.values().stream()
+            .allMatch(partitionSet -> partitionSet.size() == 2)));
+    Assert.assertEquals(
+        
clusterModel.getAssignableNodes().get(_testInstanceId).getCurrentAssignmentCount(),
 4);
+    // Since each resource has 2 replicas assigned, the assignable replica 
count should be 10.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 10));
+
+    // 5. test with best possible assignment but cluster topology is changed
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, 
_resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new 
Resource(resource))),
+        _instances, 
Collections.singletonMap(ClusterDataDetector.ChangeType.ClusterConfigChange,
+            Collections.emptySet()), Collections.emptyMap(), 
bestPossibleAssignment);
+    // There should be no existing assignment since the topology change 
invalidates all existing assignment
+    
Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.isEmpty()));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+    // Shall have 2 resources, each with 12 assignable replicas
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 12));
+
+    // 6. test with best possible assignment and one resource config change
+    // Generate a cluster model based on the same best possible assignment, 
but resource1 config is changed
+    String changedResourceName = _resourceNames.get(0);
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, 
_resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new 
Resource(resource))),
+        _instances, 
Collections.singletonMap(ClusterDataDetector.ChangeType.ResourceConfigChange,
+            Collections.singleton(changedResourceName)), 
Collections.emptyMap(),
+        bestPossibleAssignment);
+    // There should be no existing assignment for any resource except for 
resource2.
+    
Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(),
 1);
+    Map<String, Set<String>> resourceAssignmentMap =
+        
clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testFaultZoneId);
+    // Should be only resource2 in the map
+    Assert.assertEquals(resourceAssignmentMap.size(), 1);
+    for (String resource : _resourceNames) {
+      Assert
+          .assertEquals(resourceAssignmentMap.getOrDefault(resource, 
Collections.emptySet()).size(),
+              resource.equals(changedResourceName) ? 0 : 2);
+    }
+    // Only the test instance will have 2 assignments, both from resource2.
+    for (String instance : _instances) {
+      Assert
+          
.assertEquals(clusterModel.getAssignableNodes().get(instance).getCurrentAssignmentCount(),
+              instance.equals(_testInstanceId) ? 2 : 0);
+    }
+    // Shall have 2 resources: 12 replicas for the changed one, 10 for the other
+    
Assert.assertEquals(clusterModel.getAssignableReplicaMap().keySet().size(), 2);
+    for (String resource : _resourceNames) {
+      
Assert.assertEquals(clusterModel.getAssignableReplicaMap().get(resource).size(),
+          resource.equals(changedResourceName) ? 12 : 10);
+    }
+
+    // 7. test with best possible assignment but the instance becomes inactive
+    // Generate a cluster model based on the best possible assignment, but the 
assigned node is disabled
+    Set<String> limitedActiveInstances = new HashSet<>(_instances);
+    limitedActiveInstances.remove(_testInstanceId);
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, 
_resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new 
Resource(resource))),
+        limitedActiveInstances, Collections.emptyMap(), Collections.emptyMap(),
+        bestPossibleAssignment);
+    // There should be no existing assignment.
+    
Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .anyMatch(resourceMap -> !resourceMap.isEmpty()));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+    // Have only 2 instances
+    Assert.assertEquals(
+        
clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
+            .collect(Collectors.toSet()), limitedActiveInstances);
+    // Since only 2 instances are active, we shall have 8 assignable replicas 
in each resource.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 8));
+
+  }
+}

Reply via email to