Repository: helix Updated Branches: refs/heads/master 3cf29010c -> d5bf3ad41
Generalize topology domain to support graceful node swap.

There was a constraint that the topology domain had to contain the instance name as the final path value. This constraint prevented configuring a flexible topology for a dynamic cluster: in the case of a node swap, the topology had to be completely re-calculated. This change decouples the domain from the instance name, so even with some nodes swapped, an admin can keep the same topology configuration. This ensures a stable resource partition assignment.

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d5bf3ad4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d5bf3ad4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d5bf3ad4

Branch: refs/heads/master
Commit: d5bf3ad410e0a54b1e79d9570d3e7897c0e2c947
Parents: 3cf2901
Author: Jiajun Wang <[email protected]>
Authored: Thu Nov 1 16:28:23 2018 -0700
Committer: Junkai Xue <[email protected]>
Committed: Tue Nov 13 16:52:40 2018 -0800

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java | 4 +-
 ...stractEvenDistributionRebalanceStrategy.java | 145 +++++++++----
 .../strategy/ConstraintRebalanceStrategy.java | 31 ++-
 .../strategy/CrushRebalanceStrategy.java | 48 +++--
 .../MultiRoundCrushRebalanceStrategy.java | 64 +++---
 .../strategy/crushMapping/CardDealer.java | 8 -
 .../CardDealingAdjustmentAlgorithm.java | 212 ------------------
 .../CardDealingAdjustmentAlgorithmV2.java | 108 ++++++----
 .../ConsistentHashingAdjustmentAlgorithm.java | 76 ++++---
 .../rebalancer/topology/InstanceNode.java | 63 ++++++
 .../controller/rebalancer/topology/Node.java | 14 +-
 .../rebalancer/topology/Topology.java | 104 ++++-----
 .../controller/stages/ClusterDataCache.java | 2 +-
 .../CrushRebalancers/TestNodeSwap.java | 213 +++++++++++++++++++
 14 files changed, 636 insertions(+), 456 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index 1af159d..d04c301 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -76,8 +76,8 @@ public class AutoRebalancer extends AbstractRebalancer { LinkedHashMap<String, Integer> stateCountMap = stateModelDef .getStateCountMap(liveInstance.size(), replicas); - List<String> liveNodes = new ArrayList<String>(liveInstance.keySet()); - List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet()); + List<String> liveNodes = new ArrayList<>(liveInstance.keySet()); + List<String> allNodes = new ArrayList<>(clusterData.getAllInstances()); allNodes.removeAll(clusterData.getDisabledInstances()); liveNodes.retainAll(allNodes);

http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
----------------------------------------------------------------------
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java index 1b1dfd6..e57dc88 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java @@ -19,19 +19,29 @@ package org.apache.helix.controller.rebalancer.strategy; * under the License. */ +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; + import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; -import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealer; -import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithm; +import org.apache.helix.controller.LogUtil; +import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithmV2; import org.apache.helix.controller.rebalancer.strategy.crushMapping.ConsistentHashingAdjustmentAlgorithm; +import org.apache.helix.controller.rebalancer.topology.InstanceNode; +import org.apache.helix.controller.rebalancer.topology.Node; import org.apache.helix.controller.rebalancer.topology.Topology; import org.apache.helix.controller.stages.ClusterDataCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.ExecutionException; - /** * Abstract class of Forced Even Assignment Patched Algorithm. * This class contains common logic that re-calculate assignment based on a result calculated by the base algorithm. @@ -45,9 +55,10 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal protected abstract RebalanceStrategy getBaseRebalanceStrategy(); - protected CardDealer getCardDealingAlgorithm(Topology topology) { + protected CardDealingAdjustmentAlgorithmV2 getCardDealingAlgorithm(Topology topology) { // by default, minimize the movement when calculating for evenness. - return new CardDealingAdjustmentAlgorithm(topology, _replica); + return new CardDealingAdjustmentAlgorithmV2(topology, _replica, + CardDealingAdjustmentAlgorithmV2.Mode.MINIMIZE_MOVEMENT); } @Override @@ -78,18 +89,19 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal .computePartitionAssignment(allNodes, allNodes, currentMapping, clusterData); Map<String, List<String>> origPartitionMap = origAssignment.getListFields(); + // For logging only + String eventId = clusterData.getEventId(); + // Try to re-assign if the original map is not empty if (!origPartitionMap.isEmpty()) { - // Transform current assignment to instance->partitions map, and get total partitions - Map<String, List<String>> nodeToPartitionMap = convertMap(origPartitionMap); - - Map<String, List<String>> finalPartitionMap = null; - - // Round 2: Rebalance mapping using card dealing algorithm. For ensuring evenness distribution. 
+ Map<String, List<Node>> finalPartitionMap = null; Topology allNodeTopo = new Topology(allNodes, allNodes, clusterData.getInstanceConfigMap(), clusterData.getClusterConfig()); - CardDealer cardDealer = getCardDealingAlgorithm(allNodeTopo); - + // Transform current assignment to instance->partitions map, and get total partitions + Map<Node, List<String>> nodeToPartitionMap = + convertPartitionMap(origPartitionMap, allNodeTopo); + // Round 2: Rebalance mapping using card dealing algorithm. For ensuring evenness distribution. + CardDealingAdjustmentAlgorithmV2 cardDealer = getCardDealingAlgorithm(allNodeTopo); if (cardDealer.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) { // Round 3: Reorder preference Lists to ensure participants' orders (so as the states) are uniform. finalPartitionMap = shufflePreferenceList(nodeToPartitionMap); @@ -100,20 +112,22 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal new ConsistentHashingAdjustmentAlgorithm(allNodeTopo, liveNodes); if (hashPlacement.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) { // Since mapping is changed by hashPlacement, need to adjust nodes order. - Map<String, List<String>> adjustedPartitionMap = convertMap(nodeToPartitionMap); + Map<String, List<Node>> adjustedPartitionMap = + convertAssignment(nodeToPartitionMap); for (String partition : adjustedPartitionMap.keySet()) { - List<String> preSelectedList = finalPartitionMap.get(partition); - Set<String> adjustedNodeList = new HashSet<>(adjustedPartitionMap.get(partition)); - List<String> finalNodeList = adjustedPartitionMap.get(partition); + List<Node> preSelectedList = finalPartitionMap.get(partition); + Set<Node> adjustedNodeList = + new HashSet<>(adjustedPartitionMap.get(partition)); + List<Node> finalNodeList = adjustedPartitionMap.get(partition); int index = 0; // 1. Add the ones in pre-selected node list first, in order - for (String node : preSelectedList) { + for (Node node : preSelectedList) { if (adjustedNodeList.remove(node)) { finalNodeList.set(index++, node); } } // 2. 
Add the rest of nodes to the map - for (String node : adjustedNodeList) { + for (Node node : adjustedNodeList) { finalNodeList.set(index++, node); } } @@ -123,7 +137,8 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal finalPartitionMap = null; } } catch (ExecutionException e) { - _logger.error("Failed to perform consistent hashing partition assigner.", e); + LogUtil.logError(_logger, eventId, + "Failed to perform consistent hashing partition assigner.", e); finalPartitionMap = null; } } @@ -131,16 +146,31 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal if (null != finalPartitionMap) { ZNRecord result = new ZNRecord(_resourceName); - result.setListFields(finalPartitionMap); + Map<String, List<String>> resultPartitionMap = new HashMap<>(); + for (String partitionName : finalPartitionMap.keySet()) { + List<String> instanceNames = new ArrayList<>(); + for (Node node : finalPartitionMap.get(partitionName)) { + if (node instanceof InstanceNode) { + instanceNames.add(((InstanceNode) node).getInstanceName()); + } else { + LogUtil.logError(_logger, eventId, + String.format("Selected node is not associated with an instance: %s", node)); + } + } + resultPartitionMap.put(partitionName, instanceNames); + } + result.setListFields(resultPartitionMap); return result; } } // Force even is not possible, fallback to use default strategy if (_logger.isDebugEnabled()) { - _logger.debug("Force even distribution is not possible, using the default strategy: " - + getBaseRebalanceStrategy().getClass().getSimpleName()); + LogUtil.logDebug(_logger, eventId, + "Force even distribution is not possible, using the default strategy: " + + getBaseRebalanceStrategy().getClass().getSimpleName()); } + if (liveNodes.equals(allNodes)) { return origAssignment; } else { @@ -151,27 +181,28 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal } // Best effort to shuffle preference lists for all partitions for uniform distribution regarding the top state. - private Map<String, List<String>> shufflePreferenceList( - Map<String, List<String>> nodeToPartitionMap) { - final Map<String, List<String>> partitionMap = convertMap(nodeToPartitionMap); + private Map<String, List<Node>> shufflePreferenceList( + Map<Node, List<String>> nodeToPartitionMap) { + final Map<String, List<Node>> partitionMap = convertAssignment(nodeToPartitionMap); // evaluate node's order according to: // 1. their potential top state replicas count (less count, higher priority) // 2. 
their assigned top state replicas (less top state replica, higher priority) - final Map<String, Integer> nodeScores = new HashMap<>(); - for (String node : nodeToPartitionMap.keySet()) { + final Map<Node, Integer> nodeScores = new HashMap<>(); + for (Node node : nodeToPartitionMap.keySet()) { // Init with the potential replicas count nodeScores.put(node, nodeToPartitionMap.get(node).size()); } for (final String partition : partitionMap.keySet()) { - List<String> nodes = partitionMap.get(partition); + List<Node> nodes = partitionMap.get(partition); // order according to score - Collections.sort(nodes, new Comparator<String>() { + Collections.sort(nodes, new Comparator<Node>() { @Override - public int compare(String o1, String o2) { + public int compare(Node o1, Node o2) { int o1Score = nodeScores.get(o1); int o2Score = nodeScores.get(o2); if (o1Score == o2Score) { - return new Integer((partition + o1).hashCode()).compareTo((partition + o2).hashCode()); + return new Integer((partition + o1.getName()).hashCode()) + .compareTo((partition + o2.getName()).hashCode()); } else { return o1Score - o2Score; } @@ -179,23 +210,49 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal }); // After assignment, the nodes has less potential top states for (int i = 0; i < nodes.size(); i++) { - String nodeName = nodes.get(i); - nodeScores.put(nodeName, - nodeScores.get(nodeName) - 1 + (i == 0 ? (int) Math.pow(_replica, 2) : 0)); + Node node = nodes.get(i); + nodeScores.put(node, nodeScores.get(node) - 1 + (i == 0 ? (int) Math.pow(_replica, 2) : 0)); } } return partitionMap; } // Convert the map from <key, list of values> to a new map <original value, list of related keys> - private Map<String, List<String>> convertMap(Map<String, List<String>> originalMap) { - Map<String, List<String>> resultMap = new HashMap<>(); - for (String originalKey : originalMap.keySet()) { - for (String originalValue : originalMap.get(originalKey)) { - if (!resultMap.containsKey(originalValue)) { - resultMap.put(originalValue, new ArrayList<String>()); + private Map<String, List<Node>> convertAssignment( + Map<Node, List<String>> assignment) { + Map<String, List<Node>> resultMap = new HashMap<>(); + for (Node instance : assignment.keySet()) { + for (String partitionName : assignment.get(instance)) { + if (!resultMap.containsKey(partitionName)) { + resultMap.put(partitionName, new ArrayList<Node>()); + } + resultMap.get(partitionName).add(instance); + } + } + return resultMap; + } + + // Convert the map from <Partition Name, List<instance names>> to a new map <InstanceNode, List<Partition Name>> + private Map<Node, List<String>> convertPartitionMap(Map<String, List<String>> originalMap, + Topology topology) { + Map<Node, List<String>> resultMap = new HashMap<>(); + Map<String, Node> instanceMap = new HashMap<>(); + for (Node node : Topology.getAllLeafNodes(topology.getRootNode())) { + if (node instanceof InstanceNode) { + InstanceNode insNode = (InstanceNode) node; + instanceMap.put(insNode.getInstanceName(), insNode); + } + } + + for (String partition : originalMap.keySet()) { + for (String instanceName : originalMap.get(partition)) { + Node insNode = instanceMap.get(instanceName); + if (insNode != null) { + if (!resultMap.containsKey(insNode)) { + resultMap.put(insNode, new ArrayList<String>()); + } + resultMap.get(insNode).add(partition); } - resultMap.get(originalValue).add(originalKey); } } return resultMap; 
http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java index 91b8e6b..61b732e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java @@ -19,15 +19,23 @@ package org.apache.helix.controller.rebalancer.strategy; * under the License. */ +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint; import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint; import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider; import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.common.ResourcesStateMap; import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint; -import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealer; import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithmV2; import org.apache.helix.controller.rebalancer.topology.Topology; import org.apache.helix.controller.stages.ClusterDataCache; @@ -36,8 +44,6 @@ import org.apache.helix.model.Partition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; - /** * Constraints based rebalance strategy. * Assignment is calculated according to the specified constraints. @@ -108,7 +114,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan _softConstraints.add(defaultConstraint); } - protected CardDealer getCardDealingAlgorithm(Topology topology) { + protected CardDealingAdjustmentAlgorithmV2 getCardDealingAlgorithm(Topology topology) { // For constraint based strategy, need more fine-grained assignment for each partition. // So evenness is more important. return new CardDealingAdjustmentAlgorithmV2(topology, _replica, @@ -156,7 +162,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan List<String> candidates = new ArrayList<>(allNodes); // Only calculate for configured nodes. // Remove all non-configured nodes. - candidates.retainAll(clusterData.getInstanceConfigMap().keySet()); + candidates.retainAll(clusterData.getAllInstances()); // For generating the IdealState ZNRecord Map<String, List<String>> preferenceList = new HashMap<>(); @@ -168,11 +174,11 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan // check for the preferred assignment partitionMapping = validateStateMap(partitionMapping); if (partitionMapping != null) { - _logger.debug( + LogUtil.logDebug(_logger, clusterData.getEventId(), "The provided preferred partition assignment meets state model requirements. 
Skip rebalance."); preferenceList.put(partition, new ArrayList<>(partitionMapping.keySet())); idealStateMap.put(partition, partitionMapping); - updateConstraints(partition, partitionMapping); + updateConstraints(partition, partitionMapping, clusterData.getEventId()); continue; } } // else, recalculate the assignment @@ -195,7 +201,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan idealStateMap.put(partition, stateMap); preferenceList.put(partition, assignment); // Note, only update with the new pending assignment - updateConstraints(partition, stateMap); + updateConstraints(partition, stateMap, clusterData.getEventId()); } // recover the original weight @@ -303,15 +309,18 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan return partitionAssignment.getListFields().get(partitionName); } - private void updateConstraints(String partition, Map<String, String> pendingAssignment) { + private void updateConstraints(String partition, Map<String, String> pendingAssignment, + String eventId) { if (pendingAssignment.isEmpty()) { - _logger.warn("No pending assignment needs to update. Skip constraint update."); + LogUtil.logWarn(_logger, eventId, + "No pending assignment needs to update. Skip constraint update."); return; } ResourcesStateMap tempStateMap = new ResourcesStateMap(); tempStateMap.setState(_resourceName, new Partition(partition), pendingAssignment); - _logger.debug("Update constraints with pending assignment: " + tempStateMap.toString()); + LogUtil.logDebug(_logger, eventId, + "Update constraints with pending assignment: " + tempStateMap.toString()); for (AbstractRebalanceHardConstraint hardConstraint : _hardConstraints) { hardConstraint.updateAssignment(tempStateMap); http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java index 0cb3188..939b810 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java @@ -19,27 +19,29 @@ package org.apache.helix.controller.rebalancer.strategy; * under the License. 
*/ +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + import com.google.common.base.Predicate; import com.google.common.base.Predicates; import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm; -import org.apache.helix.util.JenkinsHash; +import org.apache.helix.controller.rebalancer.topology.InstanceNode; import org.apache.helix.controller.rebalancer.topology.Node; import org.apache.helix.controller.rebalancer.topology.Topology; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.util.JenkinsHash; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * CRUSH-based partition mapping strategy. */ @@ -78,23 +80,32 @@ public class CrushRebalanceStrategy implements RebalanceStrategy { new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig()); Node topNode = _clusterTopo.getRootNode(); - Map<String, List<String>> newPreferences = new HashMap<String, List<String>>(); + // for log only + String eventId = clusterData.getEventId(); + + Map<String, List<String>> newPreferences = new HashMap<>(); for (int i = 0; i < _partitions.size(); i++) { String partitionName = _partitions.get(i); long data = partitionName.hashCode(); // apply the placement rules - List<Node> selected = select(topNode, data, _replicas); + List<Node> selected = select(topNode, data, _replicas, eventId); if (selected.size() < _replicas) { - Log.error(String + LogUtil.logError(Log, eventId, String .format("Can not find enough node for resource %s partition %s, required %d, find %d", _resourceName, partitionName, _replicas, selected.size())); } - List<String> nodeList = new ArrayList<String>(); + List<String> nodeList = new ArrayList<>(); for (int j = 0; j < selected.size(); j++) { - nodeList.add(selected.get(j).getName()); + Node selectedNode = selected.get(j); + if (selectedNode instanceof InstanceNode) { + nodeList.add(((InstanceNode) selectedNode).getInstanceName()); + } else { + LogUtil.logError(Log, eventId, + "Selected node is not associated with an instance: " + selectedNode.toString()); + } } newPreferences.put(partitionName, nodeList); @@ -118,10 +129,10 @@ public class CrushRebalanceStrategy implements RebalanceStrategy { * The caller will try to get the expected number of selected nodes as a result, * if no enough nodes can be found, could return any number of nodes than required. 
*/ - private List<Node> select(Node topNode, long data, int rf) + private List<Node> select(Node topNode, long data, int rf, String eventId) throws HelixException { - List<Node> nodes = new ArrayList<Node>(rf); - Set<Node> selectedZones = new HashSet<Node>(); + List<Node> nodes = new ArrayList<>(rf); + Set<Node> selectedZones = new HashSet<>(); long input = data; int count = rf; int tries = 0; @@ -132,7 +143,8 @@ public class CrushRebalanceStrategy implements RebalanceStrategy { input = hashFun.hash(input); // create a different hash value for retrying tries++; if (tries >= MAX_RETRY) { - Log.error(String.format("Could not find all mappings after %d tries", tries)); + LogUtil.logError(Log, eventId, + String.format("Could not find all mappings after %d tries", tries)); break; } } http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java index f2750ca..2c15ab6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java @@ -28,17 +28,20 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm; +import org.apache.helix.controller.rebalancer.topology.InstanceNode; import org.apache.helix.controller.rebalancer.topology.Node; import org.apache.helix.controller.rebalancer.topology.Topology; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.model.InstanceConfig; import org.apache.helix.util.JenkinsHash; - -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Multi-round CRUSH partition mapping strategy. @@ -46,13 +49,15 @@ import com.google.common.base.Predicates; * but number of partitions to be reshuffled during node outage could be higher than CrushRebalanceStrategy. 
*/ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { + private static final Logger Log = + LoggerFactory.getLogger(MultiRoundCrushRebalanceStrategy.class.getName()); private String _resourceName; private List<String> _partitions; private Topology _clusterTopo; private int _replicas; private LinkedHashMap<String, Integer> _stateCountMap; - private final int MAX_ITERNATION = 3; + private final int MAX_ITERATION = 3; @Override public void init(String resourceName, final List<String> partitions, @@ -81,7 +86,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig()); Node root = _clusterTopo.getRootNode(); - Map<String, List<Node>> zoneMapping = new HashMap<String, List<Node>>(); + Map<String, List<Node>> zoneMapping = new HashMap<>(); for (int i = 0; i < _partitions.size(); i++) { String partitionName = _partitions.get(i); long pData = partitionName.hashCode(); @@ -92,7 +97,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { } /* map the position in preference list to the state */ - Map<Integer, String> idxStateMap = new HashMap<Integer, String>(); + Map<Integer, String> idxStateMap = new HashMap<>(); int i = 0; for (Map.Entry<String, Integer> e : _stateCountMap.entrySet()) { String state = e.getKey(); @@ -104,13 +109,11 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { } // Final mapping <partition, state> -> list(node) - Map<String, Map<String, List<Node>>> partitionStateMapping = - new HashMap<String, Map<String, List<Node>>>(); + Map<String, Map<String, List<Node>>> partitionStateMapping = new HashMap<>(); for (Node zone : _clusterTopo.getFaultZones()) { // partition state -> list(partitions) - LinkedHashMap<String, List<String>> statePartitionMap = - new LinkedHashMap<String, List<String>>(); + LinkedHashMap<String, List<String>> statePartitionMap = new LinkedHashMap<>(); // TODO: move this outside? 
for (Map.Entry<String, List<Node>> e : zoneMapping.entrySet()) { String partition = e.getKey(); @@ -145,21 +148,28 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { } } - return generateZNRecord(_resourceName, _partitions, partitionStateMapping); + return generateZNRecord(_resourceName, _partitions, partitionStateMapping, + clusterData.getEventId()); } private ZNRecord generateZNRecord(String resource, List<String> partitions, - Map<String, Map<String, List<Node>>> partitionStateMapping) { - Map<String, List<String>> newPreferences = new HashMap<String, List<String>>(); + Map<String, Map<String, List<Node>>> partitionStateMapping, String eventId) { + Map<String, List<String>> newPreferences = new HashMap<>(); for (int i = 0; i < partitions.size(); i++) { String partitionName = partitions.get(i); Map<String, List<Node>> stateNodeMap = partitionStateMapping.get(partitionName); for (String state : _stateCountMap.keySet()) { List<Node> nodes = stateNodeMap.get(state); - List<String> nodeList = new ArrayList<String>(); + List<String> nodeList = new ArrayList<>(); for (int j = 0; j < nodes.size(); j++) { - nodeList.add(nodes.get(j).getName()); + Node selectedNode = nodes.get(j); + if (selectedNode instanceof InstanceNode) { + nodeList.add(((InstanceNode) selectedNode).getInstanceName()); + } else { + LogUtil.logError(Log, eventId, + "Selected node is not associated with an instance: " + selectedNode.toString()); + } } if (!newPreferences.containsKey(partitionName)) { newPreferences.put(partitionName, new ArrayList<String>()); @@ -200,7 +210,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { int iteration = 0; Node root = zone; boolean noAssignmentFound = false; - while (iteration++ < MAX_ITERNATION && !noAssignmentFound) { + while (iteration++ < MAX_ITERATION && !noAssignmentFound) { copyAssignment(nodePartitionsMap, prevNodePartitionsMap); for (Map.Entry<Node, List<String>> e : toRemovedMap.entrySet()) { List<String> curAssignedPartitions = nodePartitionsMap.get(e.getKey()); @@ -255,11 +265,11 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { private Node recalculateWeight(Node zone, long totalWeight, int totalPartition, Map<Node, List<String>> nodePartitionsMap, List<String> partitions, Map<Node, List<String>> toRemovedMap) { - Map<String, Integer> newNodeWeight = new HashMap<>(); - Set<String> completedNodes = new HashSet<>(); + Map<Node, Integer> newNodeWeight = new HashMap<>(); + Set<Node> completedNodes = new HashSet<>(); for (Node node : Topology.getAllLeafNodes(zone)) { if (node.isFailed()) { - completedNodes.add(node.getName()); + completedNodes.add(node); continue; } long weight = node.getWeight(); @@ -280,9 +290,9 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { int missing = target - numPartitions; if (missing > 0) { - newNodeWeight.put(node.getName(), missing * 10); + newNodeWeight.put(node, missing * 10); } else { - completedNodes.add(node.getName()); + completedNodes.add(node); } } @@ -308,16 +318,16 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { */ private List<Node> select(Node topNode, String nodeType, long data, int rf) throws HelixException { - List<Node> zones = new ArrayList<Node>(); + List<Node> nodes = new ArrayList<>(); long input = data; int count = rf; int tries = 0; - while (zones.size() < rf) { + while (nodes.size() < rf) { List<Node> selected = placementAlgorithm - .select(topNode, input, rf, nodeType, 
nodeAlreadySelected(new HashSet<Node>(zones))); + .select(topNode, input, rf, nodeType, nodeAlreadySelected(new HashSet<>(nodes))); // add the racks to the selected racks - zones.addAll(selected); - count = rf - zones.size(); + nodes.addAll(selected); + count = rf - nodes.size(); if (count > 0) { input = hashFun.hash(input); // create a different hash value for retrying tries++; @@ -327,7 +337,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { } } } - return zones; + return nodes; } /** http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealer.java deleted file mode 100644 index 2f76577..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealer.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.helix.controller.rebalancer.strategy.crushMapping; - -import java.util.List; -import java.util.Map; - -public interface CardDealer { - boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, int randomSeed); -} http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java deleted file mode 100644 index 3e523e9..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java +++ /dev/null @@ -1,212 +0,0 @@ -package org.apache.helix.controller.rebalancer.strategy.crushMapping; - -import org.apache.helix.controller.rebalancer.topology.Node; -import org.apache.helix.controller.rebalancer.topology.Topology; - -import java.util.*; - -public class CardDealingAdjustmentAlgorithm implements CardDealer { - private static int MAX_ADJUSTMENT = 2; - - private int _replica; - // Instance -> FaultZone Tag - private Map<String, String> _instanceFaultZone = new HashMap<>(); - private Map<String, Long> _instanceWeight = new HashMap<>(); - private long _totalWeight = 0; - private Map<String, Long> _faultZoneWeight = new HashMap<>(); - // Record existing partitions that are assigned to a fault zone - private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>(); - - public CardDealingAdjustmentAlgorithm(Topology topology, int replica) { - _replica = replica; - // Get all instance related information. 
- for (Node zone : topology.getFaultZones()) { - _faultZoneWeight.put(zone.getName(), zone.getWeight()); - if (!_faultZonePartitionMap.containsKey(zone.getName())) { - _faultZonePartitionMap.put(zone.getName(), new HashSet<String>()); - } - for (Node instance : Topology.getAllLeafNodes(zone)) { - if (!instance.isFailed()) { - _instanceWeight.put(instance.getName(), instance.getWeight()); - _totalWeight += instance.getWeight(); - _instanceFaultZone.put(instance.getName(), zone.getName()); - } - } - } - } - - public boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, int randomSeed) { - // Records exceed partitions - TreeMap<String, Integer> toBeReassigned = new TreeMap<>(); - - // Calculate total partitions that need to be calculated - long totalReplicaCount = 0; - for (List<String> partitions : nodeToPartitionMap.values()) { - totalReplicaCount += partitions.size(); - } - if (totalReplicaCount == 0 || _replica > _faultZoneWeight.size()) { - return false; - } - - // instance -> target (ideal) partition count - Map<String, Float> targetPartitionCount = new HashMap<>(); - for (String liveInstance : _instanceFaultZone.keySet()) { - long zoneWeight = _faultZoneWeight.get(_instanceFaultZone.get(liveInstance)); - float instanceRatioInZone = ((float) _instanceWeight.get(liveInstance)) / zoneWeight; - // 1. if replica = fault zone, fault zone weight does not count, so calculate according to fault zone count. - // 2. else, should consider fault zone weight to calculate expected threshold. - float zonePartitions; - if (_replica == _faultZoneWeight.size()) { - zonePartitions = ((float) totalReplicaCount) / _faultZoneWeight.size(); - } else { - zonePartitions = ((float) totalReplicaCount) * zoneWeight / _totalWeight; - } - targetPartitionCount.put(liveInstance, instanceRatioInZone * zonePartitions); - } - - // Calculate the expected spikes - // Assign spikes to each zone according to zone weight - int totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size(); - Map<String, Integer> maxZoneOverflows = new HashMap<>(); - for (String faultZoneName : _faultZoneWeight.keySet()) { - float zoneWeight = _faultZoneWeight.get(faultZoneName); - maxZoneOverflows.put(faultZoneName, - (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight)); - } - - Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator(); - while (nodeIter.hasNext()) { - String instance = nodeIter.next(); - // Cleanup the existing mapping. Remove all non-active nodes from the mapping - if (!_instanceFaultZone.containsKey(instance)) { - List<String> partitions = nodeToPartitionMap.get(instance); - addToReAssignPartition(toBeReassigned, partitions); - partitions.clear(); - nodeIter.remove(); - } - } - - List<String> orderedInstances = new ArrayList<>(_instanceFaultZone.keySet()); - // Different resource should shuffle nodes in different ways. - Collections.shuffle(orderedInstances, new Random(randomSeed)); - for (String instance : orderedInstances) { - if (!nodeToPartitionMap.containsKey(instance)) { - continue; - } - // Cut off the exceed partitions compared with target partition count. 
- List<String> partitions = nodeToPartitionMap.get(instance); - int target = (int) (Math.floor(targetPartitionCount.get(instance))); - if (partitions.size() > target) { - int maxZoneOverflow = maxZoneOverflows.get(_instanceFaultZone.get(instance)); - if (maxZoneOverflow > 0 && totalOverflows > 0) { - // When fault zone has overflow capacity AND there are still remaining overflow partitions - target = (int) (Math.ceil(targetPartitionCount.get(instance))); - maxZoneOverflows.put(_instanceFaultZone.get(instance), maxZoneOverflow - 1); - totalOverflows--; - } - - // Shuffle partitions to randomly pickup exceed ones. Ensure the algorithm generates consistent results when the inputs are the same. - Collections.shuffle(partitions, new Random(instance.hashCode() * 31 + randomSeed)); - addToReAssignPartition(toBeReassigned, partitions.subList(target, partitions.size())); - - // Put the remaining partitions to the assignment, and record in fault zone partition list - List<String> remainingPartitions = new ArrayList<>(partitions.subList(0, target)); - partitions.clear(); - nodeToPartitionMap.put(instance, remainingPartitions); - } - _faultZonePartitionMap.get(_instanceFaultZone.get(instance)) - .addAll(nodeToPartitionMap.get(instance)); - } - - // Reassign if any instances have space left. - // Assign partition according to the target capacity, CAP at "Math.floor(target) + adjustment" - int adjustment = 0; - while (!toBeReassigned.isEmpty() && adjustment <= MAX_ADJUSTMENT) { - partitionDealing(_instanceFaultZone.keySet(), toBeReassigned, _faultZonePartitionMap, - _instanceFaultZone, nodeToPartitionMap, targetPartitionCount, randomSeed, adjustment++); - } - return toBeReassigned.isEmpty(); - } - - private void partitionDealing(Collection<String> instances, - TreeMap<String, Integer> toBeReassigned, Map<String, Set<String>> faultZonePartitionMap, - Map<String, String> faultZoneMap, final Map<String, List<String>> assignmentMap, - Map<String, Float> targetPartitionCount, final int randomSeed, int targetAdjustment) { - PriorityQueue<String> instanceQueue = - new PriorityQueue<>(instances.size(), new Comparator<String>() { - @Override - public int compare(String node1, String node2) { - int node1Load = assignmentMap.containsKey(node1) ? assignmentMap.get(node1).size() : 0; - int node2Load = assignmentMap.containsKey(node2) ? assignmentMap.get(node2).size() : 0; - if (node1Load == node2Load) { - return new Integer((node1 + randomSeed).hashCode()) - .compareTo((node2 + randomSeed).hashCode()); - } else { - return node1Load - node2Load; - } - } - }); - instanceQueue.addAll(instances); - - while (!toBeReassigned.isEmpty()) { - boolean anyPartitionAssigned = false; - Iterator<String> instanceIter = instanceQueue.iterator(); - while (instanceIter.hasNext()) { - String instance = instanceIter.next(); - // Temporary remove the node from queue. - // If any partition assigned to the instance, add it back to reset priority. - instanceIter.remove(); - boolean partitionAssignedToInstance = false; - String faultZoneStr = faultZoneMap.get(instance); - List<String> partitions = assignmentMap.containsKey(instance) ? 
- assignmentMap.get(instance) : - new ArrayList<String>(); - int space = - (int) (Math.floor(targetPartitionCount.get(instance))) + targetAdjustment - partitions - .size(); - if (space > 0) { - // Find a pending partition to locate - for (String pendingPartition : toBeReassigned.navigableKeySet()) { - if (!faultZonePartitionMap.get(faultZoneStr).contains(pendingPartition)) { - if (!assignmentMap.containsKey(instance)) { - assignmentMap.put(instance, partitions); - } - partitions.add(pendingPartition); - faultZonePartitionMap.get(faultZoneStr).add(pendingPartition); - if (toBeReassigned.get(pendingPartition) == 1) { - toBeReassigned.remove(pendingPartition); - } else { - toBeReassigned.put(pendingPartition, toBeReassigned.get(pendingPartition) - 1); - } - // if any assignment is made: - // this instance can hold more partitions in the future - partitionAssignedToInstance = true; - break; - } - } - } - if (partitionAssignedToInstance) { - // Reset priority in the queue - instanceQueue.add(instance); - anyPartitionAssigned = true; - break; - } - } - if (!anyPartitionAssigned) { - // if no pending partition is assigned to any instances in this loop, new assignment is not possible - break; - } - } - } - - private void addToReAssignPartition(TreeMap<String, Integer> toBeReassigned, - List<String> partitions) { - for (String partition : partitions) { - if (!toBeReassigned.containsKey(partition)) { - toBeReassigned.put(partition, 1); - } else { - toBeReassigned.put(partition, toBeReassigned.get(partition) + 1); - } - } - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java index 430e0a8..82370a5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java @@ -1,11 +1,24 @@ package org.apache.helix.controller.rebalancer.strategy.crushMapping; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.helix.controller.rebalancer.topology.InstanceNode; import org.apache.helix.controller.rebalancer.topology.Node; import org.apache.helix.controller.rebalancer.topology.Topology; -import java.util.*; - -public class CardDealingAdjustmentAlgorithmV2 implements CardDealer { +public class CardDealingAdjustmentAlgorithmV2 { private static int MAX_ADJUSTMENT = 2; public enum Mode { @@ -14,35 +27,35 @@ public class CardDealingAdjustmentAlgorithmV2 implements CardDealer { } private Mode _mode; - private int _replica; + protected int _replica; // Instance -> FaultZone Tag - private Map<String, String> _instanceFaultZone = new HashMap<>(); - private Map<String, Long> _instanceWeight = new HashMap<>(); - private long _totalWeight = 0; - 
private Map<String, Long> _faultZoneWeight = new HashMap<>(); + protected Map<Node, Node> _instanceFaultZone = new HashMap<>(); + protected Map<Node, Long> _instanceWeight = new HashMap<>(); + protected long _totalWeight = 0; + protected Map<Node, Long> _faultZoneWeight = new HashMap<>(); // Record existing partitions that are assigned to a fault zone - private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>(); + protected Map<Node, Set<String>> _faultZonePartitionMap = new HashMap<>(); public CardDealingAdjustmentAlgorithmV2(Topology topology, int replica, Mode mode) { _mode = mode; _replica = replica; // Get all instance related information. for (Node zone : topology.getFaultZones()) { - _faultZoneWeight.put(zone.getName(), zone.getWeight()); - if (!_faultZonePartitionMap.containsKey(zone.getName())) { - _faultZonePartitionMap.put(zone.getName(), new HashSet<String>()); + _faultZoneWeight.put(zone, zone.getWeight()); + if (!_faultZonePartitionMap.containsKey(zone)) { + _faultZonePartitionMap.put(zone, new HashSet<String>()); } for (Node instance : Topology.getAllLeafNodes(zone)) { - if (!instance.isFailed()) { - _instanceWeight.put(instance.getName(), instance.getWeight()); + if (instance instanceof InstanceNode && !instance.isFailed()) { + _instanceWeight.put(instance, instance.getWeight()); _totalWeight += instance.getWeight(); - _instanceFaultZone.put(instance.getName(), zone.getName()); + _instanceFaultZone.put(instance, zone); } } } } - public boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, int randomSeed) { + public boolean computeMapping(Map<Node, List<String>> nodeToPartitionMap, int randomSeed) { // Records exceed partitions TreeMap<String, Integer> toBeReassigned = new TreeMap<>(); @@ -56,8 +69,8 @@ public class CardDealingAdjustmentAlgorithmV2 implements CardDealer { } // instance -> target (ideal) partition count - Map<String, Float> targetPartitionCount = new HashMap<>(); - for (String liveInstance : _instanceFaultZone.keySet()) { + Map<Node, Float> targetPartitionCount = new HashMap<>(); + for (Node liveInstance : _instanceFaultZone.keySet()) { long zoneWeight = _faultZoneWeight.get(_instanceFaultZone.get(liveInstance)); float instanceRatioInZone = ((float) _instanceWeight.get(liveInstance)) / zoneWeight; // 1. if replica = fault zone, fault zone weight does not count, so calculate according to fault zone count. @@ -72,22 +85,22 @@ public class CardDealingAdjustmentAlgorithmV2 implements CardDealer { } int totalOverflows = 0; - Map<String, Integer> maxZoneOverflows = new HashMap<>(); + Map<Node, Integer> maxZoneOverflows = new HashMap<>(); if (_mode.equals(Mode.MINIMIZE_MOVEMENT)) { + // Note that keep the spikes if possible will hurt evenness. So only do this for MINIMIZE_MOVEMENT mode + // Calculate the expected spikes // Assign spikes to each zone according to zone weight totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size(); - for (String faultZoneName : _faultZoneWeight.keySet()) { - float zoneWeight = _faultZoneWeight.get(faultZoneName); - maxZoneOverflows.put(faultZoneName, + for (Node faultZone : _faultZoneWeight.keySet()) { + float zoneWeight = _faultZoneWeight.get(faultZone); + maxZoneOverflows.put(faultZone, (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight)); } } - // Note that keep the spikes if possible will hurt evenness. 
So only do this for MINIMIZE_MOVEMENT mode - - Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator(); + Iterator<Node> nodeIter = nodeToPartitionMap.keySet().iterator(); while (nodeIter.hasNext()) { - String instance = nodeIter.next(); + Node instance = nodeIter.next(); // Cleanup the existing mapping. Remove all non-active nodes from the mapping if (!_instanceFaultZone.containsKey(instance)) { List<String> partitions = nodeToPartitionMap.get(instance); @@ -97,10 +110,10 @@ public class CardDealingAdjustmentAlgorithmV2 implements CardDealer { } } - List<String> orderedInstances = new ArrayList<>(_instanceFaultZone.keySet()); + List<Node> orderedInstances = new ArrayList<>(_instanceFaultZone.keySet()); // Different resource should shuffle nodes in different ways. Collections.shuffle(orderedInstances, new Random(randomSeed)); - for (String instance : orderedInstances) { + for (Node instance : orderedInstances) { if (!nodeToPartitionMap.containsKey(instance)) { continue; } @@ -139,24 +152,27 @@ public class CardDealingAdjustmentAlgorithmV2 implements CardDealer { return toBeReassigned.isEmpty(); } - private void partitionDealing(Collection<String> instances, - TreeMap<String, Integer> toBeReassigned, Map<String, Set<String>> faultZonePartitionMap, - Map<String, String> faultZoneMap, final Map<String, List<String>> assignmentMap, - final Map<String, Float> targetPartitionCount, final int randomSeed, final int targetAdjustment) { - PriorityQueue<String> instanceQueue = - new PriorityQueue<>(instances.size(), new Comparator<String>() { + private void partitionDealing(Collection<Node> instances, + TreeMap<String, Integer> toBeReassigned, Map<Node, Set<String>> faultZonePartitionMap, + Map<Node, Node> faultZoneMap, final Map<Node, List<String>> assignmentMap, + final Map<Node, Float> targetPartitionCount, final int randomSeed, int targetAdjustment) { + PriorityQueue<Node> instanceQueue = + new PriorityQueue<>(instances.size(), new Comparator<Node>() { @Override - public int compare(String node1, String node2) { + public int compare(Node node1, Node node2) { int node1Load = assignmentMap.containsKey(node1) ? assignmentMap.get(node1).size() : 0; int node2Load = assignmentMap.containsKey(node2) ? assignmentMap.get(node2).size() : 0; if (node1Load == node2Load) { - Float node1Target = targetPartitionCount.get(node1); - Float node2Target = targetPartitionCount.get(node2); - if (node1Target == node2Target) { - return new Integer((node1 + randomSeed).hashCode()).compareTo((node2 + randomSeed).hashCode()); - } else { - return node2Target.compareTo(node1Target); + if (_mode.equals(Mode.EVENNESS)) { + // Also consider node target load if mode is evenness + Float node1Target = targetPartitionCount.get(node1); + Float node2Target = targetPartitionCount.get(node2); + if (node1Target != node2Target) { + return node2Target.compareTo(node1Target); + } } + return new Integer((node1.getName() + randomSeed).hashCode()) + .compareTo((node2.getName() + randomSeed).hashCode()); } else { return node1Load - node2Load; } @@ -166,14 +182,14 @@ public class CardDealingAdjustmentAlgorithmV2 implements CardDealer { while (!toBeReassigned.isEmpty()) { boolean anyPartitionAssigned = false; - Iterator<String> instanceIter = instanceQueue.iterator(); + Iterator<Node> instanceIter = instanceQueue.iterator(); while (instanceIter.hasNext()) { - String instance = instanceIter.next(); + Node instance = instanceIter.next(); // Temporary remove the node from queue. 
// If any partition assigned to the instance, add it back to reset priority. instanceIter.remove(); boolean partitionAssignedToInstance = false; - String faultZoneStr = faultZoneMap.get(instance); + Node faultZone = faultZoneMap.get(instance); List<String> partitions = assignmentMap.containsKey(instance) ? assignmentMap.get(instance) : new ArrayList<String>(); @@ -183,12 +199,12 @@ public class CardDealingAdjustmentAlgorithmV2 implements CardDealer { if (space > 0) { // Find a pending partition to locate for (String pendingPartition : toBeReassigned.navigableKeySet()) { - if (!faultZonePartitionMap.get(faultZoneStr).contains(pendingPartition)) { + if (!faultZonePartitionMap.get(faultZone).contains(pendingPartition)) { if (!assignmentMap.containsKey(instance)) { assignmentMap.put(instance, partitions); } partitions.add(pendingPartition); - faultZonePartitionMap.get(faultZoneStr).add(pendingPartition); + faultZonePartitionMap.get(faultZone).add(pendingPartition); if (toBeReassigned.get(pendingPartition) == 1) { toBeReassigned.remove(pendingPartition); } else { http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java index c7ff844..4aadb71 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java @@ -1,68 +1,82 @@ package org.apache.helix.controller.rebalancer.strategy.crushMapping; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import org.apache.helix.controller.rebalancer.topology.InstanceNode; import org.apache.helix.controller.rebalancer.topology.Node; import org.apache.helix.controller.rebalancer.topology.Topology; import org.apache.helix.util.JenkinsHash; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - public class ConsistentHashingAdjustmentAlgorithm { private static final int MAX_SELETOR_CACHE_SIZE = 1000; private static final int SELETOR_CACHE_EXPIRE = 3; private JenkinsHash _hashFunction; private ConsistentHashSelector _selector; - Set<String> _activeInstances = new HashSet<>(); + Set<Node> _activeInstances = new HashSet<>(); // Instance -> FaultZone Tag - private Map<String, String> _faultZoneMap = new HashMap<>(); + private Map<Node, Node> _faultZoneMap = new HashMap<>(); // Record existing partitions that are assigned to a fault zone - private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>(); + private Map<Node, Set<String>> _faultZonePartitionMap = new HashMap<>(); // Cache records all 
known topology. - private final static LoadingCache<Set<String>, ConsistentHashSelector> _selectorCache = + private final static LoadingCache<Set<Node>, ConsistentHashSelector> _selectorCache = CacheBuilder.newBuilder().maximumSize(MAX_SELETOR_CACHE_SIZE) .expireAfterAccess(SELETOR_CACHE_EXPIRE, TimeUnit.MINUTES) - .build(new CacheLoader<Set<String>, ConsistentHashSelector>() { - public ConsistentHashSelector load(Set<String> allInstances) { + .build(new CacheLoader<Set<Node>, ConsistentHashSelector>() { + public ConsistentHashSelector load(Set<Node> allInstances) { return new ConsistentHashSelector(allInstances); } }); - public ConsistentHashingAdjustmentAlgorithm(Topology topology, Collection<String> activeInstances) - throws ExecutionException { + public ConsistentHashingAdjustmentAlgorithm(Topology topology, + Collection<String> activeInstanceNames) throws ExecutionException { _hashFunction = new JenkinsHash(); - Set<String> allInstances = new HashSet<>(); + Set<Node> allInstances = new HashSet<>(); // Get all instance related information. for (Node zone : topology.getFaultZones()) { for (Node instance : Topology.getAllLeafNodes(zone)) { - allInstances.add(instance.getName()); - _faultZoneMap.put(instance.getName(), zone.getName()); - if (!_faultZonePartitionMap.containsKey(zone.getName())) { - _faultZonePartitionMap.put(zone.getName(), new HashSet<String>()); + if (instance instanceof InstanceNode) { + allInstances.add(instance); + _faultZoneMap.put(instance, zone); + if (!_faultZonePartitionMap.containsKey(zone)) { + _faultZonePartitionMap.put(zone, new HashSet<String>()); + } + if (activeInstanceNames.contains(((InstanceNode) instance).getInstanceName())) { + _activeInstances.add(instance); + } } } } _selector = _selectorCache.get(allInstances); - _activeInstances.addAll(activeInstances); } - public boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, int randomSeed) { + public boolean computeMapping(Map<Node, List<String>> nodeToPartitionMap, int randomSeed) { if (_activeInstances.isEmpty()) { return false; } - Set<String> inactiveInstances = new HashSet<>(); + Set<Node> inactiveInstances = new HashSet<>(); Map<String, Integer> toBeReassigned = new HashMap<>(); // Remove all partition assignment to a non-live instance - Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator(); + Iterator<Node> nodeIter = nodeToPartitionMap.keySet().iterator(); while (nodeIter.hasNext()) { - String instance = nodeIter.next(); + Node instance = nodeIter.next(); List<String> partitions = nodeToPartitionMap.get(instance); if (!_activeInstances.contains(instance)) { inactiveInstances.add(instance); @@ -76,14 +90,14 @@ public class ConsistentHashingAdjustmentAlgorithm { for (String partition : new ArrayList<>(toBeReassigned.keySet())) { int remainReplicas = toBeReassigned.get(partition); - Set<String> conflictInstance = new HashSet<>(); + Set<Node> conflictInstance = new HashSet<>(); for (int index = 0; index < toBeReassigned.get(partition); index++) { - Iterable<String> sortedInstances = + Iterable<Node> sortedInstances = _selector.getCircle(_hashFunction.hash(randomSeed, partition.hashCode(), index)); - Iterator<String> instanceItr = sortedInstances.iterator(); + Iterator<Node> instanceItr = sortedInstances.iterator(); while (instanceItr.hasNext() && conflictInstance.size() + inactiveInstances.size() != _selector.instanceSize) { - String instance = instanceItr.next(); + Node instance = instanceItr.next(); if (!_activeInstances.contains(instance)) { 
inactiveInstances.add(instance); } @@ -130,24 +144,24 @@ public class ConsistentHashingAdjustmentAlgorithm { class ConsistentHashSelector { private final static int DEFAULT_TOKENS_PER_INSTANCE = 1000; private final static JenkinsHash _hashFunction = new JenkinsHash(); - private final SortedMap<Long, String> circle = new TreeMap<>(); + private final SortedMap<Long, Node> circle = new TreeMap<>(); protected int instanceSize = 0; - public ConsistentHashSelector(Set<String> instances) { - for (String instance : instances) { + public ConsistentHashSelector(Set<Node> instances) { + for (Node instance : instances) { add(instance, DEFAULT_TOKENS_PER_INSTANCE); instanceSize++; } } - private void add(String instance, long numberOfReplicas) { + private void add(Node instance, long numberOfReplicas) { int instanceHashCode = instance.hashCode(); for (int i = 0; i < numberOfReplicas; i++) { circle.put(_hashFunction.hash(instanceHashCode, i), instance); } } - public Iterable<String> getCircle(long data) { + public Iterable<Node> getCircle(long data) { if (circle.isEmpty()) { return null; } http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/InstanceNode.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/InstanceNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/InstanceNode.java new file mode 100644 index 0000000..82bfc99 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/InstanceNode.java @@ -0,0 +1,63 @@ +package org.apache.helix.controller.rebalancer.topology; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class InstanceNode extends Node { + private String _instanceName; + + public InstanceNode(Node node, String instanceName) { + super(node); + _instanceName = instanceName; + } + + public InstanceNode clone() { + return new InstanceNode(this, _instanceName); + } + + public String getInstanceName() { + return _instanceName; + } + + public void setInstanceName(String instanceName) { + _instanceName = instanceName; + } + + @Override + public String toString() { + return super.toString() + ":" + _instanceName; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof InstanceNode)) { + return false; + } + InstanceNode that = (InstanceNode)obj; + return super.equals(that) && _instanceName.equals(that.getInstanceName()); + } + + /** + * Do not override compareTo & hashCode. These are used to determine the node's position in the topology. + * The instance name should not affect the topology location in any way.
+ */ +} http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java index e26bc60..7005430 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java @@ -20,10 +20,8 @@ package org.apache.helix.controller.rebalancer.topology; */ import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; public class Node implements Comparable<Node> { private String _name; @@ -31,16 +29,14 @@ public class Node implements Comparable<Node> { private long _id; private long _weight; - private LinkedHashMap<String, Node> _children = new LinkedHashMap<String, Node>(); + private LinkedHashMap<String, Node> _children = new LinkedHashMap<>(); private Node _parent; private boolean _failed; - public Node() { + public Node() { } - } - - public Node(Node node) { + protected Node(Node node) { _name = node.getName(); _type = node.getType(); _id = node.getId(); @@ -48,6 +44,10 @@ public class Node implements Comparable<Node> { _failed = node.isFailed(); } + public Node clone() { + return new Node(this); + } + public String getName() { return _name; } http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index bf00e0e..f5b6141 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -30,8 +30,6 @@ import java.util.Map; import java.util.Set; import org.apache.helix.HelixException; -import org.apache.helix.HelixProperty; -import org.apache.helix.NotificationContext; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.slf4j.Logger; @@ -52,20 +50,21 @@ public class Topology { private static final int DEFAULT_NODE_WEIGHT = 1000; private final MessageDigest _md; - private Node _root; // root of the tree structure of all nodes; - private List<String> _allInstances; - private List<String> _liveInstances; - private Map<String, InstanceConfig> _instanceConfigMap; - private ClusterConfig _clusterConfig; + private final Node _root; // root of the tree structure of all nodes; + private final List<String> _allInstances; + private final List<String> _liveInstances; + private final Map<String, InstanceConfig> _instanceConfigMap; + private final ClusterConfig _clusterConfig; + private final boolean _topologyAwareEnabled; + private String _faultZoneType; private String _endNodeType; private boolean _useDefaultTopologyDef; - private boolean _topologyAwareEnabled; private LinkedHashSet<String> _types; /* default names for domain paths, if value is not specified for a domain path, the default one is used */ // TODO: default values can be defined in clusterConfig. 
- private Map<String, String> _defaultDomainPathValues = new HashMap<String, String>(); + private Map<String, String> _defaultDomainPathValues = new HashMap<>(); public Topology(final List<String> allNodes, final List<String> liveNodes, final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) { @@ -78,8 +77,13 @@ public class Topology { _allInstances = allNodes; _liveInstances = liveNodes; _instanceConfigMap = instanceConfigMap; + if (_instanceConfigMap == null) { + throw new HelixException("Instance config map is missing!"); + } + if (!_instanceConfigMap.keySet().containsAll(allNodes)) { + List<String> missingInstances = new ArrayList<>(_allInstances); + missingInstances.removeAll(_instanceConfigMap.keySet()); + throw new HelixException(String.format("Configs for instances %s are not found!", + missingInstances)); + } + _clusterConfig = clusterConfig; - _types = new LinkedHashSet<String>(); + _types = new LinkedHashSet<>(); _topologyAwareEnabled = clusterConfig.isTopologyAwareEnabled(); if (_topologyAwareEnabled) { @@ -160,7 +164,7 @@ * @return */ public static List<Node> getAllLeafNodes(Node root) { - List<Node> nodes = new ArrayList<Node>(); + List<Node> nodes = new ArrayList<>(); if (root.isLeaf()) { nodes.add(root); } else { @@ -180,18 +184,18 @@ * @param failedNodes set of nodes that need to be failed. * @return new root node. */ - public static Node clone(Node root, Map<String, Integer> newNodeWeight, Set<String> failedNodes) { + public static Node clone(Node root, Map<Node, Integer> newNodeWeight, Set<Node> failedNodes) { Node newRoot = cloneTree(root, newNodeWeight, failedNodes); computeWeight(newRoot); return newRoot; } - private static Node cloneTree(Node root, Map<String, Integer> newNodeWeight, Set<String> failedNodes) { - Node newRoot = new Node(root); - if (newNodeWeight.containsKey(root.getName())) { - newRoot.setWeight(newNodeWeight.get(root.getName())); + private static Node cloneTree(Node root, Map<Node, Integer> newNodeWeight, Set<Node> failedNodes) { + Node newRoot = root.clone(); + if (newNodeWeight.containsKey(root)) { + newRoot.setWeight(newNodeWeight.get(root)); } - if (failedNodes.contains(root.getName())) { + if (failedNodes.contains(root)) { newRoot.setFailed(true); newRoot.setWeight(0); } @@ -221,9 +225,6 @@ for (String ins : _allInstances) { InstanceConfig config = _instanceConfigMap.get(ins); - if (config == null) { - throw new HelixException(String.format("Config for instance %s is not found!", ins)); - } Map<String, String> pathValueMap = new HashMap<>(); if (_topologyAwareEnabled) { String zone = config.getZoneId(); @@ -253,7 +254,6 @@ } root = addEndNode(root, ins, pathValueMap, weight, _liveInstances); } - return root; } @@ -270,9 +270,6 @@ for (String ins : _allInstances) { InstanceConfig insConfig = _instanceConfigMap.get(ins); - if (insConfig == null) { - throw new HelixException(String.format("Config for instance %s is not found!", ins)); - } String domain = insConfig.getDomain(); if (domain == null) { if (insConfig.getInstanceEnabled() && (_clusterConfig.getDisabledInstances() == null @@ -290,7 +287,7 @@ } String[] pathPairs = domain.trim().split(","); - Map<String, String> pathValueMap = new HashMap<String, String>(); + Map<String, String> pathValueMap = new HashMap<>(); for (String pair : pathPairs) { String[] values = pair.trim().split("="); if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) { @@ -314,10 +311,8 @@ if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) { weight =
DEFAULT_NODE_WEIGHT; } - root = addEndNode(root, ins, pathValueMap, weight, _liveInstances); } - return root; } @@ -328,7 +323,7 @@ public class Topology { private Node addEndNode(Node root, String instanceName, Map<String, String> pathNameMap, int instanceWeight, List<String> liveInstances) { Node current = root; - List<Node> pathNodes = new ArrayList<Node>(); + List<Node> pathNodes = new ArrayList<>(); for (String path : _types) { String pathValue = pathNameMap.get(path); if (pathValue == null || pathValue.isEmpty()) { @@ -336,33 +331,44 @@ public class Topology { } pathNodes.add(current); if (!current.hasChild(pathValue)) { - Node n = new Node(); - n.setName(pathValue); - n.setId(computeId(pathValue)); - n.setType(path); - n.setParent(current); - - // if it is leaf node. - if (path.equals(_endNodeType)) { - if (liveInstances.contains(instanceName)) { - // node is alive - n.setWeight(instanceWeight); - // add instance weight to all of its parent nodes. - for (Node node : pathNodes) { - node.addWeight(instanceWeight); - } - } else { - n.setFailed(true); - n.setWeight(0); - } - } - current.addChild(n); + buildNewNode(pathValue, path, current, instanceName, instanceWeight, + liveInstances.contains(instanceName), pathNodes); + } else if (path.equals(_endNodeType)) { + throw new HelixException( + "Failed to add topology node because duplicate leaf nodes are not allowed. Duplicate node name: " + + pathValue); } current = current.getChild(pathValue); } return root; } + private Node buildNewNode(String name, String type, Node parent, String instanceName, + int instanceWeight, boolean isLiveInstance, List<Node> pathNodes) { + Node n = new Node(); + n.setName(name); + n.setId(computeId(name)); + n.setType(type); + n.setParent(parent); + // if it is leaf node, create an InstanceNode instead + if (type.equals(_endNodeType)) { + n = new InstanceNode(n, instanceName); + if (isLiveInstance) { + // node is alive + n.setWeight(instanceWeight); + // add instance weight to all of its parent nodes. 
+ for (Node node : pathNodes) { + node.addWeight(instanceWeight); + } + } else { + n.setFailed(true); + n.setWeight(0); + } + } + parent.addChild(n); + return n; + } + private long computeId(String name) { byte[] h = _md.digest(name.getBytes()); return bstrTo32bit(h); http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 9311ca2..e90e63f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -419,7 +419,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public Set<String> getEnabledInstances() { - Set<String> enabledNodes = new HashSet<>(getInstanceConfigMap().keySet()); + Set<String> enabledNodes = getAllInstances(); enabledNodes.removeAll(getDisabledInstances()); return enabledNodes; http://git-wip-us.apache.org/repos/asf/helix/blob/d5bf3ad4/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java new file mode 100644 index 0000000..7275481 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java @@ -0,0 +1,213 @@ +package org.apache.helix.integration.rebalancer.CrushRebalancers; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.Partition; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class TestNodeSwap extends ZkTestBase { + final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int _PARTITIONS = 20; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + List<MockParticipantManager> _participants = new ArrayList<>(); + Set<String> _allDBs = new HashSet<>(); + int _replica = 3; + + String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTopology("/zone/instance"); + clusterConfig.setFaultZoneType("zone"); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + Set<String> nodes = new HashSet<>(); + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String zone = "zone-" + i % 3; + String domain = String.format("zone=%s,instance=%s", zone, storageNodeName); + + InstanceConfig instanceConfig = + configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName); + instanceConfig.setDomain(domain); + _gSetupTool.getClusterManagementTool() + .setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig); + nodes.add(storageNodeName); + } + + // start dummy participants + for (String node : nodes) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, 
CLUSTER_NAME, true); + enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + for (MockParticipantManager p : _participants) { + if (p.isConnected()) { + p.syncStop(); + } + } + deleteCluster(CLUSTER_NAME); + } + + @DataProvider(name = "rebalanceStrategies") + public static Object [][] rebalanceStrategies() { + return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}, + {"MultiRoundCrushRebalanceStrategy", MultiRoundCrushRebalanceStrategy.class.getName()}, + {"CrushEdRebalanceStrategy", CrushEdRebalanceStrategy.class.getName()} + }; + } + + + @Test(dataProvider = "rebalanceStrategies") + public void testNodeSwap(String rebalanceStrategyName, + String rebalanceStrategyClass) throws Exception { + System.out.println("Test testNodeSwap for " + rebalanceStrategyName); + + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + IdealState.RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + HelixClusterVerifier _clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify(5000)); + + Map<String, ExternalView> record = new HashMap<>(); + + for (String db : _allDBs) { + record.put(db, + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db)); + } + + // swap a node and rebalance for new distribution + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + + // 1. disable and remove an old node + MockParticipantManager oldParticipant = _participants.get(0); + String oldParticipantName = oldParticipant.getInstanceName(); + oldParticipant.syncStop(); + InstanceConfig instanceConfig = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, oldParticipantName); + // disable the node first + instanceConfig.setInstanceEnabled(false); + _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME, oldParticipantName, instanceConfig); + Assert.assertTrue(_clusterVerifier.verify(10000)); + // then remove it from topology + _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig); + + // 2. 
create a new participant with the same topology + String newParticipantName = "RandomParticipant-" + rebalanceStrategyName + "_" + START_PORT; + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newParticipantName); + InstanceConfig newConfig = + configAccessor.getInstanceConfig(CLUSTER_NAME, newParticipantName); + newConfig.setDomain(instanceConfig.getDomain()); + _gSetupTool.getClusterManagementTool() + .setInstanceConfig(CLUSTER_NAME, newParticipantName, newConfig); + + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newParticipantName); + participant.syncStart(); + _participants.add(0, participant); + Thread.sleep(300); + + Assert.assertTrue(_clusterVerifier.verify(5000)); + + for (String db : _allDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + ExternalView oldEv = record.get(db); + for (String partition : ev.getPartitionSet()) { + Map<String, String> stateMap = ev.getStateMap(partition); + Map<String, String> oldStateMap = oldEv.getStateMap(partition); + Assert.assertTrue(oldStateMap != null && stateMap != null); + for (String instance : stateMap.keySet()) { + String topoName = instance; + if (instance.equals(newParticipantName)) { + topoName = oldParticipantName; + } + if (!stateMap.get(instance).equals(oldStateMap.get(topoName))) { + Assert.fail("Partition " + partition + " of resource " + db + + " is assigned differently after the node swap."); + } + } + } + } + } +}
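The stable assignment hinges on InstanceNode deliberately inheriting compareTo and hashCode from Node: the tokens a node owns on the consistent-hash ring in ConsistentHashSelector are derived from its topology position, never from the instance name. The standalone sketch below illustrates that effect; SimpleNode, buildRing, and the token derivation are illustrative stand-ins, not the Helix classes or the JenkinsHash-based scheme above.

    import java.util.SortedMap;
    import java.util.TreeMap;

    class SimpleNode {
      private final String domainPath; // topology location, e.g. "zone-0/slot-3"
      private String instanceName;     // swappable; deliberately NOT part of hashCode

      SimpleNode(String domainPath, String instanceName) {
        this.domainPath = domainPath;
        this.instanceName = instanceName;
      }

      @Override
      public int hashCode() {
        return domainPath.hashCode(); // ring position depends on topology only
      }

      @Override
      public String toString() {
        return domainPath + ":" + instanceName;
      }
    }

    public class RingStabilitySketch {
      // Place a few virtual tokens for the node, keyed off its topology hash.
      static SortedMap<Integer, SimpleNode> buildRing(SimpleNode node, int tokens) {
        SortedMap<Integer, SimpleNode> ring = new TreeMap<>();
        for (int i = 0; i < tokens; i++) {
          ring.put(31 * node.hashCode() + i, node);
        }
        return ring;
      }

      public static void main(String[] args) {
        SortedMap<Integer, SimpleNode> before =
            buildRing(new SimpleNode("zone-0/slot-3", "host-A_12918"), 3);
        SortedMap<Integer, SimpleNode> after =
            buildRing(new SimpleNode("zone-0/slot-3", "host-B_12918"), 3);
        // Identical token positions before and after the swap, so partitions
        // hashed onto the ring keep their placement.
        System.out.println(before.keySet().equals(after.keySet())); // prints: true
      }
    }

Swapping the instance behind a topology slot therefore leaves every token in place, which is exactly why the test above expects an identical external view (modulo the renamed instance) after the swap.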
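For reference, the generalized domain remains a comma-separated list of path=value pairs matched against the cluster topology paths, as in the createClusterTree logic above. A minimal sketch of that parsing, using placeholder values and a generic exception standing in for HelixException:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DomainParseSketch {
      public static void main(String[] args) {
        // Matches clusterConfig.setTopology("/zone/instance") from the test above.
        String domain = "zone=zone-0,instance=localhost_12918"; // placeholder values

        Map<String, String> pathValueMap = new LinkedHashMap<>();
        for (String pair : domain.trim().split(",")) {
          String[] values = pair.trim().split("=");
          if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) {
            throw new IllegalArgumentException("Invalid domain pair: " + pair);
          }
          pathValueMap.put(values[0], values[1]);
        }
        System.out.println(pathValueMap); // {zone=zone-0, instance=localhost_12918}
      }
    }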
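Operationally, a swap follows the same two steps as the test: disable and drop the old node, then register the replacement under the exact same domain string. A rough sketch against the HelixAdmin API, assuming a reachable ZooKeeper; the address, cluster, and host names below are placeholders:

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.InstanceConfig;

    public class NodeSwapSketch {
      public static void main(String[] args) {
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // placeholder address
        String cluster = "MyCluster";                          // placeholder cluster

        // 1. Disable the old node, then drop it, keeping its domain string around.
        InstanceConfig oldConfig = admin.getInstanceConfig(cluster, "oldHost_12918");
        String domain = oldConfig.getDomain(); // e.g. "zone=zone-0,instance=oldHost_12918"
        oldConfig.setInstanceEnabled(false);
        admin.setInstanceConfig(cluster, "oldHost_12918", oldConfig);
        admin.dropInstance(cluster, oldConfig);

        // 2. Add the replacement with the SAME domain. Because the domain no
        //    longer has to end with the instance name, the new node occupies
        //    the old node's position in the topology.
        InstanceConfig newConfig = new InstanceConfig("newHost_12918");
        newConfig.setDomain(domain);
        admin.addInstance(cluster, newConfig);
      }
    }

Because the replacement reuses the old domain verbatim, the CRUSH calculation sees an unchanged topology, so the prior partition assignment is preserved and only the swapped node's replicas move.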
