This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch helix-stickiness-rebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-stickiness-rebalancer by
this push:
new ca6426688 Move existing assignments usage calculation to pre-process
stage (#2888)
ca6426688 is described below
commit ca64266887fdb3260ca03cfccb7683287c0a80ae
Author: frankmu <[email protected]>
AuthorDate: Thu Aug 29 16:12:41 2024 -0700
Move existing assignments usage calculation to pre-process stage (#2888)
Move existing assignments usage calculation to pre-process stage
---
.../ResourceControllerDataProvider.java | 31 ++
.../rebalancer/ConditionBasedRebalancer.java | 36 +--
.../strategy/StickyRebalanceStrategy.java | 96 ++----
.../stages/CurrentStateComputationStage.java | 8 +
.../java/org/apache/helix/common/ZkTestBase.java | 18 +-
.../rebalancer/TestStickyRebalanceStrategy.java | 353 +++++++++++++++++++++
6 files changed, 458 insertions(+), 84 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 74b5aa8b2..37811d9a2 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -44,6 +44,7 @@ import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.MissingTopStateRecord;
import org.apache.helix.model.CustomizedState;
import org.apache.helix.model.CustomizedStateConfig;
@@ -51,6 +52,7 @@ import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
@@ -583,6 +585,35 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
return _simpleCapacitySet;
}
+ public void populateSimpleCapacitySetUsage(final Set<String> resourceNameSet,
+ final CurrentStateOutput currentStateOutput) {
+ // Convert the simple capacity set to a map for quick lookup
+ Map<String, CapacityNode> simpleCapacityMap = new HashMap<>();
+ for (CapacityNode node : _simpleCapacitySet) {
+ simpleCapacityMap.put(node.getId(), node);
+ }
+ for (String resourceName : resourceNameSet) {
+ // Process current state mapping
+ populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap,
+ currentStateOutput.getCurrentStateMap(resourceName));
+ // Process pending state mapping
+ populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap,
+ currentStateOutput.getPendingMessageMap(resourceName));
+ }
+ }
+
+ private <T> void populateCapacityNodeUsageFromStateMap(String resourceName,
+ Map<String, CapacityNode> simpleCapacityMap, Map<Partition, Map<String,
T>> stateMap) {
+ for (Map.Entry<Partition, Map<String, T>> entry : stateMap.entrySet()) {
+ for (String instanceName : entry.getValue().keySet()) {
+ CapacityNode node = simpleCapacityMap.get(instanceName);
+ if (node != null) {
+ node.canAdd(resourceName, entry.getKey().getPartitionName());
+ }
+ }
+ }
+ }
+
private void refreshDisabledInstancesForAllPartitionsSet() {
_disabledInstancesForAllPartitionsSet.clear();
Collection<InstanceConfig> allConfigs = getInstanceConfigMap().values();
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
index 84762ea51..6e68033c4 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
@@ -72,16 +72,12 @@ public class ConditionBasedRebalancer extends
AbstractRebalancer<ResourceControl
@Override
public IdealState computeNewIdealState(String resourceName, IdealState
currentIdealState,
CurrentStateOutput currentStateOutput, ResourceControllerDataProvider
clusterData) {
- if (!this._rebalanceConditions.stream()
+ ZNRecord cachedIdealState =
clusterData.getCachedOndemandIdealState(resourceName);
+ // If a previous placement list exists in the cache and the rebalance
conditions are not all met -> return the cached value
+ if (cachedIdealState != null && cachedIdealState.getListFields() != null
+ && !cachedIdealState.getListFields().isEmpty() &&
!this._rebalanceConditions.stream()
.allMatch(condition -> condition.shouldPerformRebalance(clusterData)))
{
- ZNRecord cachedIdealState =
clusterData.getCachedOndemandIdealState(resourceName);
- if (cachedIdealState != null) {
- return new IdealState(cachedIdealState);
- }
- // In theory, the cache should be populated already if no rebalance is
needed
- LOG.warn(
- "Cannot fetch the cached Ideal State for resource: {}, will
recompute the Ideal State",
- resourceName);
+ return new IdealState(cachedIdealState);
}
LOG.info("Computing IdealState for " + resourceName);
@@ -189,18 +185,16 @@ public class ConditionBasedRebalancer extends
AbstractRebalancer<ResourceControl
public ResourceAssignment
computeBestPossiblePartitionState(ResourceControllerDataProvider cache,
IdealState idealState, Resource resource, CurrentStateOutput
currentStateOutput) {
ZNRecord cachedIdealState =
cache.getCachedOndemandIdealState(resource.getResourceName());
- if (!this._rebalanceConditions.stream()
+ // If a previous assignment map exists in the cache and the rebalance
conditions are not all met -> return the cached value
+ if (cachedIdealState.getMapFields() != null &&
!cachedIdealState.getMapFields().isEmpty()
+ && !this._rebalanceConditions.stream()
.allMatch(condition -> condition.shouldPerformRebalance(cache))) {
- if (cachedIdealState != null && cachedIdealState.getMapFields() != null)
{
- ResourceAssignment partitionMapping = new
ResourceAssignment(resource.getResourceName());
- for (Partition partition : resource.getPartitions()) {
- partitionMapping.addReplicaMap(partition,
cachedIdealState.getMapFields().get(partition));
- }
- return new ResourceAssignment(cachedIdealState);
+ ResourceAssignment partitionMapping = new
ResourceAssignment(resource.getResourceName());
+ for (Partition partition : resource.getPartitions()) {
+ partitionMapping.addReplicaMap(partition,
+ cachedIdealState.getMapFields().get(partition.getPartitionName()));
}
- // In theory, the cache should be populated already if no rebalance is
needed
- LOG.warn("Cannot fetch the cached assignment for resource: {}, will
recompute the assignment",
- resource.getResourceName());
+ return partitionMapping;
}
LOG.info("Computing BestPossibleMapping for " +
resource.getResourceName());
@@ -212,6 +206,10 @@ public class ConditionBasedRebalancer extends
AbstractRebalancer<ResourceControl
cachedIdealState.setMapFields(assignment.getRecord().getMapFields());
cache.setCachedOndemandIdealState(resource.getResourceName(),
cachedIdealState);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processed resource: {}", resource.getResourceName());
+ LOG.debug("Final Mapping of resource : {}", assignment);
+ }
return assignment;
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
index 7def42d08..3c3793cec 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
@@ -36,7 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceControllerDataProvider> {
- private static Logger logger =
LoggerFactory.getLogger(StickyRebalanceStrategy.class);
+ private static final Logger logger =
LoggerFactory.getLogger(StickyRebalanceStrategy.class);
private String _resourceName;
private List<String> _partitions;
private LinkedHashMap<String, Integer> _states;
@@ -70,52 +70,50 @@ public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceContro
return znRecord;
}
- // Sort the assignable nodes by id
- List<CapacityNode> assignableNodes = new
ArrayList<>(clusterData.getSimpleCapacitySet());
- assignableNodes.sort(Comparator.comparing(CapacityNode::getId));
-
// Filter out the nodes if not in the liveNodes parameter
// Note the liveNodes parameter here might be processed within the
rebalancer, e.g. filter based on tags
+ Set<CapacityNode> assignableNodeSet = new
HashSet<>(clusterData.getSimpleCapacitySet());
Set<String> liveNodesSet = new HashSet<>(liveNodes);
- assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId()));
+ assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId()));
// Populate valid state map given current mapping
- Map<String, Map<String, String>> stateMap =
- populateValidStateMapFromCurrentMapping(currentMapping,
assignableNodes);
+ Map<String, Set<String>> stateMap =
+ populateValidAssignmentMapFromCurrentMapping(currentMapping,
assignableNodeSet);
if (logger.isDebugEnabled()) {
logger.debug("currentMapping: {}", currentMapping);
logger.debug("stateMap: {}", stateMap);
}
+ // Sort the assignable nodes by id
+ List<CapacityNode> assignableNodeList =
+
assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId))
+ .collect(Collectors.toList());
+
// Assign partitions to node by order.
for (int i = 0, index = 0; i < _partitions.size(); i++) {
int startIndex = index;
- for (Map.Entry<String, Integer> entry : _states.entrySet()) {
- String state = entry.getKey();
- int stateReplicaNumber = entry.getValue();
- // For this partition, compute existing number replicas
- long existsReplicas =
- stateMap.computeIfAbsent(_partitions.get(i), m -> new
HashMap<>()).values().stream()
- .filter(s -> s.equals(state)).count();
- for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) {
- while (index - startIndex < assignableNodes.size()) {
- CapacityNode node = assignableNodes.get(index++ %
assignableNodes.size());
- if (node.canAdd(_resourceName, _partitions.get(i))) {
- stateMap.get(_partitions.get(i)).put(node.getId(), state);
- break;
- }
+ int remainingReplica = _statesReplicaCount;
+ if (stateMap.containsKey(_partitions.get(i))) {
+ remainingReplica = remainingReplica -
stateMap.get(_partitions.get(i)).size();
+ }
+ for (int j = 0; j < remainingReplica; j++) {
+ while (index - startIndex < assignableNodeList.size()) {
+ CapacityNode node = assignableNodeList.get(index++ %
assignableNodeList.size());
+ if (node.canAdd(_resourceName, _partitions.get(i))) {
+ stateMap.computeIfAbsent(_partitions.get(i), m -> new
HashSet<>()).add(node.getId());
+ break;
}
+ }
- if (index - startIndex >= assignableNodes.size()) {
- // If the all nodes have been tried out, then no node can be
assigned.
- logger.warn("No enough assignable nodes for resource: {}",
_resourceName);
- }
+ if (index - startIndex >= assignableNodeList.size()) {
+ // If all the nodes have been tried, then no node can be
assigned.
+ logger.warn("No enough assignable nodes for resource: {}",
_resourceName);
}
}
}
- for (Map.Entry<String, Map<String, String>> entry : stateMap.entrySet()) {
- znRecord.setListField(entry.getKey(), new
ArrayList<>(entry.getValue().keySet()));
+ for (Map.Entry<String, Set<String>> entry : stateMap.entrySet()) {
+ znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue()));
}
if (logger.isDebugEnabled()) {
logger.debug("znRecord: {}", znRecord);
@@ -129,12 +127,12 @@ public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceContro
*
* @param currentMapping the current mapping of partitions to node states
* @param assignableNodes the list of nodes that can be assigned
- * @return a map of partitions to valid node states
+ * @return a map of partitions to valid nodes
*/
- private Map<String, Map<String, String>>
populateValidStateMapFromCurrentMapping(
+ private Map<String, Set<String>>
populateValidAssignmentMapFromCurrentMapping(
final Map<String, Map<String, String>> currentMapping,
- final List<CapacityNode> assignableNodes) {
- Map<String, Map<String, String>> validStateMap = new HashMap<>();
+ final Set<CapacityNode> assignableNodes) {
+ Map<String, Set<String>> validAssignmentMap = new HashMap<>();
// Convert the assignableNodes to map for quick lookup
Map<String, CapacityNode> assignableNodeMap =
assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId,
node -> node));
@@ -142,44 +140,14 @@ public class StickyRebalanceStrategy implements
RebalanceStrategy<ResourceContro
for (Map.Entry<String, Map<String, String>> entry :
currentMapping.entrySet()) {
String partition = entry.getKey();
Map<String, String> currentNodeStateMap = new
HashMap<>(entry.getValue());
- // Skip if current node state is invalid with state model
- if (!isValidStateMap(currentNodeStateMap)) {
- continue;
- }
// Filter out invalid node assignment
currentNodeStateMap.entrySet()
.removeIf(e -> !isValidNodeAssignment(partition, e.getKey(),
assignableNodeMap));
- validStateMap.put(partition, currentNodeStateMap);
- }
- }
- return validStateMap;
- }
-
- /**
- * Validates whether the provided state mapping is valid according to the
defined state model.
- *
- * @param currentNodeStateMap A map representing the actual state mapping
where the key is the node ID and the value is the state.
- * @return true if the state map is valid, false otherwise
- */
- private boolean isValidStateMap(final Map<String, String>
currentNodeStateMap) {
- // Check if the size of the current state map exceeds the total state
count in state model
- if (currentNodeStateMap.size() > _statesReplicaCount) {
- return false;
- }
-
- Map<String, Integer> tmpStates = new HashMap<>(_states);
- for (String state : currentNodeStateMap.values()) {
- // Return invalid if:
- // The state is not defined in the state model OR
- // The state count exceeds the defined count in state model
- if (!tmpStates.containsKey(state) || tmpStates.get(state) <= 0) {
- return false;
+ validAssignmentMap.put(partition, new
HashSet<>(currentNodeStateMap.keySet()));
}
- tmpStates.put(state, tmpStates.get(state) - 1);
}
-
- return true;
+ return validAssignmentMap;
}
/**
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index da972d682..64d113f0a 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -134,6 +134,14 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
handleResourceCapacityCalculation(event,
(ResourceControllerDataProvider) cache, currentStateOutput);
}
+
+ // Populate the capacity for simple CapacityNode
+ if (cache.getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() !=
-1
+ && cache instanceof ResourceControllerDataProvider) {
+ final ResourceControllerDataProvider dataProvider =
(ResourceControllerDataProvider) cache;
+ dataProvider.populateSimpleCapacitySetUsage(resourceToRebalance.keySet(),
+ currentStateExcludingUnknown);
+ }
}
// update all pending messages to CurrentStateOutput.
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index a26560518..28fa8d491 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -19,7 +19,6 @@ package org.apache.helix.common;
* under the License.
*/
-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
@@ -33,6 +32,7 @@ import java.util.logging.Level;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
+import com.google.common.base.Preconditions;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
@@ -50,6 +50,7 @@ import
org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.ConditionBasedRebalancer;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -365,6 +366,14 @@ public class ZkTestBase {
configAccessor.setClusterConfig(clusterName, clusterConfig);
}
+ protected void
setGlobalMaxPartitionAllowedPerInstanceInCluster(HelixZkClient zkClient,
+ String clusterName, int maxPartitionAllowed) {
+ ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed);
+ configAccessor.setClusterConfig(clusterName, clusterConfig);
+ }
+
protected IdealState createResourceWithDelayedRebalance(String clusterName,
String db,
String stateModel, int numPartition, int replica, int minActiveReplica,
long delay) {
return createResourceWithDelayedRebalance(clusterName, db, stateModel,
numPartition, replica,
@@ -384,6 +393,13 @@ public class ZkTestBase {
-1, WagedRebalancer.class.getName(), null);
}
+ protected IdealState createResourceWithConditionBasedRebalance(String
clusterName, String db,
+ String stateModel, int numPartition, int replica, int minActiveReplica,
+ String rebalanceStrategy) {
+ return createResource(clusterName, db, stateModel, numPartition, replica,
minActiveReplica, -1,
+ ConditionBasedRebalancer.class.getName(), rebalanceStrategy);
+ }
+
private IdealState createResource(String clusterName, String db, String
stateModel,
int numPartition, int replica, int minActiveReplica, long delay, String
rebalancerClassName,
String rebalanceStrategy) {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
new file mode 100644
index 000000000..4e4717f39
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java
@@ -0,0 +1,353 @@
+package org.apache.helix.integration.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestStickyRebalanceStrategy extends ZkTestBase {
+ static final int NUM_NODE = 18;
+ static final int ADDITIONAL_NUM_NODE = 2;
+ protected static final int START_PORT = 12918;
+ protected static final int PARTITIONS = 2;
+ protected static final int REPLICAS = 3;
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+ protected ClusterControllerManager _controller;
+ protected List<MockParticipantManager> _participants = new ArrayList<>();
+ protected List<MockParticipantManager> _additionalParticipants = new
ArrayList<>();
+ protected int _minActiveReplica = 0;
+ protected ZkHelixClusterVerifier _clusterVerifier;
+ protected List<String> _testDBs = new ArrayList<>();
+ protected ConfigAccessor _configAccessor;
+ protected String[] TestStateModels =
+ {BuiltInStateModelDefinitions.MasterSlave.name(),
BuiltInStateModelDefinitions.LeaderStandby.name(),
BuiltInStateModelDefinitions.OnlineOffline.name()};
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ System.out.println("START " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis()));
+
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ for (int i = 0; i < NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+ // start dummy participants
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ for (int i = NUM_NODE; i < NUM_NODE + ADDITIONAL_NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+ _additionalParticipants.add(participant);
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ _clusterVerifier =
+ new
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
+
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ if (_clusterVerifier != null) {
+ _clusterVerifier.close();
+ }
+ /*
+ shutdown order: 1) disconnect the controller 2) disconnect participants
+ */
+ _controller.syncStop();
+ for (MockParticipantManager participant : _participants) {
+ participant.syncStop();
+ }
+ deleteCluster(CLUSTER_NAME);
+ System.out.println("END " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis()));
+ }
+
+ @BeforeMethod
+ public void beforeTest() {
+ // Restart any participants that have been disconnected since the last test
+ for (int i = 0; i < _participants.size(); i++) {
+ if (!_participants.get(i).isConnected()) {
+ _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+ _participants.get(i).getInstanceName()));
+ _participants.get(i).syncStart();
+ }
+ }
+
+ // Stop any additional participants that were started in the last test
+ for (MockParticipantManager additionalParticipant :
_additionalParticipants) {
+ if (additionalParticipant.isConnected()) {
+ additionalParticipant.syncStop();
+ }
+ }
+ }
+
+ @AfterMethod
+ public void afterTest() throws InterruptedException {
+ // delete all DBs created in the last test
+ for (String db : _testDBs) {
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+ }
+ _testDBs.clear();
+ _clusterVerifier.verifyByPolling();
+ }
+
+ @Test
+ public void testFirstTimeAssignmentWithNoInitialLiveNodes() throws Exception
{
+ setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
1);
+ // Shut down all the nodes
+ for (int i = 0; i < NUM_NODE; i++) {
+ _participants.get(i).syncStop();
+ }
+ // Create resource
+ Map<String, ExternalView> externalViewsBefore = createTestDBs();
+ // Start all the nodes
+ for (int i = 0; i < NUM_NODE; i++) {
+ _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+ _participants.get(i).getInstanceName()));
+ _participants.get(i).syncStart();
+ }
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ Map<String, ExternalView> externalViewsAfter = new HashMap<>();
+ for (String db : _testDBs) {
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ externalViewsAfter.put(db, ev);
+ }
+ validateAllPartitionAssigned(externalViewsAfter);
+ }
+
+ @Test
+ public void testNoPartitionMovementWithNewInstanceAdd() throws Exception {
+ setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
1);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs();
+
+ // Start more new instances
+ for (int i = 0; i < _additionalParticipants.size(); i++) {
+ _additionalParticipants.set(i, new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME,
+ _additionalParticipants.get(i).getInstanceName()));
+ _additionalParticipants.get(i).syncStart();
+ }
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ // All partition assignment should remain the same
+ for (String db : _testDBs) {
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ validateNoPartitionMove(is, externalViewsBefore.get(db), ev);
+ }
+ }
+
+ @Test
+ public void testNoPartitionMovementWithInstanceDown() throws Exception {
+ setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
1);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs();
+
+ // Shut down 2 instances
+ _participants.get(0).syncStop();
+ _participants.get(_participants.size() - 1).syncStop();
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ // No movement for previous remaining assignment
+ Map<String, ExternalView> externalViewsAfter = new HashMap<>();
+ Map<String, IdealState> idealStates = new HashMap<>();
+ for (String db : _testDBs) {
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ externalViewsAfter.put(db, ev);
+ idealStates.put(db, is);
+ }
+ validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore,
externalViewsAfter, 2);
+ }
+
+ @Test
+ public void testFirstTimeAssignmentWithStackingPlacement() throws Exception {
+ setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
2);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs();
+ validateAllPartitionAssigned(externalViewsBefore);
+ }
+
+ @Test
+ public void testNoPartitionMovementWithNewInstanceAddWithStackingPlacement()
throws Exception {
+ setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
2);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs();
+
+ // Start more new instances
+ for (int i = 0; i < _additionalParticipants.size(); i++) {
+ _additionalParticipants.set(i, new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME,
+ _additionalParticipants.get(i).getInstanceName()));
+ _additionalParticipants.get(i).syncStart();
+ }
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ // All partition assignment should remain the same
+ for (String db : _testDBs) {
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ validateNoPartitionMove(is, externalViewsBefore.get(db), ev);
+ }
+ }
+
+ @Test
+ public void testNoPartitionMovementWithInstanceDownWithStackingPlacement()
throws Exception {
+ setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME,
2);
+ // Shut down half of the nodes given we allow stacking placement
+ for (int i = 0; i < NUM_NODE / 2; i++) {
+ _participants.get(i).syncStop();
+ }
+ Map<String, ExternalView> externalViewsBefore = createTestDBs();
+
+ // Shut down 2 instances
+ _participants.get(_participants.size() - 1).syncStop();
+ _participants.get(_participants.size() - 2).syncStop();
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ // No movement for previous remaining assignment
+ Map<String, ExternalView> externalViewsAfter = new HashMap<>();
+ Map<String, IdealState> idealStates = new HashMap<>();
+ for (String db : _testDBs) {
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ externalViewsAfter.put(db, ev);
+ idealStates.put(db, is);
+ }
+ validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore,
externalViewsAfter, 4);
+ }
+
+ // create test DBs, wait for them to converge, and return their external views
+ protected Map<String, ExternalView> createTestDBs() throws
InterruptedException {
+ Map<String, ExternalView> externalViews = new HashMap<String,
ExternalView>();
+ int i = 0;
+ for (String stateModel : TestStateModels) {
+ String db = "Test-DB-" + i++;
+ createResourceWithConditionBasedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, REPLICAS,
+ _minActiveReplica, StickyRebalanceStrategy.class.getName());
+ _testDBs.add(db);
+ }
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ for (String db : _testDBs) {
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ externalViews.put(db, ev);
+ }
+ return externalViews;
+ }
+
+ protected void validateNoPartitionMove(IdealState is, ExternalView evBefore,
+ ExternalView evAfter) {
+ for (String partition : is.getPartitionSet()) {
+ Map<String, String> assignmentsBefore =
evBefore.getRecord().getMapField(partition);
+ Map<String, String> assignmentsAfter =
evAfter.getRecord().getMapField(partition);
+
+ Set<String> instancesBefore = new HashSet<>(assignmentsBefore.keySet());
+ Set<String> instancesAfter = new HashSet<>(assignmentsAfter.keySet());
+
+ Assert.assertEquals(instancesBefore, instancesAfter,
+ String.format("%s has been moved to new instances, before: %s,
after: %s", partition,
+ assignmentsBefore, assignmentsAfter));
+ }
+ }
+
+ protected void validateNoPartitionMoveWithDiffCount(Map<String, IdealState>
idealStates,
+ Map<String, ExternalView> externalViewsBefore, Map<String, ExternalView>
externalViewsAfter,
+ int diffCount) {
+ for (Map.Entry<String, IdealState> entry : idealStates.entrySet()) {
+ String resourceName = entry.getKey();
+ IdealState is = entry.getValue();
+ for (String partition : is.getPartitionSet()) {
+ Map<String, String> assignmentsBefore =
+
externalViewsBefore.get(resourceName).getRecord().getMapField(partition);
+ Map<String, String> assignmentsAfter =
+
externalViewsAfter.get(resourceName).getRecord().getMapField(partition);
+
+ Set<String> instancesBefore = new
HashSet<>(assignmentsBefore.keySet());
+ Set<String> instancesAfter = new HashSet<>(assignmentsAfter.keySet());
+
+ if (instancesBefore.size() == instancesAfter.size()) {
+ Assert.assertEquals(instancesBefore, instancesAfter,
+ String.format("%s has been moved to new instances, before: %s,
after: %s", partition,
+ assignmentsBefore, assignmentsAfter));
+ } else {
+ Assert.assertTrue(instancesBefore.containsAll(instancesAfter),
+ String.format("%s has been moved to new instances, before: %s,
after: %s", partition,
+ assignmentsBefore, assignmentsAfter));
+ diffCount = diffCount - (instancesBefore.size() -
instancesAfter.size());
+ }
+ }
+ }
+ Assert.assertEquals(diffCount, 0,
+ String.format("Partition movement detected, before: %s, after: %s",
externalViewsBefore,
+ externalViewsAfter));
+ }
+
+ private void validateAllPartitionAssigned(Map<String, ExternalView>
externalViewsBefore) {
+ for (ExternalView ev : externalViewsBefore.values()) {
+ Map<String, Map<String, String>> assignments =
ev.getRecord().getMapFields();
+ Assert.assertNotNull(assignments);
+ Assert.assertEquals(assignments.size(), PARTITIONS);
+ for (Map<String, String> assignmentMap : assignments.values()) {
+ Assert.assertEquals(assignmentMap.keySet().size(), REPLICAS);
+ }
+ }
+ }
+}