This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 424aacc5e Support Simple Greedy Rebalance Strategy (#2758)
424aacc5e is described below

commit 424aacc5e67dd4d7ede53333d45950777cdd3432
Author: Junkai Xue <[email protected]>
AuthorDate: Tue Feb 13 09:31:23 2024 -0800

    Support Simple Greedy Rebalance Strategy (#2758)
    
    * Support Simple Greedy Rebalance Strategy
    
    Support Simple Greedy Rebalance Strategy for RoundRobin assignment. It also supports a
    global limit on how many partitions are assigned to each instance.
    
    * Add integration tests and fix issues
---
 .../helix/controller/common/CapacityNode.java      |  91 ++++++++++++++++++
 .../ResourceControllerDataProvider.java            |  30 ++++++
 .../strategy/GreedyRebalanceStrategy.java          | 103 +++++++++++++++++++++
 .../java/org/apache/helix/model/ClusterConfig.java |  24 +++++
 .../rebalancer/TestGreedyRebalanceStrategy.java    |  85 +++++++++++++++++
 ...balanceWithGlobalPerInstancePartitionLimit.java |  82 ++++++++++++++++
 6 files changed, 415 insertions(+)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java 
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
new file mode 100644
index 000000000..fa5068e13
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
@@ -0,0 +1,91 @@
+package org.apache.helix.controller.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A Node is an entity that can serve capacity recording purpose. It has a 
capacity and knowledge
+ * of partitions assigned to it, so it can decide if it can receive additional 
partitions.
+ */
+public class CapacityNode {
+  private int _currentlyAssigned;
+  private int _capacity;
+  private final String _id;
+  private final Map<String, Set<String>> _partitionMap;
+
+  public CapacityNode(String id) {
+    _partitionMap = new HashMap<>();
+    _currentlyAssigned = 0;
+    this._id = id;
+  }
+
+  /**
+   * Check if this replica can be legally added to this node
+   *
+   * @param resource  The resource to assign
+   * @param partition The partition to assign
+   * @return true if the assignment can be made, false otherwise
+   */
+  public boolean canAdd(String resource, String partition) {
+    if (_currentlyAssigned >= _capacity || (_partitionMap.containsKey(resource)
+        && _partitionMap.get(resource).contains(partition))) {
+      return false;
+    }
+    _partitionMap.computeIfAbsent(resource, k -> new 
HashSet<>()).add(partition);
+    _currentlyAssigned++;
+    return true;
+  }
+
+  /**
+   * Set the capacity of this node
+   * @param capacity  The capacity to set
+   */
+  public void setCapacity(int capacity) {
+    _capacity = capacity;
+  }
+
+  /**
+   * Get the ID of this node
+   * @return The ID of this node
+   */
+  public String getId() {
+    return _id;
+  }
+
+  /**
+   * Get number of partitions currently assigned to this node
+   * @return The number of partitions currently assigned to this node
+   */
+  public int getCurrentlyAssigned() {
+    return _currentlyAssigned;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    
sb.append("##########\nname=").append(_id).append("\nassigned:").append(_currentlyAssigned)
+        .append("\ncapacity:").append(_capacity);
+    return sb.toString();
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 9ca6de4f7..bc0cbf530 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
@@ -36,7 +37,9 @@ import org.apache.helix.common.caches.CustomizedStateCache;
 import org.apache.helix.common.caches.CustomizedViewCache;
 import org.apache.helix.common.caches.PropertyCache;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.common.CapacityNode;
 import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
 import 
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
 import org.apache.helix.controller.stages.MissingTopStateRecord;
@@ -81,6 +84,8 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
   // Maintain a set of all ChangeTypes for change detection
   private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
   private Set<String> _aggregationEnabledTypes = new HashSet<>();
+  private Set<CapacityNode> _simpleCapacitySet;
+
 
   // CrushEd strategy needs to have a stable partition list input. So this 
cached list persist the
   // previous seen partition lists. If the members in a list are not modified, 
the old list will be
@@ -172,6 +177,18 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
     // TODO: impacting user's clusters.
     refreshStablePartitionList(getIdealStates());
 
+    if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
+      
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
+      // Remove all cached IdealStates because this is a global computation that cannot be
+      // partially performed for only a subset of resources. The computation is simple enough
+      // that recomputing the assignments is inexpensive.
+      Set<String> cachedGreedyIdealStates = 
_idealMappingCache.values().stream().filter(
+              record -> 
record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name())
+                  
.equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
+          .collect(Collectors.toSet());
+      _idealMappingCache.keySet().removeAll(cachedGreedyIdealStates);
+    }
+
     LogUtil.logInfo(logger, getClusterEventId(), String.format(
         "END: ResourceControllerDataProvider.refresh() for cluster %s, started 
at %d took %d for %s pipeline",
         getClusterName(), startTime, System.currentTimeMillis() - startTime, 
getPipelineName()));
@@ -521,4 +538,17 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
   public WagedInstanceCapacity getWagedInstanceCapacity() {
     return _wagedInstanceCapacity;
   }
+
+  private void buildSimpleCapacityMap(int 
globalMaxPartitionAllowedPerInstance) {
+    _simpleCapacitySet = new HashSet<>();
+    for (String instance : getEnabledLiveInstances()) {
+      CapacityNode capacityNode = new CapacityNode(instance);
+      capacityNode.setCapacity(globalMaxPartitionAllowedPerInstance);
+      _simpleCapacitySet.add(capacityNode);
+    }
+  }
+
+  public Set<CapacityNode> getSimpleCapacitySet() {
+    return _simpleCapacitySet;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
new file mode 100644
index 000000000..60580c40a
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
@@ -0,0 +1,103 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.common.CapacityNode;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GreedyRebalanceStrategy implements 
RebalanceStrategy<ResourceControllerDataProvider> {
+  private static Logger logger = 
LoggerFactory.getLogger(GreedyRebalanceStrategy.class);
+  private String _resourceName;
+  private List<String> _partitions;
+  private LinkedHashMap<String, Integer> _states;
+
+  public GreedyRebalanceStrategy() {
+  }
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    _states = states;
+  }
+
+  @Override
+  public ZNRecord computePartitionAssignment(final List<String> allNodes, 
final List<String> liveNodes,
+      final Map<String, Map<String, String>> currentMapping, 
ResourceControllerDataProvider clusterData) {
+    int numReplicas = countStateReplicas();
+    ZNRecord znRecord = new ZNRecord(_resourceName);
+    if (liveNodes.size() == 0) {
+      return znRecord;
+    }
+
+    if (clusterData.getSimpleCapacitySet() == null) {
+      logger.warn("No capacity set for resource: " + _resourceName);
+      return znRecord;
+    }
+
+    // Sort the assignable nodes by id
+    List<CapacityNode> assignableNodes = new 
ArrayList<>(clusterData.getSimpleCapacitySet());
+    Collections.sort(assignableNodes, 
Comparator.comparing(CapacityNode::getId));
+
+    // Assign partitions to node by order.
+    for (int i = 0, index = 0; i < _partitions.size(); i++) {
+      int startIndex = index;
+      List<String> preferenceList = new ArrayList<>();
+      for (int j = 0; j < numReplicas; j++) {
+        while (index - startIndex < assignableNodes.size()) {
+          CapacityNode node = assignableNodes.get(index++ % 
assignableNodes.size());
+          if (node.canAdd(_resourceName, _partitions.get(i))) {
+            preferenceList.add(node.getId());
+            break;
+          }
+        }
+
+        if (index - startIndex >= assignableNodes.size()) {
+          // If the all nodes have been tried out, then no node can be 
assigned.
+          logger.warn("No enough assignable nodes for resource: " + 
_resourceName);
+        }
+      }
+      znRecord.setListField(_partitions.get(i), preferenceList);
+    }
+
+    return znRecord;
+  }
+
+  private int countStateReplicas() {
+    int total = 0;
+    for (Integer count : _states.values()) {
+      total += count;
+    }
+    return total;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 628962166..750bbe612 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -70,6 +70,10 @@ public class ClusterConfig extends HelixProperty {
 
     // The following concerns maintenance mode
     MAX_PARTITIONS_PER_INSTANCE,
+    // The maximum number of partitions that an instance can serve in this 
cluster.
+    // This only works for GreedyRebalanceStrategy.
+    // TODO: if we want to support this for other rebalancers, we need to 
implement that logic
+    GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE,
     // The following two include offline AND disabled instances
     MAX_OFFLINE_INSTANCES_ALLOWED,
     NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT, // For auto-exiting maintenance mode
@@ -511,6 +515,26 @@ public class ClusterConfig extends HelixProperty {
     return 
_record.getIntField(ClusterConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(), 
-1);
   }
 
+  /**
+   * Set the maximum number of partitions allowed to assign to an instance in 
this cluster.
+   *
+   * @param globalMaxPartitionAllowedPerInstance the maximum number of 
partitions allowed
+   */
+  public void setGlobalMaxPartitionAllowedPerInstance(int 
globalMaxPartitionAllowedPerInstance) {
+    
_record.setIntField(ClusterConfigProperty.GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE.name(),
+        globalMaxPartitionAllowedPerInstance);
+  }
+
+  /**
+   * Get the maximum number of partitions allowed to assign to an instance in 
this cluster.
+   *
+   * @return the maximum number of partitions allowed, or Integer.MAX_VALUE
+   */
+  public int getGlobalMaxPartitionAllowedPerInstance() {
+    return _record.getIntField(
+        
ClusterConfigProperty.GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE.name(), -1);
+  }
+
   /**
   * Set the max offline instances allowed for the cluster. If number of off-line or disabled
    * instances
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
new file mode 100644
index 000000000..d90e16136
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
@@ -0,0 +1,85 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.controller.common.CapacityNode;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+
+public class TestGreedyRebalanceStrategy {
+  private static final String TEST_CLUSTER_NAME = "TestCluster";
+  private static final String TEST_RESOURCE_PREFIX = "TestResource_";
+
+  @Test
+  public void testAssignmentWithGlobalPartitionLimit() {
+
+    ResourceControllerDataProvider clusterDataCache =
+        Mockito.mock(ResourceControllerDataProvider.class);
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, 
Integer>(2);
+    states.put("OFFLINE", 0);
+    states.put("ONLINE", 1);
+
+    Set<CapacityNode> capacityNodeSet = new HashSet<>();
+    for (int i = 0; i < 5; i++) {
+      CapacityNode capacityNode = new CapacityNode("Node-" + i);
+      capacityNode.setCapacity(1);
+      capacityNodeSet.add(capacityNode);
+    }
+
+    List<String> liveNodes =
+        
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+    List<String> partitions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      partitions.add(TEST_RESOURCE_PREFIX + "0_" + i);
+    }
+    when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
+
+    GreedyRebalanceStrategy greedyRebalanceStrategy = new 
GreedyRebalanceStrategy();
+    greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 
1);
+    greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, 
clusterDataCache);
+
+    partitions = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      partitions.add(TEST_RESOURCE_PREFIX + "1_" + i);
+    }
+    greedyRebalanceStrategy = new GreedyRebalanceStrategy();
+    greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states, 
1);
+    greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, 
clusterDataCache);
+
+    Assert.assertEquals(
+        capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 
1).count(), 0);
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
new file mode 100644
index 000000000..6b675a8a3
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
@@ -0,0 +1,82 @@
+package org.apache.helix.integration.rebalancer;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends 
TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 10;
+    _numReplicas = 2;
+    _numDbs = 1;
+    _numPartitions = 4;
+    super.beforeClass();
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /*
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLUSTER_NAME + " at " + new 
Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testGreedyRebalanceWithGlobalPerInstancePartitionLimit() throws 
InterruptedException {
+    // Update cluster config and greedy rebalance strategy
+    ClusterConfig clusterConfig = 
_manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setGlobalMaxPartitionAllowedPerInstance(1);
+    _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+    IdealState idealState = _gSetupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
+    idealState.setRebalanceStrategy(
+        
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
+    _gSetupTool.getClusterManagementTool()
+        .setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, 
idealState);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "NewDB", 
2, "OnlineOffline",
+        IdealState.RebalanceMode.FULL_AUTO.name(),
+        
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
+    _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, "NewDB", 1);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Process instance -> number of assigned partitions
+    ExternalView TGTDBView = _gSetupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
+    ExternalView newDBView =
+        
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
"NewDB");
+    Map<String, Integer> instancePartitionCountMap = new HashMap<>();
+    TGTDBView.getPartitionSet().stream()
+        .forEach(partition -> 
TGTDBView.getStateMap(partition).keySet().forEach(instance -> {
+          instancePartitionCountMap.put(instance,
+              instancePartitionCountMap.getOrDefault(instance, 0) + 1);
+        }));
+    newDBView.getPartitionSet().stream()
+        .forEach(partition -> 
newDBView.getStateMap(partition).keySet().forEach(instance -> {
+          instancePartitionCountMap.put(instance,
+              instancePartitionCountMap.getOrDefault(instance, 0) + 1);
+        }));
+
+    Assert.assertEquals(
+        instancePartitionCountMap.values().stream().filter(count -> count != 
1).count(), 0);
+  }
+}

Reply via email to