This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 2b3538320 WAGED rebalance overwrite redesign -- part 1 (#2444)
2b3538320 is described below
commit 2b35383208625eff0ca6730a2923393c955c649d
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Mon Apr 17 15:00:29 2023 -0400
WAGED rebalance overwrite redesign -- part 1 (#2444)
Implement new code path to handle "rebalance overwrites" that honors WAGED
hard constraints
The rebalance overwrite is a special mechanism that enforces minActiveReplica
during delayed rebalance. With the existing logic, the additional replicas it
brings up may be placed on an instance in a way that violates WAGED hard
constraints. This commit implements new logic that honors the hard constraints.
The new code is NOT yet integrated into the existing flow.
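Illustrative sketch (not part of this commit): once this path is wired into the WAGED
rebalancer, the new scope could be driven roughly as below. The local variables
(dataProvider, resourceMap, activeInstances, currentAssignment) are assumed to already
exist on the controller side; only the entry points named below are introduced here.

    // Hypothetical call site -- the new code is not yet invoked anywhere.
    ClusterModel overwriteModel =
        ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(
            dataProvider, resourceMap, activeInstances, currentAssignment);
    // Internally this uses DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica,
    // so the model's to-be-assigned replicas are exactly the partitions short of
    // minActiveReplica, and a constraint-based placement can add them without violating
    // WAGED hard constraints.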
---
.../rebalancer/util/DelayedRebalanceUtil.java | 173 +++++++++++-
.../rebalancer/waged/model/AssignableReplica.java | 8 +-
.../waged/model/ClusterModelProvider.java | 34 ++-
.../waged/model/TestClusterModelProvider.java | 311 +++++++++++++++++++++
4 files changed, 522 insertions(+), 4 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 4f48b2f93..65b1431e4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -20,16 +20,23 @@ package org.apache.helix.controller.rebalancer.util;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.helix.HelixManager;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
@@ -42,7 +49,7 @@ import org.slf4j.LoggerFactory;
public class DelayedRebalanceUtil {
private static final Logger LOG = LoggerFactory.getLogger(DelayedRebalanceUtil.class);
- private static RebalanceScheduler REBALANCE_SCHEDULER = new RebalanceScheduler();
+ private static final RebalanceScheduler REBALANCE_SCHEDULER = new RebalanceScheduler();
/**
* @return true if delay rebalance is configured and enabled in the ClusterConfig configurations.
@@ -279,4 +286,168 @@ public class DelayedRebalanceUtil {
}
}
}
+
+ /**
+ * Computes the partition replicas that need to be brought up to satisfy minActiveReplicas while downed instances
+ * are within the delayed window.
+ * Keep all current assignments with their current allocation.
+ * NOTE: This method also populates allocatedReplicas as it goes through all resources to preserve the current allocation.
+ *
+ * @param clusterData Cluster data cache.
+ * @param resources A set of all resource names.
+ * @param liveEnabledInstances The set of live and enabled instances.
+ * @param currentAssignment Current assignment by resource name.
+ * @param allocatedReplicas The map from instance name to assigned replicas; the map is populated in this method.
+ * @return The replicas that need to be assigned.
+ */
+ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+ ResourceControllerDataProvider clusterData,
+ Set<String> resources,
+ Set<String> liveEnabledInstances,
+ Map<String, ResourceAssignment> currentAssignment,
+ Map<String, Set<AssignableReplica>> allocatedReplicas) {
+ Map<String, List<String>> partitionsMissingMinActiveReplicas =
+ findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+ Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+ for (String resourceName : resources) {
+ // <partition, <state, instances set>>
+ Map<String, Map<String, Set<String>>> stateInstanceMap =
+ ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+ ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+ String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+ Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+ // keep all current assignments and add them to allocated replicas
+ resourceAssignment.getMappedPartitions().forEach(partition ->
+ resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+ allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+ .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+ partition.getPartitionName(), state, statePriorityMap.get(state)))));
+ // only proceed for resources requiring delayed rebalance overwrites
+ List<String> partitions =
+ partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+ if (partitions.isEmpty()) {
+ continue;
+ }
+ toBeAssignedReplicas.addAll(
+ findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+ }
+ return toBeAssignedReplicas;
+ }
+
+ /**
+ * From the current assignment, find the partitions that are missing minActiveReplica for ALL resources, returned as a
+ * map keyed by resource name.
+ * @param clusterData Cluster data cache
+ * @param currentAssignment Current resource assignment
+ * @return <resource name, list<partition name>> that are missing minActiveReplica.
+ */
+ private static Map<String, List<String>> findPartitionsMissingMinActiveReplica(
+ ResourceControllerDataProvider clusterData,
+ Map<String, ResourceAssignment> currentAssignment) {
+ return currentAssignment.entrySet()
+ .parallelStream()
+ .map(e -> Map.entry(e.getKey(), findPartitionsMissingMinActiveReplica(clusterData, e.getValue())))
+ .filter(e -> !e.getValue().isEmpty())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ /**
+ * From the current assignment, find the partitions that are missing minActiveReplica for a SINGLE resource.
+ * @param clusterData Cluster data cache
+ * @param resourceAssignment Current resource assignment
+ * @return A list of partition names
+ */
+ private static List<String> findPartitionsMissingMinActiveReplica(
+ ResourceControllerDataProvider clusterData,
+ ResourceAssignment resourceAssignment) {
+ String resourceName = resourceAssignment.getResourceName();
+ IdealState currentIdealState = clusterData.getIdealState(resourceName);
+ Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+ int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+ .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+ currentIdealState), currentIdealState, numReplica);
+ return resourceAssignment.getMappedPartitions()
+ .parallelStream()
+ .filter(partition -> {
+ long enabledLivePlacementCounter = resourceAssignment.getReplicaMap(partition).keySet()
+ .stream()
+ .filter(enabledLiveInstances::contains)
+ .count();
+ return enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica);
+ })
+ .map(Partition::getPartitionName)
+ .distinct()
+ .collect(Collectors.toList());
+ }
+
+ private static int getMinActiveReplica(ResourceControllerDataProvider clusterData, String resourceName) {
+ IdealState currentIdealState = clusterData.getIdealState(resourceName);
+ Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+ return DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+ .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+ currentIdealState), currentIdealState, numReplica);
+ }
+
+ /**
+ * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica.
+ * @param clusterData Cluster data cache.
+ * @param resourceName name of the resource
+ * @param partitions Pre-computed list of partition names missing minActiveReplica
+ * @param stateInstanceMap <partition, <state, instances set>>
+ * @param liveEnabledInstances A set of live and enabled instances
+ * @return A set of AssignableReplica
+ */
+ private static Set<AssignableReplica> findAssignableReplicaForResource(
+ ResourceControllerDataProvider clusterData,
+ String resourceName,
+ List<String> partitions,
+ Map<String, Map<String, Set<String>>> stateInstanceMap,
+ Set<String> liveEnabledInstances) {
+ LOG.info("Computing replicas requiring rebalance overwrite for resource:
{}", resourceName);
+ final List<String> priorityOrderedStates =
+
clusterData.getStateModelDef(clusterData.getIdealState(resourceName).getStateModelDefRef())
+ .getStatesPriorityList();
+ final IdealState currentIdealState =
clusterData.getIdealState(resourceName);
+ final ResourceConfig resourceConfig =
clusterData.getResourceConfig(resourceName);
+ final Map<String, Integer> statePriorityMap =
+
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+ final Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+ for (String partitionName : partitions) {
+ // count current active replicas of the partition
+ Map<String, Integer> activeStateReplicaCount = stateInstanceMap.getOrDefault(partitionName, Collections.emptyMap())
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> (int) e.getValue().stream().filter(liveEnabledInstances::contains).count()));
+ int activeReplicas = activeStateReplicaCount.values().stream().reduce(Integer::sum).orElse(0);
+ int minActiveReplica = getMinActiveReplica(clusterData, resourceName);
+ int replicaGapCount = minActiveReplica - activeReplicas;
+ if (replicaGapCount <= 0) {
+ // delayed rebalance overwrite isn't required; early stop and move on to the next partition
+ continue;
+ }
+ // follow the state priority order, add additional replicas to close the gap on replica count
+ Map<String, Integer> stateCountMap = clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
+ .getStateCountMap(minActiveReplica, minActiveReplica);
+ // follow the priority order of states and prepare additional replicas to be assigned
+ for (String state : priorityOrderedStates) {
+ if (replicaGapCount <= 0) {
+ break;
+ }
+ int priority = statePriorityMap.get(state);
+ int curActiveStateCount = activeStateReplicaCount.getOrDefault(state, 0);
+ for (int i = 0; i < stateCountMap.get(state) - curActiveStateCount && replicaGapCount > 0; i++) {
+ toBeAssignedReplicas.add(
+ new AssignableReplica(clusterData.getClusterConfig(), resourceConfig, partitionName, state, priority));
+ replicaGapCount--;
+ }
+ }
+ }
+ LOG.info("Replicas: {} need to be brought up for rebalance overwrite.",
toBeAssignedReplicas);
+ return toBeAssignedReplicas;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 680d35b5b..4ddd1aef6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -34,6 +34,12 @@ import org.slf4j.LoggerFactory;
/**
* This class represents a partition replication that needs to be allocated.
+ *
+ * TODO: This class violates the contract of {@link Object#hashCode()}:
+ * if two objects are equal according to the equals(Object) method, then calling the hashCode method on each of the
+ * two objects must produce the same integer result.
+ * https://github.com/apache/helix/issues/2299
+ * This could be a tricky fix because this bug/feature is deeply coupled with the existing code logic.
*/
public class AssignableReplica implements Comparable<AssignableReplica> {
private static final Logger LOG = LoggerFactory.getLogger(AssignableReplica.class);
@@ -56,7 +62,7 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
* @param replicaState The state of the replication.
* @param statePriority The priority of the replication's state.
*/
- AssignableReplica(ClusterConfig clusterConfig, ResourceConfig resourceConfig,
+ public AssignableReplica(ClusterConfig clusterConfig, ResourceConfig resourceConfig,
String partitionName, String replicaState, int statePriority) {
_partitionName = partitionName;
_replicaState = replicaState;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index ce82c3207..7dab730e8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -53,7 +54,31 @@ public class ClusterModelProvider {
// changes.
GLOBAL_BASELINE,
// Set the rebalance scope to cover only replicas that are assigned to
downed instances.
- EMERGENCY
+ EMERGENCY,
+ // A temporary overwrite scope for partitions that are missing minActiveReplicas because their replicas sit on downed
+ // instances still within the delayed window
+ DELAYED_REBALANCE_OVERWRITES
+ }
+
+ /**
+ * TODO: On integration with WAGED, have to integrate with counter and latency metrics -- qqu
+ * Compute a new ClusterModel with scope limited to partitions whose best possible assignment misses minActiveReplicas
+ * because of the delayed rebalance setting.
+ * @param dataProvider The controller's data cache
+ * @param resourceMap The full map of the resource by name
+ * @param activeInstances The active instances that will be used in the calculation.
+ * @param resourceAssignment The resource assignment state to compute on. This should be the current state assignment;
+ * if it is run right after another rebalance calculation, the best possible assignment from
+ * the previous result can be used.
+ * @return the ClusterModel
+ */
+ public static ClusterModel generateClusterModelForDelayedRebalanceOverwrites(
+ ResourceControllerDataProvider dataProvider,
+ Map<String, Resource> resourceMap,
+ Set<String> activeInstances,
+ Map<String, ResourceAssignment> resourceAssignment) {
+ return generateClusterModel(dataProvider, resourceMap, activeInstances, Collections.emptyMap(),
+ Collections.emptyMap(), resourceAssignment, RebalanceScopeType.DELAYED_REBALANCE_OVERWRITES);
}
/**
@@ -193,6 +218,11 @@ public class ClusterModelProvider {
toBeAssignedReplicas =
findToBeAssignedReplicasOnDownInstances(replicaMap, activeInstances, currentAssignment, allocatedReplicas);
break;
+ case DELAYED_REBALANCE_OVERWRITES:
+ toBeAssignedReplicas =
+ DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(dataProvider, replicaMap.keySet(),
+ activeInstances, currentAssignment, allocatedReplicas);
+ break;
default:
throw new HelixException("Unknown rebalance scope type: " + scopeType);
}
@@ -474,7 +504,7 @@ public class ClusterModelProvider {
}
// <partition, <state, instances set>>
- private static Map<String, Map<String, Set<String>>> getStateInstanceMap(
+ public static Map<String, Map<String, Set<String>>> getStateInstanceMap(
ResourceAssignment assignment) {
if (assignment == null) {
return Collections.emptyMap();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 93f61f64e..fc316c2a5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -19,6 +19,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -29,7 +31,9 @@ import java.util.stream.Collectors;
import org.apache.helix.HelixConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -37,6 +41,8 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -47,6 +53,7 @@ import static org.mockito.Mockito.when;
public class TestClusterModelProvider extends AbstractTestClusterModel {
Set<String> _instances;
+ Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>();
@BeforeClass
public void initialize() {
@@ -94,6 +101,304 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
return testCache;
}
+ @Test
+ public void testFindToBeAssignedReplicasForMinActiveReplica() throws IOException {
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+ String instance1 = _testInstanceId;
+ String offlineInstance = _testInstanceId + "1";
+ String instance2 = _testInstanceId + "2";
+ Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+ liveInstanceMap.put(instance1, createMockLiveInstance(instance1));
+ liveInstanceMap.put(instance2, createMockLiveInstance(instance2));
+ Set<String> activeInstances = new HashSet<>();
+ activeInstances.add(instance1);
+ activeInstances.add(instance2);
+ when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+
+ // test 0, empty input
+ Assert.assertEquals(
+ DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(testCache, Collections.emptySet(),
+ activeInstances, Collections.emptyMap(), new HashMap<>()),
+ Collections.emptySet());
+
+ // test 1, one partition under minActiveReplica
+ Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
+ _resourceNames.get(0),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of("MASTER", instance1),
+ _partitionNames.get(1), ImmutableMap.of("OFFLINE", offlineInstance)), // Partition2-MASTER
+ _resourceNames.get(1),
+ ImmutableMap.of(
+ _partitionNames.get(2), ImmutableMap.of("MASTER", instance1),
+ _partitionNames.get(3), ImmutableMap.of("SLAVE", instance1))
+ );
+ Map<String, Set<AssignableReplica>> replicaMap = new HashMap<>(); // to populate
+ Map<String, ResourceAssignment> currentAssignment = new HashMap<>(); // to populate
+ prepareData(input, replicaMap, currentAssignment, testCache, 1);
+
+ Map<String, Set<AssignableReplica>> allocatedReplicas = new HashMap<>();
+ Set<AssignableReplica> toBeAssignedReplicas =
+ DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(testCache, replicaMap.keySet(), activeInstances,
+ currentAssignment, allocatedReplicas);
+
+ Assert.assertEquals(toBeAssignedReplicas.size(), 1);
+ Assert.assertTrue(toBeAssignedReplicas.stream().map(AssignableReplica::toString).collect(Collectors.toSet())
+ .contains("Resource1-Partition2-MASTER"));
+ AssignableReplica replica = toBeAssignedReplicas.iterator().next();
+ Assert.assertEquals(replica.getReplicaState(), "MASTER");
+ Assert.assertEquals(replica.getPartitionName(), "Partition2");
+
+ // test 2, no additional replica to be assigned
+ testCache = setupClusterDataCache();
+ when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+ input = ImmutableMap.of(
+ _resourceNames.get(0),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of("MASTER", instance1),
+ _partitionNames.get(1), ImmutableMap.of("SLAVE", instance2)),
+ _resourceNames.get(1),
+ ImmutableMap.of(
+ _partitionNames.get(2), ImmutableMap.of("MASTER", instance1),
+ _partitionNames.get(3), ImmutableMap.of("SLAVE", instance2))
+ );
+ replicaMap = new HashMap<>(); // to populate
+ currentAssignment = new HashMap<>(); // to populate
+ prepareData(input, replicaMap, currentAssignment, testCache, 1);
+ allocatedReplicas = new HashMap<>();
+ toBeAssignedReplicas =
+ DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(testCache, replicaMap.keySet(), activeInstances,
+ currentAssignment, allocatedReplicas);
+ Assert.assertTrue(toBeAssignedReplicas.isEmpty());
+ Assert.assertEquals(allocatedReplicas.get(instance1).size(), 2);
+ Assert.assertEquals(allocatedReplicas.get(instance2).size(), 2);
+
+ // test 3, minActiveReplica==2, two partitions falling short
+ testCache = setupClusterDataCache();
+ when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+ input = ImmutableMap.of(
+ _resourceNames.get(0),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of("MASTER", instance1, "SLAVE", instance2),
+ _partitionNames.get(1), ImmutableMap.of("MASTER", instance1, "OFFLINE", offlineInstance)), // Partition2-SLAVE
+ _resourceNames.get(1),
+ ImmutableMap.of(
+ _partitionNames.get(2), ImmutableMap.of("MASTER", instance1, "SLAVE", instance2),
+ _partitionNames.get(3), ImmutableMap.of("SLAVE", instance1, "OFFLINE", offlineInstance)) // Partition4-MASTER
+ );
+ replicaMap = new HashMap<>(); // to populate
+ currentAssignment = new HashMap<>(); // to populate
+ prepareData(input, replicaMap, currentAssignment, testCache, 2);
+ allocatedReplicas = new HashMap<>();
+ toBeAssignedReplicas =
+ DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(testCache, replicaMap.keySet(), activeInstances,
+ currentAssignment, allocatedReplicas);
+ Assert.assertEquals(toBeAssignedReplicas.size(), 2);
+ Assert.assertEquals(toBeAssignedReplicas.stream().map(AssignableReplica::toString).collect(Collectors.toSet()),
+ ImmutableSet.of("Resource1-Partition2-SLAVE", "Resource2-Partition4-MASTER"));
+ Assert.assertEquals(allocatedReplicas.get(instance1).size(), 4);
+ Assert.assertEquals(allocatedReplicas.get(instance2).size(), 2);
+ }
+
+ @Test(dependsOnMethods = "testFindToBeAssignedReplicasForMinActiveReplica")
+ public void testClusterModelForDelayedRebalanceOverwrite() throws IOException {
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+ String instance1 = _testInstanceId;
+ String offlineInstance = _testInstanceId + "1";
+ String instance2 = _testInstanceId + "2";
+ Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+ liveInstanceMap.put(instance1, createMockLiveInstance(instance1));
+ liveInstanceMap.put(instance2, createMockLiveInstance(instance2));
+ Set<String> activeInstances = new HashSet<>();
+ activeInstances.add(instance1);
+ activeInstances.add(instance2);
+ when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+
+ // test 1, one partition under minActiveReplica
+ Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
+ _resourceNames.get(0),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of("MASTER", instance1),
+ _partitionNames.get(1), ImmutableMap.of("OFFLINE", offlineInstance), // Partition2-MASTER
+ _partitionNames.get(2), ImmutableMap.of("MASTER", instance2),
+ _partitionNames.get(3), ImmutableMap.of("MASTER", instance2)),
+ _resourceNames.get(1),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of("MASTER", instance2),
+ _partitionNames.get(1), ImmutableMap.of("MASTER", instance2),
+ _partitionNames.get(2), ImmutableMap.of("MASTER", instance1),
+ _partitionNames.get(3), ImmutableMap.of("OFFLINE", offlineInstance)) // Partition4-MASTER
+ );
+ Map<String, Set<AssignableReplica>> replicaMap = new HashMap<>(); // to populate
+ Map<String, ResourceAssignment> currentAssignment = new HashMap<>(); // to populate
+ prepareData(input, replicaMap, currentAssignment, testCache, 1);
+
+ Map<String, Resource> resourceMap = _resourceNames.stream().collect(Collectors.toMap(resource -> resource, Resource::new));
+ ClusterModel clusterModel = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(testCache,
+ resourceMap, activeInstances, currentAssignment);
+ Assert.assertEquals(clusterModel.getAssignableNodes().size(), 2);
+ Assert.assertTrue(clusterModel.getAssignableNodes().containsKey(instance1));
+ Assert.assertTrue(clusterModel.getAssignableNodes().containsKey(instance2));
+ Assert.assertEquals(clusterModel.getAssignableNodes().get(instance1).getAssignedReplicas().size(), 2);
+ Assert.assertEquals(clusterModel.getAssignableNodes().get(instance2).getAssignedReplicas().size(), 4);
+
+ Assert.assertEquals(clusterModel.getAssignableReplicaMap().get("Resource1").size(), 1);
+ Assert.assertEquals(clusterModel.getAssignableReplicaMap().get("Resource1").iterator().next().toString(),
+ "Resource1-Partition2-MASTER");
+ Assert.assertEquals(clusterModel.getAssignableReplicaMap().get("Resource2").size(), 1);
+ Assert.assertEquals(clusterModel.getAssignableReplicaMap().get("Resource2").iterator().next().toString(),
+ "Resource2-Partition4-MASTER");
+
+ // test 2, minActiveReplica==2, three partitions falling short
+ testCache = setupClusterDataCache();
+ when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+ input = ImmutableMap.of(
+ _resourceNames.get(0),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of("MASTER", instance1, "SLAVE", instance2),
+ _partitionNames.get(1), ImmutableMap.of("MASTER", instance1, "OFFLINE", offlineInstance), // Partition2-SLAVE
+ _partitionNames.get(2), ImmutableMap.of("OFFLINE", offlineInstance, "SLAVE", instance2), // Partition3-MASTER
+ _partitionNames.get(3), ImmutableMap.of("MASTER", instance1, "SLAVE", instance2)),
+ _resourceNames.get(1),
+ ImmutableMap.of(
+ _partitionNames.get(0), ImmutableMap.of("MASTER", instance1, "SLAVE", instance2),
+ _partitionNames.get(1), ImmutableMap.of("MASTER", instance1, "SLAVE", instance2),
+ _partitionNames.get(2), ImmutableMap.of("MASTER", instance1, "SLAVE", instance2),
+ _partitionNames.get(3), ImmutableMap.of("OFFLINE", offlineInstance, "ERROR", instance2)) // Partition4-MASTER
+ );
+ replicaMap = new HashMap<>(); // to populate
+ currentAssignment = new HashMap<>(); // to populate
+ prepareData(input, replicaMap, currentAssignment, testCache, 2);
+ clusterModel = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(testCache,
+ resourceMap, activeInstances, currentAssignment);
+ Assert.assertEquals(clusterModel.getAssignableNodes().size(), 2);
+ Assert.assertTrue(clusterModel.getAssignableNodes().containsKey(instance1));
+ Assert.assertTrue(clusterModel.getAssignableNodes().containsKey(instance2));
+ Assert.assertEquals(clusterModel.getAssignableNodes().get(instance1).getAssignedReplicas().size(), 6);
+ Assert.assertEquals(clusterModel.getAssignableNodes().get(instance2).getAssignedReplicas().size(), 7);
+
+ Set<String> replicaSet = clusterModel.getAssignableReplicaMap().get(_resourceNames.get(0))
+ .stream()
+ .map(AssignableReplica::toString)
+ .collect(Collectors.toSet());
+ Assert.assertEquals(replicaSet.size(), 2);
+ Assert.assertTrue(replicaSet.contains("Resource1-Partition2-SLAVE"));
+ Assert.assertTrue(replicaSet.contains("Resource1-Partition3-MASTER"));
+ replicaSet = clusterModel.getAssignableReplicaMap().get(_resourceNames.get(1))
+ .stream()
+ .map(AssignableReplica::toString)
+ .collect(Collectors.toSet());
+ Assert.assertEquals(replicaSet.size(), 1);
+ Assert.assertTrue(replicaSet.contains("Resource2-Partition4-MASTER"));
+ }
+
+ /**
+ * Prepare mock objects with the given input. This method prepares replicaMap and populates testCache with currentState.
+ *
+ * @param input <resource, <partition, <state, instance> > >
+ * @param replicaMap The data map to prepare, a set of AssignableReplica by resource name.
+ * @param currentAssignment The data map to prepare, resourceAssignment by resource name.
+ * @param testCache The mock object to prepare.
+ * @param minActiveReplica The minActiveReplica value to set on each mocked resource.
+ */
+ private void prepareData(Map<String, Map<String, Map<String, String>>> input,
+ Map<String, Set<AssignableReplica>> replicaMap,
+ Map<String, ResourceAssignment> currentAssignment,
+ ResourceControllerDataProvider testCache,
+ int minActiveReplica) {
+
+ // Set up mock idealstate
+ Map<String, IdealState> isMap = new HashMap<>();
+ for (String resource : _resourceNames) {
+ ResourceConfig resourceConfig = new ResourceConfig.Builder(resource)
+ .setMinActiveReplica(minActiveReplica)
+ .setNumReplica(3)
+ .build();
+ _resourceConfigMap.put(resource, resourceConfig);
+ IdealState is = new IdealState(resource);
+ is.setNumPartitions(_partitionNames.size());
+ is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ is.setStateModelDefRef("MasterSlave");
+ is.setReplicas("3");
+ is.setMinActiveReplicas(minActiveReplica);
+ is.setRebalancerClassName(WagedRebalancer.class.getName());
+ _partitionNames.forEach(partition -> is.setPreferenceList(partition, Collections.emptyList()));
+ isMap.put(resource, is);
+ }
+ when(testCache.getIdealState(anyString())).thenAnswer(
+ (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+ when(testCache.getResourceConfig(anyString())).thenAnswer(
+ (Answer<ResourceConfig>) invocationOnMock -> _resourceConfigMap.get(invocationOnMock.getArguments()[0]));
+
+
+ // <instance, <resource, CurrentState>>
+ Map<String, Map<String, CurrentState>> currentStateByInstanceByResource = new HashMap<>();
+ Map<String, Map<String, Map<String, String>>> stateByInstanceByResourceByPartition = new HashMap<>();
+
+ for (String resource : input.keySet()) {
+ Set<AssignableReplica> replicas = new HashSet<>();
+ replicaMap.put(resource, replicas);
+ ResourceConfig resourceConfig = _resourceConfigMap.get(resource);
+ for (String partition : input.get(resource).keySet()) {
+ input.get(resource).get(partition).forEach(
+ (state, instance) -> {
+ stateByInstanceByResourceByPartition
+ .computeIfAbsent(instance, k -> new HashMap<>())
+ .computeIfAbsent(resource, k -> new HashMap<>())
+ .put(partition, state);
+ replicas.add(new MockAssignableReplica(resourceConfig, partition, state));
+ });
+ }
+ }
+ for (String instance : stateByInstanceByResourceByPartition.keySet()) {
+ for (String resource : stateByInstanceByResourceByPartition.get(instance).keySet()) {
+ Map<String, String> partitionState = stateByInstanceByResourceByPartition.get(instance).get(resource);
+ CurrentState testCurrentStateResource = mockCurrentStateResource(partitionState);
+ currentStateByInstanceByResource.computeIfAbsent(instance, k -> new HashMap<>()).put(resource, testCurrentStateResource);
+ }
+ }
+
+ for (String instance : currentStateByInstanceByResource.keySet()) {
+ when(testCache.getCurrentState(instance, _sessionId)).thenReturn(currentStateByInstanceByResource.get(instance));
+ when(testCache.getCurrentState(instance, _sessionId, false))
+ .thenReturn(currentStateByInstanceByResource.get(instance));
+ }
+
+ // Mock a baseline assignment based on the current states.
+ for (String resource : _resourceNames) {
+ // <partition, <instance, state>>
+ Map<String, Map<String, String>> assignmentMap = new HashMap<>();
+ for (String instance : _instances) {
+ CurrentState cs = testCache.getCurrentState(instance, _sessionId).get(resource);
+ if (cs != null) {
+ for (Map.Entry<String, String> stateEntry : cs.getPartitionStateMap().entrySet()) {
+ assignmentMap.computeIfAbsent(stateEntry.getKey(), k -> new HashMap<>())
+ .put(instance, stateEntry.getValue());
+ }
+ ResourceAssignment assignment = new ResourceAssignment(resource);
+ assignmentMap.keySet().forEach(partition -> assignment
+ .addReplicaMap(new Partition(partition), assignmentMap.get(partition)));
+ currentAssignment.put(resource, assignment);
+ }
+ }
+ }
+ }
+
+ private CurrentState mockCurrentStateResource(Map<String, String> partitionState) {
+ CurrentState testCurrentStateResource = Mockito.mock(CurrentState.class);
+ when(testCurrentStateResource.getResourceName()).thenReturn(_resourceNames.get(0));
+ when(testCurrentStateResource.getPartitionStateMap()).thenReturn(partitionState);
+ when(testCurrentStateResource.getStateModelDefRef()).thenReturn("MasterSlave");
+ when(testCurrentStateResource.getSessionId()).thenReturn(_sessionId);
+ for (Map.Entry<String, String> entry : partitionState.entrySet()) {
+ when(testCurrentStateResource.getState(entry.getKey())).thenReturn(entry.getValue());
+ }
+ return testCurrentStateResource;
+ }
+
@Test
public void testGenerateClusterModel() throws IOException {
ResourceControllerDataProvider testCache = setupClusterDataCache();
@@ -373,4 +678,10 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
// No need to rebalance the replicas that are not in the baseline yet.
Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);
}
+
+ static class MockAssignableReplica extends AssignableReplica {
+ MockAssignableReplica(ResourceConfig resourceConfig, String partition, String replicaState) {
+ super(new ClusterConfig("testCluster"), resourceConfig, partition, replicaState, 1);
+ }
+ }
}