This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit 1a210ef895230579023916ed48848c26930ccad1 Author: frankmu <muteng...@gmail.com> AuthorDate: Thu Aug 29 16:12:41 2024 -0700 Move existing assignments usage calculation to pre-process stage (#2888) Move existing assignments usage calculation to pre-process stage --- .../ResourceControllerDataProvider.java | 31 ++ .../rebalancer/ConditionBasedRebalancer.java | 36 +-- .../strategy/StickyRebalanceStrategy.java | 96 ++---- .../stages/CurrentStateComputationStage.java | 8 + .../java/org/apache/helix/common/ZkTestBase.java | 18 +- .../rebalancer/TestStickyRebalanceStrategy.java | 353 +++++++++++++++++++++ 6 files changed, 458 insertions(+), 84 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index 74b5aa8b2..37811d9a2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -44,6 +44,7 @@ import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity; import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider; +import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.MissingTopStateRecord; import org.apache.helix.model.CustomizedState; import org.apache.helix.model.CustomizedStateConfig; @@ -51,6 +52,7 @@ import org.apache.helix.model.CustomizedView; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.slf4j.Logger; @@ -583,6 +585,35 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider { return _simpleCapacitySet; } + public void populateSimpleCapacitySetUsage(final Set<String> resourceNameSet, + final CurrentStateOutput currentStateOutput) { + // Convert the assignableNodes to map for quick lookup + Map<String, CapacityNode> simpleCapacityMap = new HashMap<>(); + for (CapacityNode node : _simpleCapacitySet) { + simpleCapacityMap.put(node.getId(), node); + } + for (String resourceName : resourceNameSet) { + // Process current state mapping + populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap, + currentStateOutput.getCurrentStateMap(resourceName)); + // Process pending state mapping + populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap, + currentStateOutput.getPendingMessageMap(resourceName)); + } + } + + private <T> void populateCapacityNodeUsageFromStateMap(String resourceName, + Map<String, CapacityNode> simpleCapacityMap, Map<Partition, Map<String, T>> stateMap) { + for (Map.Entry<Partition, Map<String, T>> entry : stateMap.entrySet()) { + for (String instanceName : entry.getValue().keySet()) { + CapacityNode node = simpleCapacityMap.get(instanceName); + if (node != null) { + node.canAdd(resourceName, entry.getKey().getPartitionName()); + } + } + } + } + private void refreshDisabledInstancesForAllPartitionsSet() { _disabledInstancesForAllPartitionsSet.clear(); Collection<InstanceConfig> allConfigs = getInstanceConfigMap().values(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java index 84762ea51..6e68033c4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java @@ -72,16 +72,12 @@ public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControl @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) { - if (!this._rebalanceConditions.stream() + ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName); + // If previous placement list exists in cache && all condition met -> return cached value + if (cachedIdealState != null && cachedIdealState.getListFields() != null + && !cachedIdealState.getListFields().isEmpty() && !this._rebalanceConditions.stream() .allMatch(condition -> condition.shouldPerformRebalance(clusterData))) { - ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName); - if (cachedIdealState != null) { - return new IdealState(cachedIdealState); - } - // In theory, the cache should be populated already if no rebalance is needed - LOG.warn( - "Cannot fetch the cached Ideal State for resource: {}, will recompute the Ideal State", - resourceName); + return new IdealState(cachedIdealState); } LOG.info("Computing IdealState for " + resourceName); @@ -189,18 +185,16 @@ public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControl public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache, IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) { ZNRecord cachedIdealState = cache.getCachedOndemandIdealState(resource.getResourceName()); - if (!this._rebalanceConditions.stream() + // If previous assignment map exists in cache && all condition met -> return cached value + if (cachedIdealState.getMapFields() != null && !cachedIdealState.getMapFields().isEmpty() + && !this._rebalanceConditions.stream() .allMatch(condition -> condition.shouldPerformRebalance(cache))) { - if (cachedIdealState != null && cachedIdealState.getMapFields() != null) { - ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName()); - for (Partition partition : resource.getPartitions()) { - partitionMapping.addReplicaMap(partition, cachedIdealState.getMapFields().get(partition)); - } - return new ResourceAssignment(cachedIdealState); + ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName()); + for (Partition partition : resource.getPartitions()) { + partitionMapping.addReplicaMap(partition, + cachedIdealState.getMapFields().get(partition.getPartitionName())); } - // In theory, the cache should be populated already if no rebalance is needed - LOG.warn("Cannot fetch the cached assignment for resource: {}, will recompute the assignment", - resource.getResourceName()); + return partitionMapping; } LOG.info("Computing BestPossibleMapping for " + resource.getResourceName()); @@ -212,6 +206,10 @@ public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControl cachedIdealState.setMapFields(assignment.getRecord().getMapFields()); cache.setCachedOndemandIdealState(resource.getResourceName(), cachedIdealState); + if (LOG.isDebugEnabled()) { + LOG.debug("Processed resource: {}", resource.getResourceName()); + LOG.debug("Final Mapping of resource : {}", assignment); + } return assignment; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java index 7def42d08..3c3793cec 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java @@ -36,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> { - private static Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); + private static final Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); private String _resourceName; private List<String> _partitions; private LinkedHashMap<String, Integer> _states; @@ -70,52 +70,50 @@ public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceContro return znRecord; } - // Sort the assignable nodes by id - List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet()); - assignableNodes.sort(Comparator.comparing(CapacityNode::getId)); - // Filter out the nodes if not in the liveNodes parameter // Note the liveNodes parameter here might be processed within the rebalancer, e.g. filter based on tags + Set<CapacityNode> assignableNodeSet = new HashSet<>(clusterData.getSimpleCapacitySet()); Set<String> liveNodesSet = new HashSet<>(liveNodes); - assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId())); + assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId())); // Populate valid state map given current mapping - Map<String, Map<String, String>> stateMap = - populateValidStateMapFromCurrentMapping(currentMapping, assignableNodes); + Map<String, Set<String>> stateMap = + populateValidAssignmentMapFromCurrentMapping(currentMapping, assignableNodeSet); if (logger.isDebugEnabled()) { logger.debug("currentMapping: {}", currentMapping); logger.debug("stateMap: {}", stateMap); } + // Sort the assignable nodes by id + List<CapacityNode> assignableNodeList = + assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId)) + .collect(Collectors.toList()); + // Assign partitions to node by order. for (int i = 0, index = 0; i < _partitions.size(); i++) { int startIndex = index; - for (Map.Entry<String, Integer> entry : _states.entrySet()) { - String state = entry.getKey(); - int stateReplicaNumber = entry.getValue(); - // For this partition, compute existing number replicas - long existsReplicas = - stateMap.computeIfAbsent(_partitions.get(i), m -> new HashMap<>()).values().stream() - .filter(s -> s.equals(state)).count(); - for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) { - while (index - startIndex < assignableNodes.size()) { - CapacityNode node = assignableNodes.get(index++ % assignableNodes.size()); - if (node.canAdd(_resourceName, _partitions.get(i))) { - stateMap.get(_partitions.get(i)).put(node.getId(), state); - break; - } + int remainingReplica = _statesReplicaCount; + if (stateMap.containsKey(_partitions.get(i))) { + remainingReplica = remainingReplica - stateMap.get(_partitions.get(i)).size(); + } + for (int j = 0; j < remainingReplica; j++) { + while (index - startIndex < assignableNodeList.size()) { + CapacityNode node = assignableNodeList.get(index++ % assignableNodeList.size()); + if (node.canAdd(_resourceName, _partitions.get(i))) { + stateMap.computeIfAbsent(_partitions.get(i), m -> new HashSet<>()).add(node.getId()); + break; } + } - if (index - startIndex >= assignableNodes.size()) { - // If the all nodes have been tried out, then no node can be assigned. - logger.warn("No enough assignable nodes for resource: {}", _resourceName); - } + if (index - startIndex >= assignableNodeList.size()) { + // If the all nodes have been tried out, then no node can be assigned. + logger.warn("No enough assignable nodes for resource: {}", _resourceName); } } } - for (Map.Entry<String, Map<String, String>> entry : stateMap.entrySet()) { - znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue().keySet())); + for (Map.Entry<String, Set<String>> entry : stateMap.entrySet()) { + znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue())); } if (logger.isDebugEnabled()) { logger.debug("znRecord: {}", znRecord); @@ -129,12 +127,12 @@ public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceContro * * @param currentMapping the current mapping of partitions to node states * @param assignableNodes the list of nodes that can be assigned - * @return a map of partitions to valid node states + * @return a map of partitions to valid nodes */ - private Map<String, Map<String, String>> populateValidStateMapFromCurrentMapping( + private Map<String, Set<String>> populateValidAssignmentMapFromCurrentMapping( final Map<String, Map<String, String>> currentMapping, - final List<CapacityNode> assignableNodes) { - Map<String, Map<String, String>> validStateMap = new HashMap<>(); + final Set<CapacityNode> assignableNodes) { + Map<String, Set<String>> validAssignmentMap = new HashMap<>(); // Convert the assignableNodes to map for quick lookup Map<String, CapacityNode> assignableNodeMap = assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId, node -> node)); @@ -142,44 +140,14 @@ public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceContro for (Map.Entry<String, Map<String, String>> entry : currentMapping.entrySet()) { String partition = entry.getKey(); Map<String, String> currentNodeStateMap = new HashMap<>(entry.getValue()); - // Skip if current node state is invalid with state model - if (!isValidStateMap(currentNodeStateMap)) { - continue; - } // Filter out invalid node assignment currentNodeStateMap.entrySet() .removeIf(e -> !isValidNodeAssignment(partition, e.getKey(), assignableNodeMap)); - validStateMap.put(partition, currentNodeStateMap); - } - } - return validStateMap; - } - - /** - * Validates whether the provided state mapping is valid according to the defined state model. - * - * @param currentNodeStateMap A map representing the actual state mapping where the key is the node ID and the value is the state. - * @return true if the state map is valid, false otherwise - */ - private boolean isValidStateMap(final Map<String, String> currentNodeStateMap) { - // Check if the size of the current state map exceeds the total state count in state model - if (currentNodeStateMap.size() > _statesReplicaCount) { - return false; - } - - Map<String, Integer> tmpStates = new HashMap<>(_states); - for (String state : currentNodeStateMap.values()) { - // Return invalid if: - // The state is not defined in the state model OR - // The state count exceeds the defined count in state model - if (!tmpStates.containsKey(state) || tmpStates.get(state) <= 0) { - return false; + validAssignmentMap.put(partition, new HashSet<>(currentNodeStateMap.keySet())); } - tmpStates.put(state, tmpStates.get(state) - 1); } - - return true; + return validAssignmentMap; } /** diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index da972d682..64d113f0a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -134,6 +134,14 @@ public class CurrentStateComputationStage extends AbstractBaseStage { handleResourceCapacityCalculation(event, (ResourceControllerDataProvider) cache, currentStateOutput); } + + // Populate the capacity for simple CapacityNode + if (cache.getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1 + && cache instanceof ResourceControllerDataProvider) { + final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache; + dataProvider.populateSimpleCapacitySetUsage(resourceToRebalance.keySet(), + currentStateExcludingUnknown); + } } // update all pending messages to CurrentStateOutput. diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index a26560518..28fa8d491 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -19,7 +19,6 @@ package org.apache.helix.common; * under the License. */ -import com.google.common.base.Preconditions; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.reflect.Method; @@ -33,6 +32,7 @@ import java.util.logging.Level; import javax.management.MBeanServerConnection; import javax.management.ObjectName; +import com.google.common.base.Preconditions; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; @@ -50,6 +50,7 @@ import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.Stage; import org.apache.helix.controller.pipeline.StageContext; +import org.apache.helix.controller.rebalancer.ConditionBasedRebalancer; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; @@ -365,6 +366,14 @@ public class ZkTestBase { configAccessor.setClusterConfig(clusterName, clusterConfig); } + protected void setGlobalMaxPartitionAllowedPerInstanceInCluster(HelixZkClient zkClient, + String clusterName, int maxPartitionAllowed) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica, @@ -384,6 +393,13 @@ public class ZkTestBase { -1, WagedRebalancer.class.getName(), null); } + protected IdealState createResourceWithConditionBasedRebalance(String clusterName, String db, + String stateModel, int numPartition, int replica, int minActiveReplica, + String rebalanceStrategy) { + return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica, -1, + ConditionBasedRebalancer.class.getName(), rebalanceStrategy); + } + private IdealState createResource(String clusterName, String db, String stateModel, int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName, String rebalanceStrategy) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java new file mode 100644 index 000000000..4e4717f39 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java @@ -0,0 +1,353 @@ +package org.apache.helix.integration.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TestStickyRebalanceStrategy extends ZkTestBase { + static final int NUM_NODE = 18; + static final int ADDITIONAL_NUM_NODE = 2; + protected static final int START_PORT = 12918; + protected static final int PARTITIONS = 2; + protected static final int REPLICAS = 3; + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + protected ClusterControllerManager _controller; + protected List<MockParticipantManager> _participants = new ArrayList<>(); + protected List<MockParticipantManager> _additionalParticipants = new ArrayList<>(); + protected int _minActiveReplica = 0; + protected ZkHelixClusterVerifier _clusterVerifier; + protected List<String> _testDBs = new ArrayList<>(); + protected ConfigAccessor _configAccessor; + protected String[] TestStateModels = + {BuiltInStateModelDefinitions.MasterSlave.name(), BuiltInStateModelDefinitions.LeaderStandby.name(), BuiltInStateModelDefinitions.OnlineOffline.name()}; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + + // start dummy participants + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); + participant.syncStart(); + _participants.add(participant); + } + + for (int i = NUM_NODE; i < NUM_NODE + ADDITIONAL_NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); + _additionalParticipants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + } + + @AfterClass + public void afterClass() throws Exception { + if (_clusterVerifier != null) { + _clusterVerifier.close(); + } + /* + shutdown order: 1) disconnect the controller 2) disconnect participants + */ + _controller.syncStop(); + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } + + @BeforeMethod + public void beforeTest() { + // Restart any participants that has been disconnected from last test + for (int i = 0; i < _participants.size(); i++) { + if (!_participants.get(i).isConnected()) { + _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _participants.get(i).getInstanceName())); + _participants.get(i).syncStart(); + } + } + + // Stop any additional participants that has been added from last test + for (MockParticipantManager additionalParticipant : _additionalParticipants) { + if (additionalParticipant.isConnected()) { + additionalParticipant.syncStop(); + } + } + } + + @AfterMethod + public void afterTest() throws InterruptedException { + // delete all DBs create in last test + for (String db : _testDBs) { + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + _testDBs.clear(); + _clusterVerifier.verifyByPolling(); + } + + @Test + public void testFirstTimeAssignmentWithNoInitialLiveNodes() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1); + // Shut down all the nodes + for (int i = 0; i < NUM_NODE; i++) { + _participants.get(i).syncStop(); + } + // Create resource + Map<String, ExternalView> externalViewsBefore = createTestDBs(); + // Start all the nodes + for (int i = 0; i < NUM_NODE; i++) { + _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _participants.get(i).getInstanceName())); + _participants.get(i).syncStart(); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + Map<String, ExternalView> externalViewsAfter = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViewsAfter.put(db, ev); + } + validateAllPartitionAssigned(externalViewsAfter); + } + + @Test + public void testNoPartitionMovementWithNewInstanceAdd() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1); + Map<String, ExternalView> externalViewsBefore = createTestDBs(); + + // Start more new instances + for (int i = 0; i < _additionalParticipants.size(); i++) { + _additionalParticipants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _additionalParticipants.get(i).getInstanceName())); + _additionalParticipants.get(i).syncStart(); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // All partition assignment should remain the same + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + validateNoPartitionMove(is, externalViewsBefore.get(db), ev); + } + } + + @Test + public void testNoPartitionMovementWithInstanceDown() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1); + Map<String, ExternalView> externalViewsBefore = createTestDBs(); + + // Shut down 2 instances + _participants.get(0).syncStop(); + _participants.get(_participants.size() - 1).syncStop(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // No movement for previous remaining assignment + Map<String, ExternalView> externalViewsAfter = new HashMap<>(); + Map<String, IdealState> idealStates = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + externalViewsAfter.put(db, ev); + idealStates.put(db, is); + } + validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore, externalViewsAfter, 2); + } + + @Test + public void testFirstTimeAssignmentWithStackingPlacement() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2); + Map<String, ExternalView> externalViewsBefore = createTestDBs(); + validateAllPartitionAssigned(externalViewsBefore); + } + + @Test + public void testNoPartitionMovementWithNewInstanceAddWithStackingPlacement() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2); + Map<String, ExternalView> externalViewsBefore = createTestDBs(); + + // Start more new instances + for (int i = 0; i < _additionalParticipants.size(); i++) { + _additionalParticipants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _additionalParticipants.get(i).getInstanceName())); + _additionalParticipants.get(i).syncStart(); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // All partition assignment should remain the same + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + validateNoPartitionMove(is, externalViewsBefore.get(db), ev); + } + } + + @Test + public void testNoPartitionMovementWithInstanceDownWithStackingPlacement() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2); + // Shut down half of the nodes given we allow stacking placement + for (int i = 0; i < NUM_NODE / 2; i++) { + _participants.get(i).syncStop(); + } + Map<String, ExternalView> externalViewsBefore = createTestDBs(); + + // Shut down 2 instances + _participants.get(_participants.size() - 1).syncStop(); + _participants.get(_participants.size() - 2).syncStop(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // No movement for previous remaining assignment + Map<String, ExternalView> externalViewsAfter = new HashMap<>(); + Map<String, IdealState> idealStates = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + externalViewsAfter.put(db, ev); + idealStates.put(db, is); + } + validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore, externalViewsAfter, 4); + } + + // create test DBs, wait it converged and return externalviews + protected Map<String, ExternalView> createTestDBs() throws InterruptedException { + Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + createResourceWithConditionBasedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, REPLICAS, + _minActiveReplica, StickyRebalanceStrategy.class.getName()); + _testDBs.add(db); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViews.put(db, ev); + } + return externalViews; + } + + protected void validateNoPartitionMove(IdealState is, ExternalView evBefore, + ExternalView evAfter) { + for (String partition : is.getPartitionSet()) { + Map<String, String> assignmentsBefore = evBefore.getRecord().getMapField(partition); + Map<String, String> assignmentsAfter = evAfter.getRecord().getMapField(partition); + + Set<String> instancesBefore = new HashSet<>(assignmentsBefore.keySet()); + Set<String> instancesAfter = new HashSet<>(assignmentsAfter.keySet()); + + Assert.assertEquals(instancesBefore, instancesAfter, + String.format("%s has been moved to new instances, before: %s, after: %s", partition, + assignmentsBefore, assignmentsAfter)); + } + } + + protected void validateNoPartitionMoveWithDiffCount(Map<String, IdealState> idealStates, + Map<String, ExternalView> externalViewsBefore, Map<String, ExternalView> externalViewsAfter, + int diffCount) { + for (Map.Entry<String, IdealState> entry : idealStates.entrySet()) { + String resourceName = entry.getKey(); + IdealState is = entry.getValue(); + for (String partition : is.getPartitionSet()) { + Map<String, String> assignmentsBefore = + externalViewsBefore.get(resourceName).getRecord().getMapField(partition); + Map<String, String> assignmentsAfter = + externalViewsAfter.get(resourceName).getRecord().getMapField(partition); + + Set<String> instancesBefore = new HashSet<>(assignmentsBefore.keySet()); + Set<String> instancesAfter = new HashSet<>(assignmentsAfter.keySet()); + + if (instancesBefore.size() == instancesAfter.size()) { + Assert.assertEquals(instancesBefore, instancesAfter, + String.format("%s has been moved to new instances, before: %s, after: %s", partition, + assignmentsBefore, assignmentsAfter)); + } else { + Assert.assertTrue(instancesBefore.containsAll(instancesAfter), + String.format("%s has been moved to new instances, before: %s, after: %s", partition, + assignmentsBefore, assignmentsAfter)); + diffCount = diffCount - (instancesBefore.size() - instancesAfter.size()); + } + } + } + Assert.assertEquals(diffCount, 0, + String.format("Partition movement detected, before: %s, after: %s", externalViewsBefore, + externalViewsAfter)); + } + + private void validateAllPartitionAssigned(Map<String, ExternalView> externalViewsBefore) { + for (ExternalView ev : externalViewsBefore.values()) { + Map<String, Map<String, String>> assignments = ev.getRecord().getMapFields(); + Assert.assertNotNull(assignments); + Assert.assertEquals(assignments.size(), PARTITIONS); + for (Map<String, String> assignmentMap : assignments.values()) { + Assert.assertEquals(assignmentMap.keySet().size(), REPLICAS); + } + } + } +}