This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 424aacc5e Support Simple Greedy Rebalance Strategy (#2758)
424aacc5e is described below
commit 424aacc5e67dd4d7ede53333d45950777cdd3432
Author: Junkai Xue <[email protected]>
AuthorDate: Tue Feb 13 09:31:23 2024 -0800
Support Simple Greedy Rebalance Strategy (#2758)
* Support Simple Greedy Rebalance Strategy
Support Simple Greedy Rebalance Strategy for round-robin assignment. It also
supports a global limit on how many partitions may be assigned to each
instance.
* Add integration tests and fix issues
---
.../helix/controller/common/CapacityNode.java | 91 ++++++++++++++++++
.../ResourceControllerDataProvider.java | 30 ++++++
.../strategy/GreedyRebalanceStrategy.java | 103 +++++++++++++++++++++
.../java/org/apache/helix/model/ClusterConfig.java | 24 +++++
.../rebalancer/TestGreedyRebalanceStrategy.java | 85 +++++++++++++++++
...balanceWithGlobalPerInstancePartitionLimit.java | 82 ++++++++++++++++
6 files changed, 415 insertions(+)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
new file mode 100644
index 000000000..fa5068e13
--- /dev/null
+++
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
@@ -0,0 +1,91 @@
+package org.apache.helix.controller.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A Node is an entity that can serve capacity recording purpose. It has a
capacity and knowledge
+ * of partitions assigned to it, so it can decide if it can receive additional
partitions.
+ */
public class CapacityNode {
  private int _currentlyAssigned;
  private int _capacity;
  private final String _id;
  // resource name -> partitions of that resource already placed on this node
  private final Map<String, Set<String>> _partitionMap;

  public CapacityNode(String id) {
    _id = id;
    _partitionMap = new HashMap<>();
    _currentlyAssigned = 0;
  }

  /**
   * Check if this replica can be legally added to this node. NOTE: when the answer is
   * true, the assignment is recorded on this node as a side effect.
   *
   * @param resource The resource to assign
   * @param partition The partition to assign
   * @return true if the assignment can be made, false otherwise
   */
  public boolean canAdd(String resource, String partition) {
    // Reject when the node is already at capacity.
    if (_currentlyAssigned >= _capacity) {
      return false;
    }
    // Reject a duplicate replica of the same partition on this node.
    Set<String> assignedPartitions = _partitionMap.get(resource);
    if (assignedPartitions != null && assignedPartitions.contains(partition)) {
      return false;
    }
    // Record the assignment.
    if (assignedPartitions == null) {
      assignedPartitions = new HashSet<>();
      _partitionMap.put(resource, assignedPartitions);
    }
    assignedPartitions.add(partition);
    _currentlyAssigned++;
    return true;
  }

  /**
   * Set the capacity of this node
   * @param capacity The capacity to set
   */
  public void setCapacity(int capacity) {
    _capacity = capacity;
  }

  /**
   * Get the ID of this node
   * @return The ID of this node
   */
  public String getId() {
    return _id;
  }

  /**
   * Get number of partitions currently assigned to this node
   * @return The number of partitions currently assigned to this node
   */
  public int getCurrentlyAssigned() {
    return _currentlyAssigned;
  }

  @Override
  public String toString() {
    return "##########\nname=" + _id + "\nassigned:" + _currentlyAssigned + "\ncapacity:"
        + _capacity;
  }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 9ca6de4f7..bc0cbf530 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
@@ -36,7 +37,9 @@ import org.apache.helix.common.caches.CustomizedStateCache;
import org.apache.helix.common.caches.CustomizedViewCache;
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
import org.apache.helix.controller.stages.MissingTopStateRecord;
@@ -81,6 +84,8 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
// Maintain a set of all ChangeTypes for change detection
private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
private Set<String> _aggregationEnabledTypes = new HashSet<>();
+ private Set<CapacityNode> _simpleCapacitySet;
+
// CrushEd strategy needs to have a stable partition list input. So this
cached list persist the
// previous seen partition lists. If the members in a list are not modified,
the old list will be
@@ -172,6 +177,18 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
// TODO: impacting user's clusters.
refreshStablePartitionList(getIdealStates());
+ if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
+
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
+ // Remove all cached IdealState because it is a global computation
cannot partially be
+ // performed for some resources. The computation is simple as well not
taking too much resource
+ // to recompute the assignments.
+ Set<String> cachedGreedyIdealStates =
_idealMappingCache.values().stream().filter(
+ record ->
record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name())
+
.equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
+ .collect(Collectors.toSet());
+ _idealMappingCache.keySet().removeAll(cachedGreedyIdealStates);
+ }
+
LogUtil.logInfo(logger, getClusterEventId(), String.format(
"END: ResourceControllerDataProvider.refresh() for cluster %s, started
at %d took %d for %s pipeline",
getClusterName(), startTime, System.currentTimeMillis() - startTime,
getPipelineName()));
@@ -521,4 +538,17 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
public WagedInstanceCapacity getWagedInstanceCapacity() {
return _wagedInstanceCapacity;
}
+
  /**
   * Builds the set of per-instance capacity trackers used by GreedyRebalanceStrategy.
   * One CapacityNode is created per enabled live instance, each capped at the given
   * global per-instance partition limit.
   *
   * @param globalMaxPartitionAllowedPerInstance maximum number of partitions each
   *          instance may host across all resources
   */
  private void buildSimpleCapacityMap(int globalMaxPartitionAllowedPerInstance) {
    _simpleCapacitySet = new HashSet<>();
    for (String instance : getEnabledLiveInstances()) {
      CapacityNode capacityNode = new CapacityNode(instance);
      capacityNode.setCapacity(globalMaxPartitionAllowedPerInstance);
      _simpleCapacitySet.add(capacityNode);
    }
  }
+
  /**
   * Get the per-instance capacity trackers for simple (greedy) capacity-aware assignment.
   *
   * @return the set of CapacityNode, or null when the global per-instance partition
   *         limit is not configured (the set is only built during refresh when
   *         ClusterConfig.getGlobalMaxPartitionAllowedPerInstance() != -1)
   */
  public Set<CapacityNode> getSimpleCapacitySet() {
    return _simpleCapacitySet;
  }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
new file mode 100644
index 000000000..60580c40a
--- /dev/null
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
@@ -0,0 +1,103 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.common.CapacityNode;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GreedyRebalanceStrategy implements
RebalanceStrategy<ResourceControllerDataProvider> {
+ private static Logger logger =
LoggerFactory.getLogger(GreedyRebalanceStrategy.class);
+ private String _resourceName;
+ private List<String> _partitions;
+ private LinkedHashMap<String, Integer> _states;
+
+ public GreedyRebalanceStrategy() {
+ }
+
+ @Override
+ public void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ _resourceName = resourceName;
+ _partitions = partitions;
+ _states = states;
+ }
+
+ @Override
+ public ZNRecord computePartitionAssignment(final List<String> allNodes,
final List<String> liveNodes,
+ final Map<String, Map<String, String>> currentMapping,
ResourceControllerDataProvider clusterData) {
+ int numReplicas = countStateReplicas();
+ ZNRecord znRecord = new ZNRecord(_resourceName);
+ if (liveNodes.size() == 0) {
+ return znRecord;
+ }
+
+ if (clusterData.getSimpleCapacitySet() == null) {
+ logger.warn("No capacity set for resource: " + _resourceName);
+ return znRecord;
+ }
+
+ // Sort the assignable nodes by id
+ List<CapacityNode> assignableNodes = new
ArrayList<>(clusterData.getSimpleCapacitySet());
+ Collections.sort(assignableNodes,
Comparator.comparing(CapacityNode::getId));
+
+ // Assign partitions to node by order.
+ for (int i = 0, index = 0; i < _partitions.size(); i++) {
+ int startIndex = index;
+ List<String> preferenceList = new ArrayList<>();
+ for (int j = 0; j < numReplicas; j++) {
+ while (index - startIndex < assignableNodes.size()) {
+ CapacityNode node = assignableNodes.get(index++ %
assignableNodes.size());
+ if (node.canAdd(_resourceName, _partitions.get(i))) {
+ preferenceList.add(node.getId());
+ break;
+ }
+ }
+
+ if (index - startIndex >= assignableNodes.size()) {
+ // If the all nodes have been tried out, then no node can be
assigned.
+ logger.warn("No enough assignable nodes for resource: " +
_resourceName);
+ }
+ }
+ znRecord.setListField(_partitions.get(i), preferenceList);
+ }
+
+ return znRecord;
+ }
+
+ private int countStateReplicas() {
+ int total = 0;
+ for (Integer count : _states.values()) {
+ total += count;
+ }
+ return total;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 628962166..750bbe612 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -70,6 +70,10 @@ public class ClusterConfig extends HelixProperty {
// The following concerns maintenance mode
MAX_PARTITIONS_PER_INSTANCE,
+ // The maximum number of partitions that an instance can serve in this
cluster.
+ // This only works for GreedyRebalanceStrategy.
+ // TODO: if we want to support this for other rebalancers, we need to
implement that logic
+ GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE,
// The following two include offline AND disabled instances
MAX_OFFLINE_INSTANCES_ALLOWED,
NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT, // For auto-exiting maintenance mode
@@ -511,6 +515,26 @@ public class ClusterConfig extends HelixProperty {
return
_record.getIntField(ClusterConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(),
-1);
}
  /**
   * Set the maximum number of partitions allowed to assign to an instance in this cluster.
   * Only honored by GreedyRebalanceStrategy; other rebalance strategies ignore this setting.
   *
   * @param globalMaxPartitionAllowedPerInstance the maximum number of partitions allowed,
   *          or -1 to disable the limit
   */
  public void setGlobalMaxPartitionAllowedPerInstance(int globalMaxPartitionAllowedPerInstance) {
    _record.setIntField(ClusterConfigProperty.GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE.name(),
        globalMaxPartitionAllowedPerInstance);
  }
+
  /**
   * Get the maximum number of partitions allowed to assign to an instance in this cluster.
   *
   * @return the configured maximum number of partitions per instance, or -1 when the
   *         limit is not set
   */
  public int getGlobalMaxPartitionAllowedPerInstance() {
    return _record.getIntField(
        ClusterConfigProperty.GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE.name(), -1);
  }
+
/**
 * Set the max offline instances allowed for the cluster. If number of off-line or disabled
* instances
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
new file mode 100644
index 000000000..d90e16136
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
@@ -0,0 +1,85 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.controller.common.CapacityNode;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+
+public class TestGreedyRebalanceStrategy {
+ private static final String TEST_CLUSTER_NAME = "TestCluster";
+ private static final String TEST_RESOURCE_PREFIX = "TestResource_";
+
+ @Test
+ public void testAssignmentWithGlobalPartitionLimit() {
+
+ ResourceControllerDataProvider clusterDataCache =
+ Mockito.mock(ResourceControllerDataProvider.class);
+ LinkedHashMap<String, Integer> states = new LinkedHashMap<String,
Integer>(2);
+ states.put("OFFLINE", 0);
+ states.put("ONLINE", 1);
+
+ Set<CapacityNode> capacityNodeSet = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ CapacityNode capacityNode = new CapacityNode("Node-" + i);
+ capacityNode.setCapacity(1);
+ capacityNodeSet.add(capacityNode);
+ }
+
+ List<String> liveNodes =
+
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+ List<String> partitions = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ partitions.add(TEST_RESOURCE_PREFIX + "0_" + i);
+ }
+ when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
+
+ GreedyRebalanceStrategy greedyRebalanceStrategy = new
GreedyRebalanceStrategy();
+ greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states,
1);
+ greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null,
clusterDataCache);
+
+ partitions = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ partitions.add(TEST_RESOURCE_PREFIX + "1_" + i);
+ }
+ greedyRebalanceStrategy = new GreedyRebalanceStrategy();
+ greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states,
1);
+ greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null,
clusterDataCache);
+
+ Assert.assertEquals(
+ capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() !=
1).count(), 0);
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
new file mode 100644
index 000000000..6b675a8a3
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
@@ -0,0 +1,82 @@
package org.apache.helix.integration.rebalancer;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends
TaskTestBase {
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _numNodes = 10;
+ _numReplicas = 2;
+ _numDbs = 1;
+ _numPartitions = 4;
+ super.beforeClass();
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ /*
+ * shutdown order: 1) disconnect the controller 2) disconnect participants
+ */
+ _controller.syncStop();
+ for (MockParticipantManager participant : _participants) {
+ participant.syncStop();
+ }
+ deleteCluster(CLUSTER_NAME);
+ System.out.println("END " + CLUSTER_NAME + " at " + new
Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testGreedyRebalanceWithGlobalPerInstancePartitionLimit() throws
InterruptedException {
+ // Update cluster config and greedy rebalance strategy
+ ClusterConfig clusterConfig =
_manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setGlobalMaxPartitionAllowedPerInstance(1);
+ _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+ IdealState idealState = _gSetupTool.getClusterManagementTool()
+ .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
+ idealState.setRebalanceStrategy(
+
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
+ _gSetupTool.getClusterManagementTool()
+ .setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
idealState);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "NewDB",
2, "OnlineOffline",
+ IdealState.RebalanceMode.FULL_AUTO.name(),
+
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
+ _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, "NewDB", 1);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Process instance -> number of assigned partitions
+ ExternalView TGTDBView = _gSetupTool.getClusterManagementTool()
+ .getResourceExternalView(CLUSTER_NAME,
WorkflowGenerator.DEFAULT_TGT_DB);
+ ExternalView newDBView =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
"NewDB");
+ Map<String, Integer> instancePartitionCountMap = new HashMap<>();
+ TGTDBView.getPartitionSet().stream()
+ .forEach(partition ->
TGTDBView.getStateMap(partition).keySet().forEach(instance -> {
+ instancePartitionCountMap.put(instance,
+ instancePartitionCountMap.getOrDefault(instance, 0) + 1);
+ }));
+ newDBView.getPartitionSet().stream()
+ .forEach(partition ->
newDBView.getStateMap(partition).keySet().forEach(instance -> {
+ instancePartitionCountMap.put(instance,
+ instancePartitionCountMap.getOrDefault(instance, 0) + 1);
+ }));
+
+ Assert.assertEquals(
+ instancePartitionCountMap.values().stream().filter(count -> count !=
1).count(), 0);
+ }
+}