This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b3538320 WAGED rebalance overwrite redesign -- part 1 (#2444)
2b3538320 is described below

commit 2b35383208625eff0ca6730a2923393c955c649d
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Mon Apr 17 15:00:29 2023 -0400

    WAGED rebalance overwrite redesign -- part 1 (#2444)
    
    Implement a new code path to handle "rebalance overwrites" that honors WAGED hard constraints.
    The rebalance overwrite is a special mechanism that handles minActiveReplica during delayed rebalance. With the existing overwrite logic, additional replicas may be brought up on an instance in a way that violates WAGED hard constraints. This commit implements new logic that honors those hard constraints. The new code is NOT yet integrated into the existing flow.
---
 .../rebalancer/util/DelayedRebalanceUtil.java      | 173 +++++++++++-
 .../rebalancer/waged/model/AssignableReplica.java  |   8 +-
 .../waged/model/ClusterModelProvider.java          |  34 ++-
 .../waged/model/TestClusterModelProvider.java      | 311 +++++++++++++++++++++
 4 files changed, 522 insertions(+), 4 deletions(-)
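
For context on the mechanism described in the commit message, the core check is the minActiveReplica gap: a partition needs overwrite replicas when fewer than min(minActiveReplica, numReplica) of its current replicas sit on live, enabled instances. Below is a minimal, self-contained sketch of that check; the class and method names are illustrative only (the real logic lives in the new DelayedRebalanceUtil.findPartitionsMissingMinActiveReplica added in this patch).

import java.util.Map;
import java.util.Set;

final class MinActiveReplicaGapSketch {
  // Returns true when the partition's placement falls short of
  // min(minActiveReplica, numReplica) replicas on live, enabled instances,
  // i.e. when delayed rebalance overwrite replicas would be needed.
  static boolean needsOverwrite(Map<String, String> instanceToState, // one partition's placement: instance -> state
                                Set<String> liveEnabledInstances,
                                int minActiveReplica,
                                int numReplica) {
    long active = instanceToState.keySet().stream()
        .filter(liveEnabledInstances::contains)
        .count();
    return active < Math.min(minActiveReplica, numReplica);
  }
}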

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 4f48b2f93..65b1431e4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -20,16 +20,23 @@ package org.apache.helix.controller.rebalancer.util;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.stream.Collectors;
 import org.apache.helix.HelixManager;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.util.InstanceValidationUtil;
 import org.slf4j.Logger;
@@ -42,7 +49,7 @@ import org.slf4j.LoggerFactory;
 public class DelayedRebalanceUtil {
   private static final Logger LOG = 
LoggerFactory.getLogger(DelayedRebalanceUtil.class);
 
-  private static RebalanceScheduler REBALANCE_SCHEDULER = new 
RebalanceScheduler();
+  private static final RebalanceScheduler REBALANCE_SCHEDULER = new 
RebalanceScheduler();
 
   /**
    * @return true if delay rebalance is configured and enabled in the 
ClusterConfig configurations.
@@ -279,4 +286,168 @@ public class DelayedRebalanceUtil {
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that need to be brought up to satisfy minActiveReplica while downed instances
+   * are within the delayed window.
+   * Keeps all current assignments with their current allocation.
+   * NOTE: This method also populates allocatedReplicas as it goes through all resources, to preserve the current allocation.
+   *
+   * @param clusterData Cluster data cache.
+   * @param resources A set of all resource names.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas; the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> 
findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Set<String> resources,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<String>> partitionsMissingMinActiveReplicas =
+        findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : resources) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          
ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = 
currentAssignment.get(resourceName);
+      String modelDef = 
clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = 
clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignments and add them to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, 
state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new 
HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), 
clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, 
statePriorityMap.get(state)))));
+      // only proceed for resources requiring delayed rebalance overwrites
+      List<String> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, 
Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, 
partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  /**
+   * From the current assignment, find the partitions that are missing minActiveReplica for ALL resources, returned as a
+   * map keyed by resource name.
+   * @param clusterData Cluster data cache
+   * @param currentAssignment Current resource assignment
+   * @return <resource name, list<partition name>> for partitions that are missing minActiveReplica.
+   */
+  private static Map<String, List<String>> 
findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> currentAssignment) {
+    return currentAssignment.entrySet()
+        .parallelStream()
+        .map(e -> Map.entry(e.getKey(), 
findPartitionsMissingMinActiveReplica(clusterData, e.getValue())))
+        .filter(e -> !e.getValue().isEmpty())
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  /**
+   * From the current assignment, find the partitions that are missing minActiveReplica for a SINGLE resource.
+   * @param clusterData Cluster data cache
+   * @param resourceAssignment Current resource assignment
+   * @return A list of partition names
+   */
+  private static List<String> findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      ResourceAssignment resourceAssignment) {
+    String resourceName = resourceAssignment.getResourceName();
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = 
currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    int minActiveReplica = 
DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+    return resourceAssignment.getMappedPartitions()
+        .parallelStream()
+        .filter(partition -> {
+          long enabledLivePlacementCounter = 
resourceAssignment.getReplicaMap(partition).keySet()
+              .stream()
+              .filter(enabledLiveInstances::contains)
+              .count();
+          return enabledLivePlacementCounter < Math.min(minActiveReplica, 
numReplica);
+        })
+        .map(Partition::getPartitionName)
+        .distinct()
+        .collect(Collectors.toList());
+  }
+
+  private static int getMinActiveReplica(ResourceControllerDataProvider 
clusterData, String resourceName) {
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = 
currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    return DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+  }
+
+  /**
+   * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica.
+   * @param clusterData Cluster data cache.
+   * @param resourceName name of the resource
+   * @param partitions Pre-computed list of partition names missing minActiveReplica
+   * @param stateInstanceMap <partition, <state, instances set>>
+   * @param liveEnabledInstances A set of live and enabled instances
+   * @return A set of AssignableReplica
+   */
+  private static Set<AssignableReplica> findAssignableReplicaForResource(
+      ResourceControllerDataProvider clusterData,
+      String resourceName,
+      List<String> partitions,
+      Map<String, Map<String, Set<String>>> stateInstanceMap,
+      Set<String> liveEnabledInstances) {
+    LOG.info("Computing replicas requiring rebalance overwrite for resource: {}", resourceName);
+    final List<String> priorityOrderedStates =
+        
clusterData.getStateModelDef(clusterData.getIdealState(resourceName).getStateModelDefRef())
+            .getStatesPriorityList();
+    final IdealState currentIdealState = 
clusterData.getIdealState(resourceName);
+    final ResourceConfig resourceConfig = 
clusterData.getResourceConfig(resourceName);
+    final Map<String, Integer> statePriorityMap =
+        
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+    final Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String partitionName : partitions) {
+      // count current active replicas of the partition
+      Map<String, Integer> activeStateReplicaCount = 
stateInstanceMap.getOrDefault(partitionName, Collections.emptyMap())
+          .entrySet()
+          .stream()
+          .collect(Collectors.toMap(Map.Entry::getKey,
+              e -> (int) 
e.getValue().stream().filter(liveEnabledInstances::contains).count()));
+      int activeReplicas = 
activeStateReplicaCount.values().stream().reduce(Integer::sum).orElse(0);
+      int minActiveReplica = getMinActiveReplica(clusterData, resourceName);
+      int replicaGapCount = minActiveReplica - activeReplicas;
+      if (replicaGapCount <= 0) {
+        // delayed rebalance overwrite isn't required; stop early and move on to the next partition
+        continue;
+      }
+      // follow the state priority order, add additional replicas to close the gap on replica count
+      Map<String, Integer> stateCountMap = 
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
+          .getStateCountMap(minActiveReplica, minActiveReplica);
+      // follow the priority order of states and prepare additional replicas to be assigned
+      for (String state : priorityOrderedStates) {
+        if (replicaGapCount <= 0) {
+          break;
+        }
+        int priority = statePriorityMap.get(state);
+        int curActiveStateCount = activeStateReplicaCount.getOrDefault(state, 
0);
+        for (int i = 0; i < stateCountMap.get(state) - curActiveStateCount && 
replicaGapCount > 0; i++) {
+          toBeAssignedReplicas.add(
+              new AssignableReplica(clusterData.getClusterConfig(), 
resourceConfig, partitionName, state, priority));
+          replicaGapCount--;
+        }
+      }
+    }
+    LOG.info("Replicas: {} need to be brought up for rebalance overwrite.", toBeAssignedReplicas);
+    return toBeAssignedReplicas;
+  }
 }
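
A hedged usage sketch of the new entry point above: clusterData and currentAssignment stand in for objects the caller already holds (the new unit test further below builds them with mocks), and allocatedReplicas is passed in empty because the method populates it as a side effect.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.model.ResourceAssignment;

final class DelayedOverwriteSketch {
  static Set<AssignableReplica> computeOverwriteReplicas(
      ResourceControllerDataProvider clusterData,
      Map<String, ResourceAssignment> currentAssignment) {
    Map<String, Set<AssignableReplica>> allocatedReplicas = new HashMap<>(); // populated by the call below
    Set<AssignableReplica> toBeAssigned =
        DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(
            clusterData,
            currentAssignment.keySet(),            // resource names under consideration
            clusterData.getEnabledLiveInstances(), // live and enabled instances
            currentAssignment,
            allocatedReplicas);
    // toBeAssigned holds only the extra replicas needed to close minActiveReplica gaps;
    // allocatedReplicas now mirrors the existing placement, keyed by instance name.
    return toBeAssigned;
  }
}
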
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 680d35b5b..4ddd1aef6 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -34,6 +34,12 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This class represents a partition replication that needs to be allocated.
+ *
+ * TODO: This class violates the contract of {@link Object#hashCode()}:
+ *   if two objects are equal according to the equals(Object) method, then calling the hashCode method on each of the
+ *   two objects must produce the same integer result.
+ *   https://github.com/apache/helix/issues/2299
+ *   This could be a tricky fix because this bug/feature is deeply coupled with the existing code logic.
  */
 public class AssignableReplica implements Comparable<AssignableReplica> {
   private static final Logger LOG = 
LoggerFactory.getLogger(AssignableReplica.class);
@@ -56,7 +62,7 @@ public class AssignableReplica implements 
Comparable<AssignableReplica> {
    * @param replicaState   The state of the replication.
    * @param statePriority  The priority of the replication's state.
    */
-  AssignableReplica(ClusterConfig clusterConfig, ResourceConfig resourceConfig,
+  public AssignableReplica(ClusterConfig clusterConfig, ResourceConfig 
resourceConfig,
       String partitionName, String replicaState, int statePriority) {
     _partitionName = partitionName;
     _replicaState = replicaState;
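
As an aside on the TODO above: when equals and hashCode disagree, hash-based collections can retain duplicates of logically equal objects. The snippet below is a hypothetical, standalone illustration of that contract violation, not Helix code.

import java.util.HashSet;
import java.util.Set;

final class HashCodeContractDemo {
  // Hypothetical class: equality is defined by 'name', but hashCode is not overridden,
  // so Object's identity-based hashCode is used and the contract is violated.
  static final class Replica {
    final String name;
    Replica(String name) { this.name = name; }
    @Override public boolean equals(Object o) {
      return o instanceof Replica && ((Replica) o).name.equals(name);
    }
  }

  public static void main(String[] args) {
    Set<Replica> set = new HashSet<>();
    set.add(new Replica("Resource1-Partition2-MASTER"));
    set.add(new Replica("Resource1-Partition2-MASTER"));
    // The two elements are equal by equals(), yet they usually land in different
    // hash buckets, so the set typically reports size 2 instead of 1.
    System.out.println(set.size());
  }
}
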
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index ce82c3207..7dab730e8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -53,7 +54,31 @@ public class ClusterModelProvider {
     // changes.
     GLOBAL_BASELINE,
     // Set the rebalance scope to cover only replicas that are assigned to 
downed instances.
-    EMERGENCY
+    EMERGENCY,
+    // A temporary overwrite scope for partition replicas on downed instances that are still within the delayed
+    // window but missing minActiveReplicas
+    DELAYED_REBALANCE_OVERWRITES
+  }
+
+  /**
+   * TODO: On integration with WAGED, have to integrate with counter and latency metrics -- qqu
+   * Compute a new Cluster Model with scope limited to partitions whose best possible assignment is missing
+   * minActiveReplicas because of the delayed rebalance setting.
+   * @param dataProvider The controller's data cache
+   * @param resourceMap The full map of resources by name
+   * @param activeInstances The active instances that will be used in the calculation.
+   * @param resourceAssignment The resource assignment state to compute on. This should be the current state assignment;
+   *                           if it's run right after another rebalance calculation, the best possible assignment from
+   *                           the previous result can be used.
+   * @return the ClusterModel
+   */
+  public static ClusterModel generateClusterModelForDelayedRebalanceOverwrites(
+      ResourceControllerDataProvider dataProvider,
+      Map<String, Resource> resourceMap,
+      Set<String> activeInstances,
+      Map<String, ResourceAssignment> resourceAssignment) {
+    return generateClusterModel(dataProvider, resourceMap, activeInstances, 
Collections.emptyMap(),
+        Collections.emptyMap(), resourceAssignment, 
RebalanceScopeType.DELAYED_REBALANCE_OVERWRITES);
   }
 
   /**
@@ -193,6 +218,11 @@ public class ClusterModelProvider {
         toBeAssignedReplicas = 
findToBeAssignedReplicasOnDownInstances(replicaMap, activeInstances,
             currentAssignment, allocatedReplicas);
         break;
+      case DELAYED_REBALANCE_OVERWRITES:
+        toBeAssignedReplicas =
+            
DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(dataProvider, 
replicaMap.keySet(),
+                activeInstances, currentAssignment, allocatedReplicas);
+        break;
       default:
         throw new HelixException("Unknown rebalance scope type: " + scopeType);
     }
@@ -474,7 +504,7 @@ public class ClusterModelProvider {
   }
 
   // <partition, <state, instances set>>
-  private static Map<String, Map<String, Set<String>>> getStateInstanceMap(
+  public static Map<String, Map<String, Set<String>>> getStateInstanceMap(
       ResourceAssignment assignment) {
     if (assignment == null) {
       return Collections.emptyMap();
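
A hedged sketch of how the new generateClusterModelForDelayedRebalanceOverwrites entry point above could be exercised; dataProvider, resourceMap, activeInstances, and currentAssignment are placeholders for values the caller already has (the new unit test below constructs them with mocks).

import java.util.Map;
import java.util.Set;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;

final class OverwriteClusterModelSketch {
  static ClusterModel buildOverwriteModel(
      ResourceControllerDataProvider dataProvider,
      Map<String, Resource> resourceMap,
      Set<String> activeInstances,
      Map<String, ResourceAssignment> currentAssignment) {
    ClusterModel model = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(
        dataProvider, resourceMap, activeInstances, currentAssignment);
    // With the DELAYED_REBALANCE_OVERWRITES scope, only the replicas needed to close
    // minActiveReplica gaps appear in the assignable replica map, while the existing
    // placement stays on the assignable nodes (see the assertions in the test below).
    model.getAssignableReplicaMap().forEach((resource, replicas) ->
        System.out.println(resource + " needs " + replicas.size() + " overwrite replica(s)"));
    return model;
  }
}
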
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 93f61f64e..fc316c2a5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -19,6 +19,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,7 +31,9 @@ import java.util.stream.Collectors;
 
 import org.apache.helix.HelixConstants;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -37,6 +41,8 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -47,6 +53,7 @@ import static org.mockito.Mockito.when;
 
 public class TestClusterModelProvider extends AbstractTestClusterModel {
   Set<String> _instances;
+  Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>();
 
   @BeforeClass
   public void initialize() {
@@ -94,6 +101,304 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
     return testCache;
   }
 
+  @Test
+  public void testFindToBeAssignedReplicasForMinActiveReplica() throws 
IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    String instance1 = _testInstanceId;
+    String offlineInstance = _testInstanceId + "1";
+    String instance2 = _testInstanceId + "2";
+    Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+    liveInstanceMap.put(instance1, createMockLiveInstance(instance1));
+    liveInstanceMap.put(instance2, createMockLiveInstance(instance2));
+    Set<String> activeInstances = new HashSet<>();
+    activeInstances.add(instance1);
+    activeInstances.add(instance2);
+    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+
+    // test 0, empty input
+    Assert.assertEquals(
+        
DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(testCache, 
Collections.emptySet(),
+            activeInstances, Collections.emptyMap(), new HashMap<>()),
+        Collections.emptySet());
+
+    // test 1, one partition under minActiveReplica
+    Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
+        _resourceNames.get(0),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of("MASTER", instance1),
+            _partitionNames.get(1), ImmutableMap.of("OFFLINE", 
offlineInstance)), // Partition2-MASTER
+        _resourceNames.get(1),
+        ImmutableMap.of(
+            _partitionNames.get(2), ImmutableMap.of("MASTER", instance1),
+            _partitionNames.get(3), ImmutableMap.of("SLAVE", instance1))
+    );
+    Map<String, Set<AssignableReplica>> replicaMap = new HashMap<>(); // to 
populate
+    Map<String, ResourceAssignment> currentAssignment = new HashMap<>(); // to 
populate
+    prepareData(input, replicaMap, currentAssignment, testCache, 1);
+
+    Map<String, Set<AssignableReplica>> allocatedReplicas = new HashMap<>();
+    Set<AssignableReplica> toBeAssignedReplicas =
+        
DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(testCache, 
replicaMap.keySet(), activeInstances,
+            currentAssignment, allocatedReplicas);
+
+    Assert.assertEquals(toBeAssignedReplicas.size(), 1);
+    
Assert.assertTrue(toBeAssignedReplicas.stream().map(AssignableReplica::toString).collect(Collectors.toSet())
+        .contains("Resource1-Partition2-MASTER"));
+    AssignableReplica replica = toBeAssignedReplicas.iterator().next();
+    Assert.assertEquals(replica.getReplicaState(), "MASTER");
+    Assert.assertEquals(replica.getPartitionName(), "Partition2");
+
+    // test 2, no additional replica to be assigned
+    testCache = setupClusterDataCache();
+    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+    input = ImmutableMap.of(
+        _resourceNames.get(0),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of("MASTER", instance1),
+            _partitionNames.get(1), ImmutableMap.of("SLAVE", instance2)),
+        _resourceNames.get(1),
+        ImmutableMap.of(
+            _partitionNames.get(2), ImmutableMap.of("MASTER", instance1),
+            _partitionNames.get(3), ImmutableMap.of("SLAVE", instance2))
+    );
+    replicaMap = new HashMap<>(); // to populate
+    currentAssignment = new HashMap<>(); // to populate
+    prepareData(input, replicaMap, currentAssignment, testCache, 1);
+    allocatedReplicas = new HashMap<>();
+    toBeAssignedReplicas =
+        
DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(testCache, 
replicaMap.keySet(), activeInstances,
+            currentAssignment, allocatedReplicas);
+    Assert.assertTrue(toBeAssignedReplicas.isEmpty());
+    Assert.assertEquals(allocatedReplicas.get(instance1).size(), 2);
+    Assert.assertEquals(allocatedReplicas.get(instance2).size(), 2);
+
+    // test 3, minActiveReplica==2, two partitions falling short
+    testCache = setupClusterDataCache();
+    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+    input = ImmutableMap.of(
+        _resourceNames.get(0),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of("MASTER", instance1, 
"SLAVE", instance2),
+            _partitionNames.get(1), ImmutableMap.of("MASTER", instance1, 
"OFFLINE", offlineInstance)), // Partition2-SLAVE
+        _resourceNames.get(1),
+        ImmutableMap.of(
+            _partitionNames.get(2), ImmutableMap.of("MASTER", instance1, 
"SLAVE", instance2),
+            _partitionNames.get(3), ImmutableMap.of("SLAVE", instance1, 
"OFFLINE", offlineInstance)) // Partition4-MASTER
+    );
+    replicaMap = new HashMap<>(); // to populate
+    currentAssignment = new HashMap<>(); // to populate
+    prepareData(input, replicaMap, currentAssignment, testCache, 2);
+    allocatedReplicas = new HashMap<>();
+    toBeAssignedReplicas =
+        
DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(testCache, 
replicaMap.keySet(), activeInstances,
+            currentAssignment, allocatedReplicas);
+    Assert.assertEquals(toBeAssignedReplicas.size(), 2);
+    
Assert.assertEquals(toBeAssignedReplicas.stream().map(AssignableReplica::toString).collect(Collectors.toSet()),
+        ImmutableSet.of("Resource1-Partition2-SLAVE", 
"Resource2-Partition4-MASTER"));
+    Assert.assertEquals(allocatedReplicas.get(instance1).size(), 4);
+    Assert.assertEquals(allocatedReplicas.get(instance2).size(), 2);
+  }
+
+  @Test(dependsOnMethods = "testFindToBeAssignedReplicasForMinActiveReplica")
+  public void testClusterModelForDelayedRebalanceOverwrite() throws 
IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    String instance1 = _testInstanceId;
+    String offlineInstance = _testInstanceId + "1";
+    String instance2 = _testInstanceId + "2";
+    Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+    liveInstanceMap.put(instance1, createMockLiveInstance(instance1));
+    liveInstanceMap.put(instance2, createMockLiveInstance(instance2));
+    Set<String> activeInstances = new HashSet<>();
+    activeInstances.add(instance1);
+    activeInstances.add(instance2);
+    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+
+    // test 1, one partition under minActiveReplica
+    Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
+        _resourceNames.get(0),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of("MASTER", instance1),
+            _partitionNames.get(1), ImmutableMap.of("OFFLINE", 
offlineInstance), // Partition2-MASTER
+            _partitionNames.get(2), ImmutableMap.of("MASTER", instance2),
+            _partitionNames.get(3), ImmutableMap.of("MASTER", instance2)),
+        _resourceNames.get(1),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of("MASTER", instance2),
+            _partitionNames.get(1), ImmutableMap.of("MASTER", instance2),
+            _partitionNames.get(2), ImmutableMap.of("MASTER", instance1),
+            _partitionNames.get(3), ImmutableMap.of("OFFLINE", 
offlineInstance)) // Partition4-MASTER
+    );
+    Map<String, Set<AssignableReplica>> replicaMap = new HashMap<>(); // to 
populate
+    Map<String, ResourceAssignment> currentAssignment = new HashMap<>(); // to 
populate
+    prepareData(input, replicaMap, currentAssignment, testCache, 1);
+
+    Map<String, Resource> resourceMap = 
_resourceNames.stream().collect(Collectors.toMap(resource -> resource, 
Resource::new));
+    ClusterModel clusterModel = 
ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(testCache,
+        resourceMap, activeInstances, currentAssignment);
+    Assert.assertEquals(clusterModel.getAssignableNodes().size(), 2);
+    
Assert.assertTrue(clusterModel.getAssignableNodes().containsKey(instance1));
+    
Assert.assertTrue(clusterModel.getAssignableNodes().containsKey(instance2));
+    
Assert.assertEquals(clusterModel.getAssignableNodes().get(instance1).getAssignedReplicas().size(),
 2);
+    
Assert.assertEquals(clusterModel.getAssignableNodes().get(instance2).getAssignedReplicas().size(),
 4);
+
+    
Assert.assertEquals(clusterModel.getAssignableReplicaMap().get("Resource1").size(),
 1);
+    
Assert.assertEquals(clusterModel.getAssignableReplicaMap().get("Resource1").iterator().next().toString(),
+        "Resource1-Partition2-MASTER");
+    
Assert.assertEquals(clusterModel.getAssignableReplicaMap().get("Resource2").size(),
 1);
+    
Assert.assertEquals(clusterModel.getAssignableReplicaMap().get("Resource2").iterator().next().toString(),
+        "Resource2-Partition4-MASTER");
+
+    // test 2, minActiveReplica==2, three partitions falling short
+    testCache = setupClusterDataCache();
+    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+    input = ImmutableMap.of(
+        _resourceNames.get(0),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of("MASTER", instance1, 
"SLAVE", instance2),
+            _partitionNames.get(1), ImmutableMap.of("MASTER", instance1, 
"OFFLINE", offlineInstance), // Partition2-SLAVE
+            _partitionNames.get(2), ImmutableMap.of("OFFLINE", 
offlineInstance, "SLAVE", instance2), // Partition3-MASTER
+            _partitionNames.get(3), ImmutableMap.of("MASTER", instance1, 
"SLAVE", instance2)),
+        _resourceNames.get(1),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of("MASTER", instance1, 
"SLAVE", instance2),
+            _partitionNames.get(1), ImmutableMap.of("MASTER", instance1, 
"SLAVE", instance2),
+            _partitionNames.get(2), ImmutableMap.of("MASTER", instance1, 
"SLAVE", instance2),
+            _partitionNames.get(3), ImmutableMap.of("OFFLINE", 
offlineInstance, "ERROR", instance2)) // Partition4-MASTER
+    );
+    replicaMap = new HashMap<>(); // to populate
+    currentAssignment = new HashMap<>(); // to populate
+    prepareData(input, replicaMap, currentAssignment, testCache, 2);
+    clusterModel = 
ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(testCache,
+        resourceMap, activeInstances, currentAssignment);
+    Assert.assertEquals(clusterModel.getAssignableNodes().size(), 2);
+    
Assert.assertTrue(clusterModel.getAssignableNodes().containsKey(instance1));
+    
Assert.assertTrue(clusterModel.getAssignableNodes().containsKey(instance2));
+    
Assert.assertEquals(clusterModel.getAssignableNodes().get(instance1).getAssignedReplicas().size(),
 6);
+    
Assert.assertEquals(clusterModel.getAssignableNodes().get(instance2).getAssignedReplicas().size(),
 7);
+
+    Set<String> replicaSet = 
clusterModel.getAssignableReplicaMap().get(_resourceNames.get(0))
+        .stream()
+        .map(AssignableReplica::toString)
+        .collect(Collectors.toSet());
+    Assert.assertEquals(replicaSet.size(), 2);
+    Assert.assertTrue(replicaSet.contains("Resource1-Partition2-SLAVE"));
+    Assert.assertTrue(replicaSet.contains("Resource1-Partition3-MASTER"));
+    replicaSet = 
clusterModel.getAssignableReplicaMap().get(_resourceNames.get(1))
+        .stream()
+        .map(AssignableReplica::toString)
+        .collect(Collectors.toSet());
+    Assert.assertEquals(replicaSet.size(), 1);
+    Assert.assertTrue(replicaSet.contains("Resource2-Partition4-MASTER"));
+  }
+
+  /**
+   * Prepare mock objects with the given input. This method prepares replicaMap and populates testCache with currentState.
+   *
+   * @param input <resource, <partition, <state, instance> > >
+   * @param replicaMap The data map to prepare, a set of AssignableReplica by resource name.
+   * @param currentAssignment The data map to prepare, ResourceAssignment by resource name
+   * @param testCache The mock object to prepare
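+   * @param minActiveReplica The minActiveReplica setting applied to all mocked resources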
+   */
+  private void prepareData(Map<String, Map<String, Map<String, String>>> input,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Map<String, ResourceAssignment> currentAssignment,
+      ResourceControllerDataProvider testCache,
+      int minActiveReplica) {
+
+    // Set up mock idealstate
+    Map<String, IdealState> isMap = new HashMap<>();
+    for (String resource : _resourceNames) {
+      ResourceConfig resourceConfig = new ResourceConfig.Builder(resource)
+          .setMinActiveReplica(minActiveReplica)
+          .setNumReplica(3)
+          .build();
+      _resourceConfigMap.put(resource, resourceConfig);
+      IdealState is = new IdealState(resource);
+      is.setNumPartitions(_partitionNames.size());
+      is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+      is.setStateModelDefRef("MasterSlave");
+      is.setReplicas("3");
+      is.setMinActiveReplicas(minActiveReplica);
+      is.setRebalancerClassName(WagedRebalancer.class.getName());
+      _partitionNames.forEach(partition -> is.setPreferenceList(partition, 
Collections.emptyList()));
+      isMap.put(resource, is);
+    }
+    when(testCache.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> 
isMap.get(invocationOnMock.getArguments()[0]));
+    when(testCache.getResourceConfig(anyString())).thenAnswer(
+        (Answer<ResourceConfig>) invocationOnMock -> 
_resourceConfigMap.get(invocationOnMock.getArguments()[0]));
+
+
+    // <instance, <resource, CurrentState>>
+    Map<String, Map<String, CurrentState>> currentStateByInstanceByResource = 
new HashMap<>();
+    Map<String, Map<String, Map<String, String>>> 
stateByInstanceByResourceByPartition = new HashMap<>();
+
+    for (String resource : input.keySet()) {
+      Set<AssignableReplica> replicas = new HashSet<>();
+      replicaMap.put(resource, replicas);
+      ResourceConfig resourceConfig = _resourceConfigMap.get(resource);
+      for (String partition : input.get(resource).keySet()) {
+        input.get(resource).get(partition).forEach(
+            (state, instance) -> {
+              stateByInstanceByResourceByPartition
+                  .computeIfAbsent(instance, k -> new HashMap<>())
+                  .computeIfAbsent(resource, k -> new HashMap<>())
+                  .put(partition, state);
+              replicas.add(new MockAssignableReplica(resourceConfig, 
partition, state));
+            });
+      }
+    }
+    for (String instance : stateByInstanceByResourceByPartition.keySet()) {
+      for (String resource : 
stateByInstanceByResourceByPartition.get(instance).keySet()) {
+        Map<String, String> partitionState = 
stateByInstanceByResourceByPartition.get(instance).get(resource);
+        CurrentState testCurrentStateResource = 
mockCurrentStateResource(partitionState);
+        currentStateByInstanceByResource.computeIfAbsent(instance, k -> new 
HashMap<>()).put(resource, testCurrentStateResource);
+      }
+    }
+
+    for (String instance : currentStateByInstanceByResource.keySet()) {
+      when(testCache.getCurrentState(instance, 
_sessionId)).thenReturn(currentStateByInstanceByResource.get(instance));
+      when(testCache.getCurrentState(instance, _sessionId, false))
+          .thenReturn(currentStateByInstanceByResource.get(instance));
+    }
+
+    // Mock a baseline assignment based on the current states.
+    for (String resource : _resourceNames) {
+      // <partition, <instance, state>>
+      Map<String, Map<String, String>> assignmentMap = new HashMap<>();
+      for (String instance : _instances) {
+        CurrentState cs = testCache.getCurrentState(instance, 
_sessionId).get(resource);
+        if (cs != null) {
+          for (Map.Entry<String, String> stateEntry : 
cs.getPartitionStateMap().entrySet()) {
+            assignmentMap.computeIfAbsent(stateEntry.getKey(), k -> new 
HashMap<>())
+                .put(instance, stateEntry.getValue());
+          }
+          ResourceAssignment assignment = new ResourceAssignment(resource);
+          assignmentMap.keySet().forEach(partition -> assignment
+              .addReplicaMap(new Partition(partition), 
assignmentMap.get(partition)));
+          currentAssignment.put(resource, assignment);
+        }
+      }
+    }
+  }
+
+  private CurrentState mockCurrentStateResource(Map<String, String> 
partitionState) {
+    CurrentState testCurrentStateResource = Mockito.mock(CurrentState.class);
+    
when(testCurrentStateResource.getResourceName()).thenReturn(_resourceNames.get(0));
+    
when(testCurrentStateResource.getPartitionStateMap()).thenReturn(partitionState);
+    
when(testCurrentStateResource.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(testCurrentStateResource.getSessionId()).thenReturn(_sessionId);
+    for (Map.Entry<String, String> entry : partitionState.entrySet()) {
+      
when(testCurrentStateResource.getState(entry.getKey())).thenReturn(entry.getValue());
+    }
+    return testCurrentStateResource;
+  }
+
   @Test
   public void testGenerateClusterModel() throws IOException {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
@@ -373,4 +678,10 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
     // No need to rebalance the replicas that are not in the baseline yet.
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);
   }
+
+  static class MockAssignableReplica extends AssignableReplica {
+    MockAssignableReplica(ResourceConfig resourceConfig, String partition, 
String replicaState) {
+      super(new ClusterConfig("testCluster"), resourceConfig, partition, 
replicaState, 1);
+    }
+  }
 }

