This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch helix-stickiness-rebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-stickiness-rebalancer by
this push:
new 0f13d9267 Helix stickiness rebalancer (#2878)
0f13d9267 is described below
commit 0f13d92679607f3f78d83ee9d7e491c1f47d8e4c
Author: frankmu <[email protected]>
AuthorDate: Fri Aug 9 12:13:41 2024 -0700
Helix stickiness rebalancer (#2878)
Create sticky assignment rebalance strategy
---
.../helix/controller/common/CapacityNode.java | 14 ++
.../ResourceControllerDataProvider.java | 8 +-
.../rebalancer/ConditionBasedRebalancer.java | 18 +-
.../strategy/GreedyRebalanceStrategy.java | 103 ----------
.../strategy/StickyRebalanceStrategy.java | 202 +++++++++++++++++++
.../java/org/apache/helix/model/ClusterConfig.java | 2 +-
.../rebalancer/TestGreedyRebalanceStrategy.java | 85 --------
.../rebalancer/TestStickyRebalanceStrategy.java | 214 +++++++++++++++++++++
...alanceWithGlobalPerInstancePartitionLimit.java} | 25 ++-
9 files changed, 467 insertions(+), 204 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
index fa5068e13..208ee7913 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
@@ -52,11 +52,25 @@ public class CapacityNode {
&& _partitionMap.get(resource).contains(partition))) {
return false;
}
+
+ // Add the partition to the resource's set of partitions in this node
_partitionMap.computeIfAbsent(resource, k -> new
HashSet<>()).add(partition);
_currentlyAssigned++;
return true;
}
+ /**
+ * Checks if a specific resource + partition is assigned to this node.
+ *
+ * @param resource the name of the resource
+ * @param partition the partition
+ * @return {@code true} if the resource + partition is assigned to this
node, {@code false} otherwise
+ */
+ public boolean hasPartition(String resource, String partition) {
+ Set<String> partitions = _partitionMap.get(resource);
+ return partitions != null && partitions.contains(partition);
+ }
+
/**
* Set the capacity of this node
* @param capacity The capacity to set
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index cdfcb0f24..74b5aa8b2 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -41,7 +41,7 @@ import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
import org.apache.helix.controller.stages.MissingTopStateRecord;
@@ -191,11 +191,11 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
// Remove all cached IdealStates because it is a global computation that
cannot be partially
// performed for only some resources. The computation is simple as well,
not taking too much resource
// to recompute the assignments.
- Set<String> cachedGreedyIdealStates =
_idealMappingCache.values().stream().filter(
+ Set<String> cachedStickyIdealStates =
_idealMappingCache.values().stream().filter(
record ->
record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name())
-
.equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
+
.equals(StickyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
.collect(Collectors.toSet());
- _idealMappingCache.keySet().removeAll(cachedGreedyIdealStates);
+ _idealMappingCache.keySet().removeAll(cachedStickyIdealStates);
}
LogUtil.logInfo(logger, getClusterEventId(), String.format(
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
index 158699a97..84762ea51 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
@@ -20,12 +20,12 @@ package org.apache.helix.controller.rebalancer;
*/
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.helix.HelixException;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -98,8 +98,8 @@ public class ConditionBasedRebalancer extends
AbstractRebalancer<ResourceControl
LinkedHashMap<String, Integer> stateCountMap =
stateModelDef.getStateCountMap(assignableLiveInstance.size(),
replicas);
- List<String> assignableLiveNodes = new
ArrayList<>(assignableLiveInstance.keySet());
- List<String> assignableNodes = new
ArrayList<>(clusterData.getAssignableInstances());
+ Set<String> assignableLiveNodes = new
HashSet<>(assignableLiveInstance.keySet());
+ Set<String> assignableNodes = new
HashSet<>(clusterData.getAssignableInstances());
assignableNodes.removeAll(clusterData.getDisabledInstances());
assignableLiveNodes.retainAll(assignableNodes);
@@ -135,20 +135,22 @@ public class ConditionBasedRebalancer extends
AbstractRebalancer<ResourceControl
LOG.warn("Resource " + resourceName + " has tag " +
currentIdealState.getInstanceGroupTag()
+ " but no live participants have this tag");
}
- assignableNodes = new ArrayList<>(taggedNodes);
- assignableLiveNodes = new ArrayList<>(taggedLiveNodes);
+ assignableNodes = new HashSet<>(taggedNodes);
+ assignableLiveNodes = new HashSet<>(taggedLiveNodes);
}
// sort node lists to ensure consistent preferred assignments
- Collections.sort(assignableNodes);
- Collections.sort(assignableLiveNodes);
+ List<String> assignableNodesList =
+ assignableNodes.stream().sorted().collect(Collectors.toList());
+ List<String> assignableLiveNodesList =
+ assignableLiveNodes.stream().sorted().collect(Collectors.toList());
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
_rebalanceStrategy =
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(),
partitions, resourceName,
stateCountMap, maxPartition);
ZNRecord newMapping =
- _rebalanceStrategy.computePartitionAssignment(assignableNodes,
assignableLiveNodes,
+ _rebalanceStrategy.computePartitionAssignment(assignableNodesList,
assignableLiveNodesList,
currentMapping, clusterData);
if (LOG.isDebugEnabled()) {
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
deleted file mode 100644
index 60580c40a..000000000
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package org.apache.helix.controller.rebalancer.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.controller.common.CapacityNode;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GreedyRebalanceStrategy implements
RebalanceStrategy<ResourceControllerDataProvider> {
- private static Logger logger =
LoggerFactory.getLogger(GreedyRebalanceStrategy.class);
- private String _resourceName;
- private List<String> _partitions;
- private LinkedHashMap<String, Integer> _states;
-
- public GreedyRebalanceStrategy() {
- }
-
- @Override
- public void init(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states, int maximumPerNode) {
- _resourceName = resourceName;
- _partitions = partitions;
- _states = states;
- }
-
- @Override
- public ZNRecord computePartitionAssignment(final List<String> allNodes,
final List<String> liveNodes,
- final Map<String, Map<String, String>> currentMapping,
ResourceControllerDataProvider clusterData) {
- int numReplicas = countStateReplicas();
- ZNRecord znRecord = new ZNRecord(_resourceName);
- if (liveNodes.size() == 0) {
- return znRecord;
- }
-
- if (clusterData.getSimpleCapacitySet() == null) {
- logger.warn("No capacity set for resource: " + _resourceName);
- return znRecord;
- }
-
- // Sort the assignable nodes by id
- List<CapacityNode> assignableNodes = new
ArrayList<>(clusterData.getSimpleCapacitySet());
- Collections.sort(assignableNodes,
Comparator.comparing(CapacityNode::getId));
-
- // Assign partitions to node by order.
- for (int i = 0, index = 0; i < _partitions.size(); i++) {
- int startIndex = index;
- List<String> preferenceList = new ArrayList<>();
- for (int j = 0; j < numReplicas; j++) {
- while (index - startIndex < assignableNodes.size()) {
- CapacityNode node = assignableNodes.get(index++ %
assignableNodes.size());
- if (node.canAdd(_resourceName, _partitions.get(i))) {
- preferenceList.add(node.getId());
- break;
- }
- }
-
- if (index - startIndex >= assignableNodes.size()) {
- // If the all nodes have been tried out, then no node can be
assigned.
- logger.warn("No enough assignable nodes for resource: " +
_resourceName);
- }
- }
- znRecord.setListField(_partitions.get(i), preferenceList);
- }
-
- return znRecord;
- }
-
- private int countStateReplicas() {
- int total = 0;
- for (Integer count : _states.values()) {
- total += count;
- }
- return total;
- }
-}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
new file mode 100644
index 000000000..7def42d08
--- /dev/null
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
@@ -0,0 +1,202 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.controller.common.CapacityNode;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceControllerDataProvider> {
+ private static Logger logger =
LoggerFactory.getLogger(StickyRebalanceStrategy.class);
+ private String _resourceName;
+ private List<String> _partitions;
+ private LinkedHashMap<String, Integer> _states;
+ private int _statesReplicaCount;
+
+ public StickyRebalanceStrategy() {
+ }
+
+ @Override
+ public void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ _resourceName = resourceName;
+ _partitions = partitions;
+ _states = states;
+ if (_states != null) {
+ _statesReplicaCount =
_states.values().stream().mapToInt(Integer::intValue).sum();
+ }
+ }
+
+ @Override
+ public ZNRecord computePartitionAssignment(final List<String> allNodes,
+ final List<String> liveNodes, final Map<String, Map<String, String>>
currentMapping,
+ ResourceControllerDataProvider clusterData) {
+ ZNRecord znRecord = new ZNRecord(_resourceName);
+ if (liveNodes.isEmpty()) {
+ return znRecord;
+ }
+
+ if (clusterData.getSimpleCapacitySet() == null) {
+ logger.warn("No capacity set for resource: {}", _resourceName);
+ return znRecord;
+ }
+
+ // Sort the assignable nodes by id
+ List<CapacityNode> assignableNodes = new
ArrayList<>(clusterData.getSimpleCapacitySet());
+ assignableNodes.sort(Comparator.comparing(CapacityNode::getId));
+
+ // Filter out the nodes if not in the liveNodes parameter
+ // Note the liveNodes parameter here might be processed within the
rebalancer, e.g. filter based on tags
+ Set<String> liveNodesSet = new HashSet<>(liveNodes);
+ assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId()));
+
+ // Populate valid state map given current mapping
+ Map<String, Map<String, String>> stateMap =
+ populateValidStateMapFromCurrentMapping(currentMapping,
assignableNodes);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("currentMapping: {}", currentMapping);
+ logger.debug("stateMap: {}", stateMap);
+ }
+
+    // Assign partitions to nodes in order.
+ for (int i = 0, index = 0; i < _partitions.size(); i++) {
+ int startIndex = index;
+ for (Map.Entry<String, Integer> entry : _states.entrySet()) {
+ String state = entry.getKey();
+ int stateReplicaNumber = entry.getValue();
+        // For this partition, compute the existing number of replicas
+ long existsReplicas =
+ stateMap.computeIfAbsent(_partitions.get(i), m -> new
HashMap<>()).values().stream()
+ .filter(s -> s.equals(state)).count();
+ for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) {
+ while (index - startIndex < assignableNodes.size()) {
+ CapacityNode node = assignableNodes.get(index++ %
assignableNodes.size());
+ if (node.canAdd(_resourceName, _partitions.get(i))) {
+ stateMap.get(_partitions.get(i)).put(node.getId(), state);
+ break;
+ }
+ }
+
+ if (index - startIndex >= assignableNodes.size()) {
+          // If all the nodes have been tried out, then no node can be
assigned.
+ logger.warn("No enough assignable nodes for resource: {}",
_resourceName);
+ }
+ }
+ }
+ }
+ for (Map.Entry<String, Map<String, String>> entry : stateMap.entrySet()) {
+ znRecord.setListField(entry.getKey(), new
ArrayList<>(entry.getValue().keySet()));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("znRecord: {}", znRecord);
+ }
+
+ return znRecord;
+ }
+
+ /**
+ * Populates a valid state map from the current mapping, filtering out
invalid nodes.
+ *
+ * @param currentMapping the current mapping of partitions to node states
+ * @param assignableNodes the list of nodes that can be assigned
+ * @return a map of partitions to valid node states
+ */
+ private Map<String, Map<String, String>>
populateValidStateMapFromCurrentMapping(
+ final Map<String, Map<String, String>> currentMapping,
+ final List<CapacityNode> assignableNodes) {
+ Map<String, Map<String, String>> validStateMap = new HashMap<>();
+ // Convert the assignableNodes to map for quick lookup
+ Map<String, CapacityNode> assignableNodeMap =
+ assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId,
node -> node));
+ if (currentMapping != null) {
+ for (Map.Entry<String, Map<String, String>> entry :
currentMapping.entrySet()) {
+ String partition = entry.getKey();
+ Map<String, String> currentNodeStateMap = new
HashMap<>(entry.getValue());
+ // Skip if current node state is invalid with state model
+ if (!isValidStateMap(currentNodeStateMap)) {
+ continue;
+ }
+ // Filter out invalid node assignment
+ currentNodeStateMap.entrySet()
+ .removeIf(e -> !isValidNodeAssignment(partition, e.getKey(),
assignableNodeMap));
+
+ validStateMap.put(partition, currentNodeStateMap);
+ }
+ }
+ return validStateMap;
+ }
+
+ /**
+ * Validates whether the provided state mapping is valid according to the
defined state model.
+ *
+ * @param currentNodeStateMap A map representing the actual state mapping
where the key is the node ID and the value is the state.
+ * @return true if the state map is valid, false otherwise
+ */
+ private boolean isValidStateMap(final Map<String, String>
currentNodeStateMap) {
+    // Check if the size of the current state map exceeds the total state
count in the state model
+ if (currentNodeStateMap.size() > _statesReplicaCount) {
+ return false;
+ }
+
+ Map<String, Integer> tmpStates = new HashMap<>(_states);
+ for (String state : currentNodeStateMap.values()) {
+ // Return invalid if:
+ // The state is not defined in the state model OR
+      // The state count exceeds the defined count in the state model
+ if (!tmpStates.containsKey(state) || tmpStates.get(state) <= 0) {
+ return false;
+ }
+ tmpStates.put(state, tmpStates.get(state) - 1);
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks if a node assignment is valid for a given partition.
+ *
+ * @param partition the partition to be assigned
+ * @param nodeId the ID of the node to be checked
+ * @param assignableNodeMap the map of node IDs to CapacityNode objects
+ * @return true if the node is valid for the assignment, false otherwise
+ */
+ private boolean isValidNodeAssignment(final String partition, final String
nodeId,
+ final Map<String, CapacityNode> assignableNodeMap) {
+ CapacityNode node = assignableNodeMap.get(nodeId);
+ // Return valid when following conditions match:
+ // 1. Node is in assignableNodeMap
+    // 2. Node holds the current partition, or we can assign the current
partition to the node
+ return node != null && (node.hasPartition(_resourceName, partition) ||
node.canAdd(
+ _resourceName, partition));
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index edb7a76c6..ab2c40b79 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -71,7 +71,7 @@ public class ClusterConfig extends HelixProperty {
// The following concerns maintenance mode
MAX_PARTITIONS_PER_INSTANCE,
// The maximum number of partitions that an instance can serve in this
cluster.
- // This only works for GreedyRebalanceStrategy.
+ // This only works for StickyRebalanceStrategy.
// TODO: if we want to support this for other rebalancers, we need to
implement that logic
GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE,
// The following two include offline AND disabled instances
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
deleted file mode 100644
index d90e16136..000000000
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.helix.controller.common.CapacityNode;
-import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.when;
-
-public class TestGreedyRebalanceStrategy {
- private static final String TEST_CLUSTER_NAME = "TestCluster";
- private static final String TEST_RESOURCE_PREFIX = "TestResource_";
-
- @Test
- public void testAssignmentWithGlobalPartitionLimit() {
-
- ResourceControllerDataProvider clusterDataCache =
- Mockito.mock(ResourceControllerDataProvider.class);
- LinkedHashMap<String, Integer> states = new LinkedHashMap<String,
Integer>(2);
- states.put("OFFLINE", 0);
- states.put("ONLINE", 1);
-
- Set<CapacityNode> capacityNodeSet = new HashSet<>();
- for (int i = 0; i < 5; i++) {
- CapacityNode capacityNode = new CapacityNode("Node-" + i);
- capacityNode.setCapacity(1);
- capacityNodeSet.add(capacityNode);
- }
-
- List<String> liveNodes =
-
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
-
- List<String> partitions = new ArrayList<>();
- for (int i = 0; i < 3; i++) {
- partitions.add(TEST_RESOURCE_PREFIX + "0_" + i);
- }
- when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
-
- GreedyRebalanceStrategy greedyRebalanceStrategy = new
GreedyRebalanceStrategy();
- greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states,
1);
- greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null,
clusterDataCache);
-
- partitions = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- partitions.add(TEST_RESOURCE_PREFIX + "1_" + i);
- }
- greedyRebalanceStrategy = new GreedyRebalanceStrategy();
- greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states,
1);
- greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null,
clusterDataCache);
-
- Assert.assertEquals(
- capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() !=
1).count(), 0);
- }
-}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
new file mode 100644
index 000000000..45211df4e
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
@@ -0,0 +1,214 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.controller.common.CapacityNode;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+
+public class TestStickyRebalanceStrategy {
+ private static final String TEST_CLUSTER_NAME = "TestCluster";
+ private static final String TEST_RESOURCE_PREFIX = "TestResource_";
+
+ @Test
+ public void testAssignmentWithGlobalPartitionLimit() {
+
+ ResourceControllerDataProvider clusterDataCache =
+ Mockito.mock(ResourceControllerDataProvider.class);
+ LinkedHashMap<String, Integer> states = new LinkedHashMap<String,
Integer>(2);
+ states.put("OFFLINE", 0);
+ states.put("ONLINE", 1);
+
+ Set<CapacityNode> capacityNodeSet = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ CapacityNode capacityNode = new CapacityNode("Node-" + i);
+ capacityNode.setCapacity(1);
+ capacityNodeSet.add(capacityNode);
+ }
+
+ List<String> liveNodes =
+
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+ List<String> partitions = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ partitions.add(TEST_RESOURCE_PREFIX + "0_" + i);
+ }
+ when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
+
+ StickyRebalanceStrategy greedyRebalanceStrategy = new
StickyRebalanceStrategy();
+ greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states,
1);
+ greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null,
clusterDataCache);
+
+ partitions = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ partitions.add(TEST_RESOURCE_PREFIX + "1_" + i);
+ }
+ greedyRebalanceStrategy = new StickyRebalanceStrategy();
+ greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states,
1);
+ greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null,
clusterDataCache);
+
+ Assert.assertEquals(
+ capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() !=
1).count(), 0);
+ }
+
+ @Test
+ public void testStickyAssignment() {
+ final int nReplicas = 4;
+ final int nPartitions = 4;
+ final int nNode = 16;
+
+ ResourceControllerDataProvider clusterDataCache =
+ Mockito.mock(ResourceControllerDataProvider.class);
+ LinkedHashMap<String, Integer> states = new LinkedHashMap<String,
Integer>(2);
+ states.put("OFFLINE", 0);
+ states.put("ONLINE", nReplicas);
+
+ Set<CapacityNode> capacityNodeSet = new HashSet<>();
+ for (int i = 0; i < nNode; i++) {
+ CapacityNode capacityNode = new CapacityNode("Node-" + i);
+ capacityNode.setCapacity(1);
+ capacityNodeSet.add(capacityNode);
+ }
+
+ List<String> liveNodes =
+
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+ List<String> partitions = new ArrayList<>();
+ for (int i = 0; i < nPartitions; i++) {
+ partitions.add(TEST_RESOURCE_PREFIX + i);
+ }
+ when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
+
+ // Populate previous assignment with currentMapping
+ Map<String, Map<String, String>> currentMapping = new HashMap<>();
+ currentMapping.put(TEST_RESOURCE_PREFIX + "0", new HashMap<>());
+ currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-0", "ONLINE");
+ currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-2", "ONLINE");
+ currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-4", "ONLINE");
+ currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-6", "ONLINE");
+ currentMapping.put(TEST_RESOURCE_PREFIX + "2", new HashMap<>());
+ currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-1", "ONLINE");
+ currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-5", "ONLINE");
+ currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-8", "ONLINE");
+
+ StickyRebalanceStrategy greedyRebalanceStrategy = new
StickyRebalanceStrategy();
+ greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states,
1);
+ ZNRecord shardAssignment =
+ greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes,
currentMapping,
+ clusterDataCache);
+
+ // Assert the existing assignment won't be changed
+ Assert.assertEquals(currentMapping.get(TEST_RESOURCE_PREFIX +
"0").keySet(),
+ new HashSet<>(shardAssignment.getListField(TEST_RESOURCE_PREFIX +
"0")));
+ Assert.assertTrue(shardAssignment.getListField(TEST_RESOURCE_PREFIX + "2")
+ .containsAll(currentMapping.get(TEST_RESOURCE_PREFIX + "2").keySet()));
+ }
+
+ @Test
+ public void testStickyAssignmentMultipleTimes() {
+ final int nReplicas = 4;
+ final int nPartitions = 4;
+ final int nNode = 12;
+
+ ResourceControllerDataProvider clusterDataCache =
+ Mockito.mock(ResourceControllerDataProvider.class);
+ LinkedHashMap<String, Integer> states = new LinkedHashMap<String,
Integer>(2);
+ states.put("OFFLINE", 0);
+ states.put("ONLINE", nReplicas);
+
+ Set<CapacityNode> capacityNodeSet = new HashSet<>();
+ for (int i = 0; i < nNode; i++) {
+ CapacityNode capacityNode = new CapacityNode("Node-" + i);
+ capacityNode.setCapacity(1);
+ capacityNodeSet.add(capacityNode);
+ }
+
+ List<String> liveNodes =
+
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+ List<String> partitions = new ArrayList<>();
+ for (int i = 0; i < nPartitions; i++) {
+ partitions.add(TEST_RESOURCE_PREFIX + i);
+ }
+ when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
+
+ StickyRebalanceStrategy greedyRebalanceStrategy = new
StickyRebalanceStrategy();
+ greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states,
1);
+ // First round assignment computation:
+ // 1. Without previous assignment (currentMapping is null)
+ // 2. Without enough assignable nodes
+ ZNRecord firstRoundShardAssignment =
+ greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes,
null, clusterDataCache);
+
+ // Assert only 3 partitions are fulfilled with assignment
+
Assert.assertEquals(firstRoundShardAssignment.getListFields().entrySet().stream()
+ .filter(e -> e.getValue().size() == nReplicas).count(), 3);
+
+ // Assign 4 more nodes which is used in second round assignment computation
+ for (int i = nNode; i < nNode + 4; i++) {
+ CapacityNode capacityNode = new CapacityNode("Node-" + i);
+ capacityNode.setCapacity(1);
+ capacityNodeSet.add(capacityNode);
+ }
+
+ liveNodes =
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+ // Populate previous assignment (currentMapping) with first round
assignment computation result
+ Map<String, Map<String, String>> currentMapping = new HashMap<>();
+ firstRoundShardAssignment.getListFields().entrySet().stream()
+ .filter(e -> e.getValue().size() == nReplicas).forEach(e -> {
+ currentMapping.put(e.getKey(), new HashMap<>());
+ for (String nodeId : e.getValue()) {
+ currentMapping.get(e.getKey()).put(nodeId, "ONLINE");
+ }
+ });
+
+ // Second round assignment computation:
+ // 1. With previous assignment (currentMapping)
+ // 2. With enough assignable nodes
+ ZNRecord secondRoundShardAssignment =
+ greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes,
currentMapping,
+ clusterDataCache);
+
+ // Assert all partitions have been assigned with enough replica
+
Assert.assertEquals(secondRoundShardAssignment.getListFields().entrySet().stream()
+ .filter(e -> e.getValue().size() == nReplicas).count(), nPartitions);
+ // For previously existing assignment, assert there is no assignment change
+ currentMapping.forEach((partition, nodeMapping) -> {
+ Assert.assertEquals(nodeMapping.keySet(),
+ new HashSet<>(secondRoundShardAssignment.getListField(partition)));
+ });
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java
similarity index 77%
rename from
helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
rename to
helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java
index 6b675a8a3..e0d160b78 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java
@@ -1,5 +1,24 @@
package org.apache.helix.integration.rebalancer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -15,7 +34,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends
TaskTestBase {
+public class TestStickyRebalanceWithGlobalPerInstancePartitionLimit extends
TaskTestBase {
@BeforeClass
public void beforeClass() throws Exception {
@@ -48,14 +67,14 @@ public class
TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends Task
IdealState idealState = _gSetupTool.getClusterManagementTool()
.getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
idealState.setRebalanceStrategy(
-
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
+
"org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy");
_gSetupTool.getClusterManagementTool()
.setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
idealState);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
_gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "NewDB",
2, "OnlineOffline",
IdealState.RebalanceMode.FULL_AUTO.name(),
-
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
+
"org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy");
_gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, "NewDB", 1);
Assert.assertTrue(_clusterVerifier.verifyByPolling());