This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 1a210ef895230579023916ed48848c26930ccad1
Author: frankmu <muteng...@gmail.com>
AuthorDate: Thu Aug 29 16:12:41 2024 -0700

    Move existing assignments usage calculation to pre-process stage (#2888)
    
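    Move existing assignments usage calculation to pre-process stage.
    CurrentStateComputationStage now populates the usage of the simple
    CapacityNode set from current states and pending messages whenever a
    global max-partition-allowed-per-instance is configured.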
---
 .../ResourceControllerDataProvider.java            |  31 ++
 .../rebalancer/ConditionBasedRebalancer.java       |  36 +--
 .../strategy/StickyRebalanceStrategy.java          |  96 ++----
 .../stages/CurrentStateComputationStage.java       |   8 +
 .../java/org/apache/helix/common/ZkTestBase.java   |  18 +-
 .../rebalancer/TestStickyRebalanceStrategy.java    | 353 +++++++++++++++++++++
 6 files changed, 458 insertions(+), 84 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 74b5aa8b2..37811d9a2 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -44,6 +44,7 @@ import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
 import 
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.MissingTopStateRecord;
 import org.apache.helix.model.CustomizedState;
 import org.apache.helix.model.CustomizedStateConfig;
@@ -51,6 +52,7 @@ import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
@@ -583,6 +585,35 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
     return _simpleCapacitySet;
   }
 
+  public void populateSimpleCapacitySetUsage(final Set<String> resourceNameSet,
+      final CurrentStateOutput currentStateOutput) {
+    // Convert the assignableNodes to map for quick lookup
+    Map<String, CapacityNode> simpleCapacityMap = new HashMap<>();
+    for (CapacityNode node : _simpleCapacitySet) {
+      simpleCapacityMap.put(node.getId(), node);
+    }
+    for (String resourceName : resourceNameSet) {
+      // Process current state mapping
+      populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap,
+          currentStateOutput.getCurrentStateMap(resourceName));
+      // Process pending state mapping
+      populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap,
+          currentStateOutput.getPendingMessageMap(resourceName));
+    }
+  }
+
+  private <T> void populateCapacityNodeUsageFromStateMap(String resourceName,
+      Map<String, CapacityNode> simpleCapacityMap, Map<Partition, Map<String, 
T>> stateMap) {
+    for (Map.Entry<Partition, Map<String, T>> entry : stateMap.entrySet()) {
+      for (String instanceName : entry.getValue().keySet()) {
+        CapacityNode node = simpleCapacityMap.get(instanceName);
+        if (node != null) {
+          node.canAdd(resourceName, entry.getKey().getPartitionName());
+        }
+      }
+    }
+  }
+
   private void refreshDisabledInstancesForAllPartitionsSet() {
     _disabledInstancesForAllPartitionsSet.clear();
     Collection<InstanceConfig> allConfigs = getInstanceConfigMap().values();
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
index 84762ea51..6e68033c4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
@@ -72,16 +72,12 @@ public class ConditionBasedRebalancer extends 
AbstractRebalancer<ResourceControl
   @Override
   public IdealState computeNewIdealState(String resourceName, IdealState 
currentIdealState,
       CurrentStateOutput currentStateOutput, ResourceControllerDataProvider 
clusterData) {
-    if (!this._rebalanceConditions.stream()
+    ZNRecord cachedIdealState = 
clusterData.getCachedOndemandIdealState(resourceName);
+    // If previous placement list exists in cache && all condition met -> 
return cached value
+    if (cachedIdealState != null && cachedIdealState.getListFields() != null
+        && !cachedIdealState.getListFields().isEmpty() && 
!this._rebalanceConditions.stream()
         .allMatch(condition -> condition.shouldPerformRebalance(clusterData))) 
{
-      ZNRecord cachedIdealState = 
clusterData.getCachedOndemandIdealState(resourceName);
-      if (cachedIdealState != null) {
-        return new IdealState(cachedIdealState);
-      }
-      // In theory, the cache should be populated already if no rebalance is 
needed
-      LOG.warn(
-          "Cannot fetch the cached Ideal State for resource: {}, will 
recompute the Ideal State",
-          resourceName);
+      return new IdealState(cachedIdealState);
     }
 
     LOG.info("Computing IdealState for " + resourceName);
@@ -189,18 +185,16 @@ public class ConditionBasedRebalancer extends 
AbstractRebalancer<ResourceControl
   public ResourceAssignment 
computeBestPossiblePartitionState(ResourceControllerDataProvider cache,
       IdealState idealState, Resource resource, CurrentStateOutput 
currentStateOutput) {
     ZNRecord cachedIdealState = 
cache.getCachedOndemandIdealState(resource.getResourceName());
-    if (!this._rebalanceConditions.stream()
+    // If previous assignment map exists in cache && all condition met -> 
return cached value
+    if (cachedIdealState.getMapFields() != null && 
!cachedIdealState.getMapFields().isEmpty()
+        && !this._rebalanceConditions.stream()
         .allMatch(condition -> condition.shouldPerformRebalance(cache))) {
-      if (cachedIdealState != null && cachedIdealState.getMapFields() != null) 
{
-        ResourceAssignment partitionMapping = new 
ResourceAssignment(resource.getResourceName());
-        for (Partition partition : resource.getPartitions()) {
-          partitionMapping.addReplicaMap(partition, 
cachedIdealState.getMapFields().get(partition));
-        }
-        return new ResourceAssignment(cachedIdealState);
+      ResourceAssignment partitionMapping = new 
ResourceAssignment(resource.getResourceName());
+      for (Partition partition : resource.getPartitions()) {
+        partitionMapping.addReplicaMap(partition,
+            cachedIdealState.getMapFields().get(partition.getPartitionName()));
       }
-      // In theory, the cache should be populated already if no rebalance is 
needed
-      LOG.warn("Cannot fetch the cached assignment for resource: {}, will 
recompute the assignment",
-          resource.getResourceName());
+      return partitionMapping;
     }
 
     LOG.info("Computing BestPossibleMapping for " + 
resource.getResourceName());
@@ -212,6 +206,10 @@ public class ConditionBasedRebalancer extends 
AbstractRebalancer<ResourceControl
     cachedIdealState.setMapFields(assignment.getRecord().getMapFields());
     cache.setCachedOndemandIdealState(resource.getResourceName(), 
cachedIdealState);
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processed resource: {}", resource.getResourceName());
+      LOG.debug("Final Mapping of resource : {}", assignment);
+    }
     return assignment;
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
index 7def42d08..3c3793cec 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
@@ -36,7 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StickyRebalanceStrategy implements 
RebalanceStrategy<ResourceControllerDataProvider> {
-  private static Logger logger = 
LoggerFactory.getLogger(StickyRebalanceStrategy.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(StickyRebalanceStrategy.class);
   private String _resourceName;
   private List<String> _partitions;
   private LinkedHashMap<String, Integer> _states;
@@ -70,52 +70,50 @@ public class StickyRebalanceStrategy implements 
RebalanceStrategy<ResourceContro
       return znRecord;
     }
 
-    // Sort the assignable nodes by id
-    List<CapacityNode> assignableNodes = new 
ArrayList<>(clusterData.getSimpleCapacitySet());
-    assignableNodes.sort(Comparator.comparing(CapacityNode::getId));
-
     // Filter out the nodes if not in the liveNodes parameter
     // Note the liveNodes parameter here might be processed within the 
rebalancer, e.g. filter based on tags
+    Set<CapacityNode> assignableNodeSet = new 
HashSet<>(clusterData.getSimpleCapacitySet());
     Set<String> liveNodesSet = new HashSet<>(liveNodes);
-    assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId()));
+    assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId()));
 
     //  Populate valid state map given current mapping
-    Map<String, Map<String, String>> stateMap =
-        populateValidStateMapFromCurrentMapping(currentMapping, 
assignableNodes);
+    Map<String, Set<String>> stateMap =
+        populateValidAssignmentMapFromCurrentMapping(currentMapping, 
assignableNodeSet);
 
     if (logger.isDebugEnabled()) {
       logger.debug("currentMapping: {}", currentMapping);
       logger.debug("stateMap: {}", stateMap);
     }
 
+    // Sort the assignable nodes by id
+    List<CapacityNode> assignableNodeList =
+        
assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId))
+            .collect(Collectors.toList());
+
     // Assign partitions to node by order.
     for (int i = 0, index = 0; i < _partitions.size(); i++) {
       int startIndex = index;
-      for (Map.Entry<String, Integer> entry : _states.entrySet()) {
-        String state = entry.getKey();
-        int stateReplicaNumber = entry.getValue();
-        // For this partition, compute existing number replicas
-        long existsReplicas =
-            stateMap.computeIfAbsent(_partitions.get(i), m -> new 
HashMap<>()).values().stream()
-                .filter(s -> s.equals(state)).count();
-        for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) {
-          while (index - startIndex < assignableNodes.size()) {
-            CapacityNode node = assignableNodes.get(index++ % 
assignableNodes.size());
-            if (node.canAdd(_resourceName, _partitions.get(i))) {
-              stateMap.get(_partitions.get(i)).put(node.getId(), state);
-              break;
-            }
+      int remainingReplica = _statesReplicaCount;
+      if (stateMap.containsKey(_partitions.get(i))) {
+        remainingReplica = remainingReplica - 
stateMap.get(_partitions.get(i)).size();
+      }
+      for (int j = 0; j < remainingReplica; j++) {
+        while (index - startIndex < assignableNodeList.size()) {
+          CapacityNode node = assignableNodeList.get(index++ % 
assignableNodeList.size());
+          if (node.canAdd(_resourceName, _partitions.get(i))) {
+            stateMap.computeIfAbsent(_partitions.get(i), m -> new 
HashSet<>()).add(node.getId());
+            break;
           }
+        }
 
-          if (index - startIndex >= assignableNodes.size()) {
-            // If the all nodes have been tried out, then no node can be 
assigned.
-            logger.warn("No enough assignable nodes for resource: {}", 
_resourceName);
-          }
+        if (index - startIndex >= assignableNodeList.size()) {
+          // If the all nodes have been tried out, then no node can be 
assigned.
+          logger.warn("No enough assignable nodes for resource: {}", 
_resourceName);
         }
       }
     }
-    for (Map.Entry<String, Map<String, String>> entry : stateMap.entrySet()) {
-      znRecord.setListField(entry.getKey(), new 
ArrayList<>(entry.getValue().keySet()));
+    for (Map.Entry<String, Set<String>> entry : stateMap.entrySet()) {
+      znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue()));
     }
     if (logger.isDebugEnabled()) {
       logger.debug("znRecord: {}", znRecord);
@@ -129,12 +127,12 @@ public class StickyRebalanceStrategy implements 
RebalanceStrategy<ResourceContro
    *
    * @param currentMapping   the current mapping of partitions to node states
    * @param assignableNodes  the list of nodes that can be assigned
-   * @return a map of partitions to valid node states
+   * @return a map of partitions to valid nodes
    */
-  private Map<String, Map<String, String>> 
populateValidStateMapFromCurrentMapping(
+  private Map<String, Set<String>> 
populateValidAssignmentMapFromCurrentMapping(
       final Map<String, Map<String, String>> currentMapping,
-      final List<CapacityNode> assignableNodes) {
-    Map<String, Map<String, String>> validStateMap = new HashMap<>();
+      final Set<CapacityNode> assignableNodes) {
+    Map<String, Set<String>> validAssignmentMap = new HashMap<>();
     // Convert the assignableNodes to map for quick lookup
     Map<String, CapacityNode> assignableNodeMap =
         assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId, 
node -> node));
@@ -142,44 +140,14 @@ public class StickyRebalanceStrategy implements 
RebalanceStrategy<ResourceContro
       for (Map.Entry<String, Map<String, String>> entry : 
currentMapping.entrySet()) {
         String partition = entry.getKey();
         Map<String, String> currentNodeStateMap = new 
HashMap<>(entry.getValue());
-        // Skip if current node state is invalid with state model
-        if (!isValidStateMap(currentNodeStateMap)) {
-          continue;
-        }
         // Filter out invalid node assignment
         currentNodeStateMap.entrySet()
             .removeIf(e -> !isValidNodeAssignment(partition, e.getKey(), 
assignableNodeMap));
 
-        validStateMap.put(partition, currentNodeStateMap);
-      }
-    }
-    return validStateMap;
-  }
-
-  /**
-   * Validates whether the provided state mapping is valid according to the 
defined state model.
-   *
-   * @param currentNodeStateMap A map representing the actual state mapping 
where the key is the node ID and the value is the state.
-   * @return true if the state map is valid, false otherwise
-   */
-  private boolean isValidStateMap(final Map<String, String> 
currentNodeStateMap) {
-    // Check if the size of the current state map exceeds the total state 
count in state model
-    if (currentNodeStateMap.size() > _statesReplicaCount) {
-      return false;
-    }
-
-    Map<String, Integer> tmpStates = new HashMap<>(_states);
-    for (String state : currentNodeStateMap.values()) {
-      // Return invalid if:
-      // The state is not defined in the state model OR
-      // The state count exceeds the defined count in state model
-      if (!tmpStates.containsKey(state) || tmpStates.get(state) <= 0) {
-        return false;
+        validAssignmentMap.put(partition, new 
HashSet<>(currentNodeStateMap.keySet()));
       }
-      tmpStates.put(state, tmpStates.get(state) - 1);
     }
-
-    return true;
+    return validAssignmentMap;
   }
 
   /**
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index da972d682..64d113f0a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -134,6 +134,14 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
 
       handleResourceCapacityCalculation(event, 
(ResourceControllerDataProvider) cache, currentStateOutput);
     }
+
+    // Populate the capacity for simple CapacityNode
+    if (cache.getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != 
-1
+        && cache instanceof ResourceControllerDataProvider) {
+      final ResourceControllerDataProvider dataProvider = 
(ResourceControllerDataProvider) cache;
+      dataProvider.populateSimpleCapacitySetUsage(resourceToRebalance.keySet(),
+          currentStateExcludingUnknown);
+    }
   }
 
   // update all pending messages to CurrentStateOutput.
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index a26560518..28fa8d491 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -19,7 +19,6 @@ package org.apache.helix.common;
  * under the License.
  */
 
-import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Method;
@@ -33,6 +32,7 @@ import java.util.logging.Level;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 
+import com.google.common.base.Preconditions;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
@@ -50,6 +50,7 @@ import 
org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.ConditionBasedRebalancer;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -365,6 +366,14 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
+  protected void 
setGlobalMaxPartitionAllowedPerInstanceInCluster(HelixZkClient zkClient,
+      String clusterName, int maxPartitionAllowed) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
   protected IdealState createResourceWithDelayedRebalance(String clusterName, 
String db,
       String stateModel, int numPartition, int replica, int minActiveReplica, 
long delay) {
     return createResourceWithDelayedRebalance(clusterName, db, stateModel, 
numPartition, replica,
@@ -384,6 +393,13 @@ public class ZkTestBase {
         -1, WagedRebalancer.class.getName(), null);
   }
 
+  protected IdealState createResourceWithConditionBasedRebalance(String 
clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica,
+      String rebalanceStrategy) {
+    return createResource(clusterName, db, stateModel, numPartition, replica, 
minActiveReplica, -1,
+        ConditionBasedRebalancer.class.getName(), rebalanceStrategy);
+  }
+
   private IdealState createResource(String clusterName, String db, String 
stateModel,
       int numPartition, int replica, int minActiveReplica, long delay, String 
rebalancerClassName,
       String rebalanceStrategy) {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
new file mode 100644
index 000000000..4e4717f39
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
@@ -0,0 +1,353 @@
+package org.apache.helix.integration.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestStickyRebalanceStrategy extends ZkTestBase {
+  static final int NUM_NODE = 18;
+  static final int ADDITIONAL_NUM_NODE = 2;
+  protected static final int START_PORT = 12918;
+  protected static final int PARTITIONS = 2;
+  protected static final int REPLICAS = 3;
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  protected ClusterControllerManager _controller;
+  protected List<MockParticipantManager> _participants = new ArrayList<>();
+  protected List<MockParticipantManager> _additionalParticipants = new 
ArrayList<>();
+  protected int _minActiveReplica = 0;
+  protected ZkHelixClusterVerifier _clusterVerifier;
+  protected List<String> _testDBs = new ArrayList<>();
+  protected ConfigAccessor _configAccessor;
+  protected String[] TestStateModels =
+      {BuiltInStateModelDefinitions.MasterSlave.name(), 
BuiltInStateModelDefinitions.LeaderStandby.name(), 
BuiltInStateModelDefinitions.OnlineOffline.name()};
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    for (int i = NUM_NODE; i < NUM_NODE + ADDITIONAL_NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      _additionalParticipants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
+            
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    if (_clusterVerifier != null) {
+      _clusterVerifier.close();
+    }
+    /*
+      shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+  }
+
+  @BeforeMethod
+  public void beforeTest() {
+    // Restart any participants that has been disconnected from last test
+    for (int i = 0; i < _participants.size(); i++) {
+      if (!_participants.get(i).isConnected()) {
+        _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+            _participants.get(i).getInstanceName()));
+        _participants.get(i).syncStart();
+      }
+    }
+
+    // Stop any additional participants that has been added from last test
+    for (MockParticipantManager additionalParticipant : 
_additionalParticipants) {
+      if (additionalParticipant.isConnected()) {
+        additionalParticipant.syncStop();
+      }
+    }
+  }
+
+  @AfterMethod
+  public void afterTest() throws InterruptedException {
+    // delete all DBs create in last test
+    for (String db : _testDBs) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    _testDBs.clear();
+    _clusterVerifier.verifyByPolling();
+  }
+
+  @Test
+  public void testFirstTimeAssignmentWithNoInitialLiveNodes() throws Exception 
{
+    setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 
1);
+    // Shut down all the nodes
+    for (int i = 0; i < NUM_NODE; i++) {
+      _participants.get(i).syncStop();
+    }
+    // Create resource
+    Map<String, ExternalView> externalViewsBefore = createTestDBs();
+    // Start all the nodes
+    for (int i = 0; i < NUM_NODE; i++) {
+      _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+          _participants.get(i).getInstanceName()));
+      _participants.get(i).syncStart();
+    }
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    Map<String, ExternalView> externalViewsAfter = new HashMap<>();
+    for (String db : _testDBs) {
+      ExternalView ev =
+          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+      externalViewsAfter.put(db, ev);
+    }
+    validateAllPartitionAssigned(externalViewsAfter);
+  }
+
+  @Test
+  public void testNoPartitionMovementWithNewInstanceAdd() throws Exception {
+    setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 
1);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs();
+
+    // Start more new instances
+    for (int i = 0; i < _additionalParticipants.size(); i++) {
+      _additionalParticipants.set(i, new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME,
+          _additionalParticipants.get(i).getInstanceName()));
+      _additionalParticipants.get(i).syncStart();
+    }
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    // All partition assignment should remain the same
+    for (String db : _testDBs) {
+      ExternalView ev =
+          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+      IdealState is =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev);
+    }
+  }
+
+  @Test
+  public void testNoPartitionMovementWithInstanceDown() throws Exception {
+    setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 
1);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs();
+
+    // Shut down 2 instances
+    _participants.get(0).syncStop();
+    _participants.get(_participants.size() - 1).syncStop();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    // No movement for previous remaining assignment
+    Map<String, ExternalView> externalViewsAfter = new HashMap<>();
+    Map<String, IdealState> idealStates = new HashMap<>();
+    for (String db : _testDBs) {
+      ExternalView ev =
+          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+      IdealState is =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      externalViewsAfter.put(db, ev);
+      idealStates.put(db, is);
+    }
+    validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore, 
externalViewsAfter, 2);
+  }
+
+  @Test
+  public void testFirstTimeAssignmentWithStackingPlacement() throws Exception {
+    setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 
2);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs();
+    validateAllPartitionAssigned(externalViewsBefore);
+  }
+
+  @Test
+  public void testNoPartitionMovementWithNewInstanceAddWithStackingPlacement() 
throws Exception {
+    setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 
2);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs();
+
+    // Start more new instances
+    for (int i = 0; i < _additionalParticipants.size(); i++) {
+      _additionalParticipants.set(i, new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME,
+          _additionalParticipants.get(i).getInstanceName()));
+      _additionalParticipants.get(i).syncStart();
+    }
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    // All partition assignment should remain the same
+    for (String db : _testDBs) {
+      ExternalView ev =
+          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+      IdealState is =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev);
+    }
+  }
+
+  @Test
+  public void testNoPartitionMovementWithInstanceDownWithStackingPlacement() 
throws Exception {
+    setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 
2);
+    // Shut down half of the nodes given we allow stacking placement
+    for (int i = 0; i < NUM_NODE / 2; i++) {
+      _participants.get(i).syncStop();
+    }
+    Map<String, ExternalView> externalViewsBefore = createTestDBs();
+
+    // Shut down 2 instances
+    _participants.get(_participants.size() - 1).syncStop();
+    _participants.get(_participants.size() - 2).syncStop();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    // No movement for previous remaining assignment
+    Map<String, ExternalView> externalViewsAfter = new HashMap<>();
+    Map<String, IdealState> idealStates = new HashMap<>();
+    for (String db : _testDBs) {
+      ExternalView ev =
+          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+      IdealState is =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      externalViewsAfter.put(db, ev);
+      idealStates.put(db, is);
+    }
+    validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore, 
externalViewsAfter, 4);
+  }
+
+  // create test DBs, wait it converged and return externalviews
+  protected Map<String, ExternalView> createTestDBs() throws 
InterruptedException {
+    Map<String, ExternalView> externalViews = new HashMap<String, 
ExternalView>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithConditionBasedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, REPLICAS,
+          _minActiveReplica, StickyRebalanceStrategy.class.getName());
+      _testDBs.add(db);
+    }
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  protected void validateNoPartitionMove(IdealState is, ExternalView evBefore,
+      ExternalView evAfter) {
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentsBefore = 
evBefore.getRecord().getMapField(partition);
+      Map<String, String> assignmentsAfter = 
evAfter.getRecord().getMapField(partition);
+
+      Set<String> instancesBefore = new HashSet<>(assignmentsBefore.keySet());
+      Set<String> instancesAfter = new HashSet<>(assignmentsAfter.keySet());
+
+      Assert.assertEquals(instancesBefore, instancesAfter,
+          String.format("%s has been moved to new instances, before: %s, 
after: %s", partition,
+              assignmentsBefore, assignmentsAfter));
+    }
+  }
+
+  protected void validateNoPartitionMoveWithDiffCount(Map<String, IdealState> 
idealStates,
+      Map<String, ExternalView> externalViewsBefore, Map<String, ExternalView> 
externalViewsAfter,
+      int diffCount) {
+    for (Map.Entry<String, IdealState> entry : idealStates.entrySet()) {
+      String resourceName = entry.getKey();
+      IdealState is = entry.getValue();
+      for (String partition : is.getPartitionSet()) {
+        Map<String, String> assignmentsBefore =
+            
externalViewsBefore.get(resourceName).getRecord().getMapField(partition);
+        Map<String, String> assignmentsAfter =
+            
externalViewsAfter.get(resourceName).getRecord().getMapField(partition);
+
+        Set<String> instancesBefore = new 
HashSet<>(assignmentsBefore.keySet());
+        Set<String> instancesAfter = new HashSet<>(assignmentsAfter.keySet());
+
+        if (instancesBefore.size() == instancesAfter.size()) {
+          Assert.assertEquals(instancesBefore, instancesAfter,
+              String.format("%s has been moved to new instances, before: %s, 
after: %s", partition,
+                  assignmentsBefore, assignmentsAfter));
+        } else {
+          Assert.assertTrue(instancesBefore.containsAll(instancesAfter),
+              String.format("%s has been moved to new instances, before: %s, 
after: %s", partition,
+                  assignmentsBefore, assignmentsAfter));
+          diffCount = diffCount - (instancesBefore.size() - 
instancesAfter.size());
+        }
+      }
+    }
+    Assert.assertEquals(diffCount, 0,
+        String.format("Partition movement detected, before: %s, after: %s", 
externalViewsBefore,
+            externalViewsAfter));
+  }
+
+  private void validateAllPartitionAssigned(Map<String, ExternalView> 
externalViewsBefore) {
+    for (ExternalView ev : externalViewsBefore.values()) {
+      Map<String, Map<String, String>> assignments = 
ev.getRecord().getMapFields();
+      Assert.assertNotNull(assignments);
+      Assert.assertEquals(assignments.size(), PARTITIONS);
+      for (Map<String, String> assignmentMap : assignments.values()) {
+        Assert.assertEquals(assignmentMap.keySet().size(), REPLICAS);
+      }
+    }
+  }
+}


Reply via email to