This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new 742fb3a9e Make StickyRebalanceStrategy topology aware (#2944)
742fb3a9e is described below
commit 742fb3a9e833085a711cbf9d85c3fdde265ae738
Author: frankmu <[email protected]>
AuthorDate: Thu Oct 31 08:50:23 2024 -0700
Make StickyRebalanceStrategy topology aware (#2944)
* Make StickyRebalanceStrategy topology aware
* move canAdd() to a separate function for extendability
* add todo
* address comment and add test
* constructor change for CapacityNode
---------
Co-authored-by: Tengfei Mu <[email protected]>
---
.../helix/controller/common/CapacityNode.java | 104 ++++++++++++++++++---
.../ResourceControllerDataProvider.java | 19 ++--
.../strategy/StickyRebalanceStrategy.java | 55 ++++++++---
.../java/org/apache/helix/common/ZkTestBase.java | 8 --
.../rebalancer/TestStickyRebalanceStrategy.java | 29 +++---
.../rebalancer/TestStickyRebalanceStrategy.java | 88 ++++++++++++-----
6 files changed, 227 insertions(+), 76 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
index 208ee7913..bd49040ff 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
@@ -21,23 +21,55 @@ package org.apache.helix.controller.common;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
+import org.apache.helix.model.InstanceConfig;
+
/**
* A Node is an entity that can serve capacity recording purpose. It has a
capacity and knowledge
* of partitions assigned to it, so it can decide if it can receive additional
partitions.
*/
-public class CapacityNode {
+public class CapacityNode implements Comparable<CapacityNode> {
private int _currentlyAssigned;
private int _capacity;
- private final String _id;
+ private final String _instanceName;
+ private final String _logicaId;
+ private final String _faultZone;
private final Map<String, Set<String>> _partitionMap;
- public CapacityNode(String id) {
- _partitionMap = new HashMap<>();
- _currentlyAssigned = 0;
- this._id = id;
+ /**
+ * Constructor used for non-topology-aware use case
+ * @param instanceName The instance name of this node
+ * @param capacity The capacity of this node
+ */
+ public CapacityNode(String instanceName, int capacity) {
+ this(instanceName, null, null, null);
+ this._capacity = capacity;
+ }
+
+ /**
+ * Constructor used for topology-aware use case
+ * @param instanceName The instance name of this node
+ * @param clusterConfig The cluster config for current helix cluster
+ * @param clusterTopologyConfig The cluster topology config for current
helix cluster
+ * @param instanceConfig The instance config for current instance
+ */
+ public CapacityNode(String instanceName, ClusterConfig clusterConfig,
+ ClusterTopologyConfig clusterTopologyConfig, InstanceConfig
instanceConfig) {
+ this._instanceName = instanceName;
+ this._logicaId = clusterTopologyConfig != null ?
instanceConfig.getLogicalId(
+ clusterTopologyConfig.getEndNodeType()) : instanceName;
+ this._faultZone =
+ clusterConfig != null ? computeFaultZone(clusterConfig,
instanceConfig) : null;
+ this._partitionMap = new HashMap<>();
+ this._capacity =
+ clusterConfig != null ?
clusterConfig.getGlobalMaxPartitionAllowedPerInstance() : 0;
+ this._currentlyAssigned = 0;
}
/**
@@ -80,11 +112,27 @@ public class CapacityNode {
}
/**
- * Get the ID of this node
- * @return The ID of this node
+ * Get the instance name of this node
+ * @return The instance name of this node
*/
- public String getId() {
- return _id;
+ public String getInstanceName() {
+ return _instanceName;
+ }
+
+ /**
+ * Get the logical id of this node
+ * @return The logical id of this node
+ */
+ public String getLogicalId() {
+ return _logicaId;
+ }
+
+ /**
+ * Get the fault zone of this node
+ * @return The fault zone of this node
+ */
+ public String getFaultZone() {
+ return _faultZone;
}
/**
@@ -98,8 +146,40 @@ public class CapacityNode {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
-
sb.append("##########\nname=").append(_id).append("\nassigned:").append(_currentlyAssigned)
- .append("\ncapacity:").append(_capacity);
+ sb.append("##########\nname=").append(_instanceName).append("\nassigned:")
+
.append(_currentlyAssigned).append("\ncapacity:").append(_capacity).append("\nlogicalId:")
+ .append(_logicaId).append("\nfaultZone:").append(_faultZone);
return sb.toString();
}
+
+ @Override
+ public int compareTo(CapacityNode o) {
+ if (_logicaId != null) {
+ return _logicaId.compareTo(o.getLogicalId());
+ }
+ return _instanceName.compareTo(o.getInstanceName());
+ }
+
+ /**
+ * Computes the fault zone id based on the domain and fault zone type when
topology is enabled.
+ * For example, when
+ * the domain is "zone=2, instance=testInstance" and the fault zone type is
"zone", this function
+ * returns "2".
+ * If cannot find the fault zone type, this function leaves the fault zone
id as the instance name.
+ * TODO: change the return value to logical id when no fault zone type
found. Also do the same for
+ * waged rebalancer in
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+ */
+ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig
instanceConfig) {
+ LinkedHashMap<String, String> instanceTopologyMap =
+ Topology.computeInstanceTopologyMap(clusterConfig,
instanceConfig.getInstanceName(),
+ instanceConfig, true /*earlyQuitTillFaultZone*/);
+
+ StringBuilder faultZoneStringBuilder = new StringBuilder();
+ for (Map.Entry<String, String> entry : instanceTopologyMap.entrySet()) {
+ faultZoneStringBuilder.append(entry.getValue());
+ faultZoneStringBuilder.append('/');
+ }
+ faultZoneStringBuilder.setLength(faultZoneStringBuilder.length() - 1);
+ return faultZoneStringBuilder.toString();
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 057cee155..c49535a65 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -46,6 +46,8 @@ import
org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.MissingTopStateRecord;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.CustomizedState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.CustomizedView;
@@ -190,7 +192,7 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
if (getClusterConfig() != null
&& getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1)
{
-
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
+ buildSimpleCapacityMap();
// Remove all cached IdealState because it is a global computation
cannot partially be
// performed for some resources. The computation is simple as well not
taking too much resource
// to recompute the assignments.
@@ -573,11 +575,16 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
return _wagedInstanceCapacity;
}
- private void buildSimpleCapacityMap(int
globalMaxPartitionAllowedPerInstance) {
+ private void buildSimpleCapacityMap() {
+ ClusterConfig clusterConfig = getClusterConfig();
+ ClusterTopologyConfig clusterTopologyConfig =
+ ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+ Map<String, InstanceConfig> instanceConfigMap =
getAssignableInstanceConfigMap();
_simpleCapacitySet = new HashSet<>();
- for (String instance : getEnabledLiveInstances()) {
- CapacityNode capacityNode = new CapacityNode(instance);
- capacityNode.setCapacity(globalMaxPartitionAllowedPerInstance);
+ for (String instanceName : getAssignableInstances()) {
+ CapacityNode capacityNode =
+ new CapacityNode(instanceName, clusterConfig, clusterTopologyConfig,
+ instanceConfigMap.getOrDefault(instanceName, new
InstanceConfig(instanceName)));
_simpleCapacitySet.add(capacityNode);
}
}
@@ -591,7 +598,7 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
// Convert the assignableNodes to map for quick lookup
Map<String, CapacityNode> simpleCapacityMap = new HashMap<>();
for (CapacityNode node : _simpleCapacitySet) {
- simpleCapacityMap.put(node.getId(), node);
+ simpleCapacityMap.put(node.getInstanceName(), node);
}
for (String resourceName : resourceNameSet) {
// Process current state mapping
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
index 3c3793cec..0471f128e 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.rebalancer.strategy;
*/
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -74,11 +73,15 @@ public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceContro
// Note the liveNodes parameter here might be processed within the
rebalancer, e.g. filter based on tags
Set<CapacityNode> assignableNodeSet = new
HashSet<>(clusterData.getSimpleCapacitySet());
Set<String> liveNodesSet = new HashSet<>(liveNodes);
- assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId()));
+ assignableNodeSet.removeIf(n ->
!liveNodesSet.contains(n.getInstanceName()));
+
+ // Convert the assignableNodes to map for quick lookup
+ Map<String, CapacityNode> assignableNodeMap = assignableNodeSet.stream()
+ .collect(Collectors.toMap(CapacityNode::getInstanceName, node ->
node));
// Populate valid state map given current mapping
Map<String, Set<String>> stateMap =
- populateValidAssignmentMapFromCurrentMapping(currentMapping,
assignableNodeSet);
+ populateValidAssignmentMapFromCurrentMapping(currentMapping,
assignableNodeMap);
if (logger.isDebugEnabled()) {
logger.debug("currentMapping: {}", currentMapping);
@@ -86,22 +89,33 @@ public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceContro
}
// Sort the assignable nodes by id
- List<CapacityNode> assignableNodeList =
-
assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId))
+ List<CapacityNode> assignableNodeList = assignableNodeSet.stream().sorted()
.collect(Collectors.toList());
// Assign partitions to node by order.
for (int i = 0, index = 0; i < _partitions.size(); i++) {
int startIndex = index;
+ Map<String, Integer> currentFaultZoneCountMap = new HashMap<>();
int remainingReplica = _statesReplicaCount;
if (stateMap.containsKey(_partitions.get(i))) {
- remainingReplica = remainingReplica -
stateMap.get(_partitions.get(i)).size();
+ Set<String> existingReplicas = stateMap.get(_partitions.get(i));
+ remainingReplica = remainingReplica - existingReplicas.size();
+ for (String instanceName : existingReplicas) {
+ String faultZone =
assignableNodeMap.get(instanceName).getFaultZone();
+ currentFaultZoneCountMap.put(faultZone,
+ currentFaultZoneCountMap.getOrDefault(faultZone, 0) + 1);
+ }
}
for (int j = 0; j < remainingReplica; j++) {
while (index - startIndex < assignableNodeList.size()) {
CapacityNode node = assignableNodeList.get(index++ %
assignableNodeList.size());
- if (node.canAdd(_resourceName, _partitions.get(i))) {
- stateMap.computeIfAbsent(_partitions.get(i), m -> new
HashSet<>()).add(node.getId());
+ if (this.canAdd(node, _partitions.get(i), currentFaultZoneCountMap))
{
+ stateMap.computeIfAbsent(_partitions.get(i), m -> new HashSet<>())
+ .add(node.getInstanceName());
+ if (node.getFaultZone() != null) {
+ currentFaultZoneCountMap.put(node.getFaultZone(),
+ currentFaultZoneCountMap.getOrDefault(node.getFaultZone(),
0) + 1);
+ }
break;
}
}
@@ -126,16 +140,13 @@ public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceContro
* Populates a valid state map from the current mapping, filtering out
invalid nodes.
*
* @param currentMapping the current mapping of partitions to node states
- * @param assignableNodes the list of nodes that can be assigned
+ * @param assignableNodeMap the map of instance name -> nodes that can be
assigned
* @return a map of partitions to valid nodes
*/
private Map<String, Set<String>>
populateValidAssignmentMapFromCurrentMapping(
final Map<String, Map<String, String>> currentMapping,
- final Set<CapacityNode> assignableNodes) {
+ final Map<String, CapacityNode> assignableNodeMap) {
Map<String, Set<String>> validAssignmentMap = new HashMap<>();
- // Convert the assignableNodes to map for quick lookup
- Map<String, CapacityNode> assignableNodeMap =
- assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId,
node -> node));
if (currentMapping != null) {
for (Map.Entry<String, Map<String, String>> entry :
currentMapping.entrySet()) {
String partition = entry.getKey();
@@ -167,4 +178,22 @@ public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceContro
return node != null && (node.hasPartition(_resourceName, partition) ||
node.canAdd(
_resourceName, partition));
}
+
+ /**
+ * Checks if it's valid to assign the partition to node
+ *
+ * @param node node to assign partition
+ * @param partition partition name
+ * @param currentFaultZoneCountMap the map of fault zones -> count
+ * @return true if it's valid to assign the partition to node, false
otherwise
+ */
+ protected boolean canAdd(CapacityNode node, String partition,
+ Map<String, Integer> currentFaultZoneCountMap) {
+ // Valid assignment when following conditions match:
+ // 1. Replica is not within the same fault zones of other replicas
+ // 2. Node has capacity to hold the replica
+ return !currentFaultZoneCountMap.containsKey(node.getFaultZone()) &&
node.canAdd(_resourceName,
+ partition);
+ }
}
+
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 28fa8d491..19c66dc19 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -366,14 +366,6 @@ public class ZkTestBase {
configAccessor.setClusterConfig(clusterName, clusterConfig);
}
- protected void
setGlobalMaxPartitionAllowedPerInstanceInCluster(HelixZkClient zkClient,
- String clusterName, int maxPartitionAllowed) {
- ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
- ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
- clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed);
- configAccessor.setClusterConfig(clusterName, clusterConfig);
- }
-
protected IdealState createResourceWithDelayedRebalance(String clusterName,
String db,
String stateModel, int numPartition, int replica, int minActiveReplica,
long delay) {
return createResourceWithDelayedRebalance(clusterName, db, stateModel,
numPartition, replica,
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
index 45211df4e..acd30c2c7 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
@@ -53,13 +53,12 @@ public class TestStickyRebalanceStrategy {
Set<CapacityNode> capacityNodeSet = new HashSet<>();
for (int i = 0; i < 5; i++) {
- CapacityNode capacityNode = new CapacityNode("Node-" + i);
- capacityNode.setCapacity(1);
+ CapacityNode capacityNode = new CapacityNode("Node-" + i, 1);
capacityNodeSet.add(capacityNode);
}
List<String> liveNodes =
-
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
capacityNodeSet.stream().map(CapacityNode::getInstanceName).collect(Collectors.toList());
List<String> partitions = new ArrayList<>();
for (int i = 0; i < 3; i++) {
@@ -97,13 +96,12 @@ public class TestStickyRebalanceStrategy {
Set<CapacityNode> capacityNodeSet = new HashSet<>();
for (int i = 0; i < nNode; i++) {
- CapacityNode capacityNode = new CapacityNode("Node-" + i);
- capacityNode.setCapacity(1);
+ CapacityNode capacityNode = new CapacityNode("Node-" + i, 1);
capacityNodeSet.add(capacityNode);
}
List<String> liveNodes =
-
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
capacityNodeSet.stream().map(CapacityNode::getInstanceName).collect(Collectors.toList());
List<String> partitions = new ArrayList<>();
for (int i = 0; i < nPartitions; i++) {
@@ -150,13 +148,12 @@ public class TestStickyRebalanceStrategy {
Set<CapacityNode> capacityNodeSet = new HashSet<>();
for (int i = 0; i < nNode; i++) {
- CapacityNode capacityNode = new CapacityNode("Node-" + i);
- capacityNode.setCapacity(1);
+ CapacityNode capacityNode = new CapacityNode("Node-" + i, 1);
capacityNodeSet.add(capacityNode);
}
List<String> liveNodes =
-
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
capacityNodeSet.stream().map(CapacityNode::getInstanceName).collect(Collectors.toList());
List<String> partitions = new ArrayList<>();
for (int i = 0; i < nPartitions; i++) {
@@ -164,13 +161,13 @@ public class TestStickyRebalanceStrategy {
}
when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
- StickyRebalanceStrategy greedyRebalanceStrategy = new
StickyRebalanceStrategy();
- greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states,
1);
+ StickyRebalanceStrategy stickyRebalanceStrategy = new
StickyRebalanceStrategy();
+ stickyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states,
1);
// First round assignment computation:
// 1. Without previous assignment (currentMapping is null)
// 2. Without enough assignable nodes
ZNRecord firstRoundShardAssignment =
- greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes,
null, clusterDataCache);
+ stickyRebalanceStrategy.computePartitionAssignment(null, liveNodes,
null, clusterDataCache);
// Assert only 3 partitions are fulfilled with assignment
Assert.assertEquals(firstRoundShardAssignment.getListFields().entrySet().stream()
@@ -178,12 +175,12 @@ public class TestStickyRebalanceStrategy {
// Assign 4 more nodes which is used in second round assignment computation
for (int i = nNode; i < nNode + 4; i++) {
- CapacityNode capacityNode = new CapacityNode("Node-" + i);
- capacityNode.setCapacity(1);
+ CapacityNode capacityNode = new CapacityNode("Node-" + i, 1);
capacityNodeSet.add(capacityNode);
}
- liveNodes =
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+ liveNodes =
+
capacityNodeSet.stream().map(CapacityNode::getInstanceName).collect(Collectors.toList());
// Populate previous assignment (currentMapping) with first round
assignment computation result
Map<String, Map<String, String>> currentMapping = new HashMap<>();
@@ -199,7 +196,7 @@ public class TestStickyRebalanceStrategy {
// 1. With previous assignment (currentMapping)
// 2. With enough assignable nodes
ZNRecord secondRoundShardAssignment =
- greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes,
currentMapping,
+ stickyRebalanceStrategy.computePartitionAssignment(null, liveNodes,
currentMapping,
clusterDataCache);
// Assert all partitions have been assigned with enough replica
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
index 97a27017e..04860d692 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
@@ -34,8 +34,10 @@ import
org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
@@ -57,6 +59,7 @@ public class TestStickyRebalanceStrategy extends ZkTestBase {
protected ClusterControllerManager _controller;
protected List<MockParticipantManager> _participants = new ArrayList<>();
protected List<MockParticipantManager> _additionalParticipants = new
ArrayList<>();
+ protected Map<String, String> _instanceNameZoneMap = new HashMap<>();
protected int _minActiveReplica = 0;
protected ZkHelixClusterVerifier _clusterVerifier;
protected List<String> _testDBs = new ArrayList<>();
@@ -67,27 +70,17 @@ public class TestStickyRebalanceStrategy extends ZkTestBase
{
@BeforeClass
public void beforeClass() throws Exception {
System.out.println("START " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis()));
+ _configAccessor = new ConfigAccessor(_gZkClient);
_gSetupTool.addCluster(CLUSTER_NAME, true);
for (int i = 0; i < NUM_NODE; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-
- // start dummy participants
- MockParticipantManager participant =
- new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
- participant.syncStart();
- _participants.add(participant);
+ _participants.addAll(addInstance("" + START_PORT + i, "zone-" + i %
REPLICAS, true));
}
for (int i = NUM_NODE; i < NUM_NODE + ADDITIONAL_NUM_NODE; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-
- MockParticipantManager participant =
- new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
- _additionalParticipants.add(participant);
+ _additionalParticipants.addAll(
+ addInstance("" + START_PORT + i, "zone-" + i % REPLICAS, false));
}
// start controller
@@ -147,9 +140,27 @@ public class TestStickyRebalanceStrategy extends
ZkTestBase {
_clusterVerifier.verifyByPolling();
}
+ @Test
+ public void testNoSameZoneAssignment() throws Exception {
+
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME,
1);
+ Map<String, ExternalView> externalViews = createTestDBs();
+ for (ExternalView ev : externalViews.values()) {
+ Map<String, Map<String, String>> assignments =
ev.getRecord().getMapFields();
+ Assert.assertNotNull(assignments);
+ Assert.assertEquals(assignments.size(), PARTITIONS);
+ for (Map<String, String> assignmentMap : assignments.values()) {
+ Assert.assertEquals(assignmentMap.keySet().size(), REPLICAS);
+ Set<String> zoneSet = new HashSet<>();
+ for (String instanceName : assignmentMap.keySet()) {
+ zoneSet.add(_instanceNameZoneMap.get(instanceName));
+ }
+ Assert.assertEquals(zoneSet.size(), REPLICAS);
+ }
+ }
+ }
@Test
public void testFirstTimeAssignmentWithNoInitialLiveNodes() throws Exception
{
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
1);
+
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME,
1);
// Shut down all the nodes
for (int i = 0; i < NUM_NODE; i++) {
_participants.get(i).syncStop();
@@ -175,7 +186,7 @@ public class TestStickyRebalanceStrategy extends ZkTestBase
{
@Test
public void testNoPartitionMovementWithNewInstanceAdd() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
1);
+
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME,
1);
Map<String, ExternalView> externalViewsBefore = createTestDBs();
// Start more new instances
@@ -197,7 +208,7 @@ public class TestStickyRebalanceStrategy extends ZkTestBase
{
@Test
public void testNoPartitionMovementWithInstanceDown() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
1);
+
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME,
1);
Map<String, ExternalView> externalViewsBefore = createTestDBs();
// Shut down 2 instances
@@ -220,7 +231,7 @@ public class TestStickyRebalanceStrategy extends ZkTestBase
{
@Test
public void testNoPartitionMovementWithInstanceRestart() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
1);
+
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME,
1);
// Create resource
Map<String, ExternalView> externalViewsBefore = createTestDBs();
// Shut down half of the nodes
@@ -265,14 +276,14 @@ public class TestStickyRebalanceStrategy extends
ZkTestBase {
@Test
public void testFirstTimeAssignmentWithStackingPlacement() throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
2);
+
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME,
2);
Map<String, ExternalView> externalViewsBefore = createTestDBs();
validateAllPartitionAssigned(externalViewsBefore);
}
@Test
public void testNoPartitionMovementWithNewInstanceAddWithStackingPlacement()
throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
2);
+
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME,
2);
Map<String, ExternalView> externalViewsBefore = createTestDBs();
// Start more new instances
@@ -294,7 +305,7 @@ public class TestStickyRebalanceStrategy extends ZkTestBase
{
@Test
public void testNoPartitionMovementWithInstanceDownWithStackingPlacement()
throws Exception {
- setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
2);
+
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(CLUSTER_NAME,
2);
// Shut down half of the nodes given we allow stacking placement
for (int i = 0; i < NUM_NODE / 2; i++) {
_participants.get(i).syncStop();
@@ -395,4 +406,39 @@ public class TestStickyRebalanceStrategy extends
ZkTestBase {
}
}
}
+
+ private void
setTopologyAwareAndGlobalMaxPartitionAllowedPerInstanceInCluster(String
clusterName,
+ int maxPartitionAllowed) {
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setTopology("/zone/host/logicalId");
+ clusterConfig.setFaultZoneType("zone");
+ clusterConfig.setTopologyAwareEnabled(true);
+ clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed);
+ _configAccessor.setClusterConfig(clusterName, clusterConfig);
+ }
+
+ private List<MockParticipantManager> addInstance(String instanceNameSuffix,
String zone,
+ boolean enabled) {
+ List<MockParticipantManager> participants = new ArrayList<>();
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + instanceNameSuffix;
+ _instanceNameZoneMap.put(storageNodeName, zone);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+ String domain =
+ String.format("zone=%s,host=%s,logicalId=%s", zone, storageNodeName,
instanceNameSuffix);
+ InstanceConfig instanceConfig =
+ _configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName);
+ instanceConfig.setDomain(domain);
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+ if (enabled) {
+ // start dummy participant
+ participant.syncStart();
+ }
+ participants.add(participant);
+
+ return participants;
+ }
}