[HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7147ec87 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7147ec87 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7147ec87 Branch: refs/heads/helix-0.6.x Commit: 7147ec874e912f27905c299fefe0d09ca31ebd42 Parents: ea0fbbb Author: Lei Xia <[email protected]> Authored: Thu Jun 16 12:06:34 2016 -0700 Committer: Lei Xia <[email protected]> Committed: Mon Sep 12 10:06:33 2016 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/helix/HelixAdmin.java | 30 + .../java/org/apache/helix/HelixConstants.java | 4 + .../main/java/org/apache/helix/PropertyKey.java | 3 +- .../controller/rebalancer/AutoRebalancer.java | 15 +- .../helix/controller/rebalancer/Rebalancer.java | 1 - .../strategy/AutoRebalanceStrategy.java | 754 ++++++++++++++++++ .../strategy/CrushRebalanceStrategy.java | 174 +++++ .../rebalancer/strategy/RebalanceStrategy.java | 57 ++ .../crushMapping/CRUSHPlacementAlgorithm.java | 316 ++++++++ .../strategy/crushMapping/JenkinsHash.java | 140 ++++ .../controller/rebalancer/topology/Node.java | 208 +++++ .../rebalancer/topology/Topology.java | 295 +++++++ .../controller/stages/ClusterDataCache.java | 14 +- .../strategy/AutoRebalanceStrategy.java | 753 ------------------ .../controller/strategy/RebalanceStrategy.java | 52 -- .../apache/helix/manager/zk/ZKHelixAdmin.java | 43 +- .../org/apache/helix/model/ClusterConfig.java | 92 +++ .../java/org/apache/helix/model/IdealState.java | 2 +- .../org/apache/helix/model/InstanceConfig.java | 50 +- .../task/GenericTaskAssignmentCalculator.java | 7 +- .../org/apache/helix/tools/ClusterSetup.java | 6 + .../Strategy/TestAutoRebalanceStrategy.java | 766 +++++++++++++++++++ .../strategy/TestAutoRebalanceStrategy.java | 765 ------------------ .../helix/controller/strategy/TestTopology.java | 172 +++++ .../integration/TestCrushAutoRebalance.java | 221 
++++++ .../manager/MockParticipantManager.java | 10 +- 26 files changed, 3355 insertions(+), 1595 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/HelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index fbfab26..aeacd4b 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -114,6 +114,18 @@ public interface HelixAdmin { String stateModelRef, String rebalancerMode); /** + * Add a resource to a cluster + * @param clusterName + * @param resourceName + * @param numPartitions + * @param stateModelRef + * @param rebalancerMode + * @param rebalanceStrategy + */ + void addResource(String clusterName, String resourceName, int numPartitions, + String stateModelRef, String rebalancerMode, String rebalanceStrategy); + + /** * Add a resource to a cluster, using a bucket size > 1 * @param clusterName * @param resourceName @@ -138,6 +150,22 @@ public interface HelixAdmin { void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance); + + /** + * Add a resource to a cluster, using a bucket size > 1 + * @param clusterName + * @param resourceName + * @param numPartitions + * @param stateModelRef + * @param rebalancerMode + * @param rebalanceStrategy + * @param bucketSize + * @param maxPartitionsPerInstance + */ + void addResource(String clusterName, String resourceName, int numPartitions, + String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize, + int maxPartitionsPerInstance); + /** * Add an instance to a cluster * @param clusterName @@ -411,6 +439,8 @@ public 
interface HelixAdmin { */ void removeInstanceTag(String clusterName, String instanceName, String tag); + void setInstanceZoneId(String clusterName, String instanceName, String zoneId); + /** * Release resources */ http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/HelixConstants.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java index 5318fa9..6de0ff1 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java +++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java @@ -43,6 +43,10 @@ public interface HelixConstants { ANY_LIVEINSTANCE } + /** + * Replaced by ClusterConfig.ClusterConfigProperty. + */ + @Deprecated enum ClusterConfigType { HELIX_DISABLE_PIPELINE_TRIGGERS, DISABLE_FULL_AUTO // override all resources in the cluster to use SEMI-AUTO instead of FULL-AUTO http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/PropertyKey.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java index 33355f1..0125902 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java @@ -38,6 +38,7 @@ import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER; import java.util.Arrays; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.CurrentState; import org.apache.helix.model.Error; @@ -186,7 +187,7 @@ public class PropertyKey { * @return {@link PropertyKey} */ public PropertyKey clusterConfig() { - return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, HelixProperty.class, + return 
new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, ClusterConfig.class, _clusterName, ConfigScopeProperty.CLUSTER.toString(), _clusterName); } http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index 6682426..ba237b1 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -35,8 +35,8 @@ import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.LiveInstance; @@ -79,8 +79,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances(); String replicas = currentIdealState.getReplicas(); - LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>(); - stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas)); + LinkedHashMap<String, Integer> stateCountMap = + stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas)); 
List<String> liveNodes = new ArrayList<String>(liveInstance.keySet()); List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet()); allNodes.removeAll(clusterData.getDisabledInstances()); @@ -129,7 +129,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); String rebalanceStrategyName = currentIdealState.getRebalanceStrategy(); - if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) { + if (rebalanceStrategyName == null || rebalanceStrategyName + .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) { _rebalanceStrategy = new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition); } else { @@ -152,8 +153,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { } } - ZNRecord newMapping = - _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes); + ZNRecord newMapping = _rebalanceStrategy + .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData); if (LOG.isDebugEnabled()) { LOG.debug("currentMapping: " + currentMapping); http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java index f5a4ae8..6935378 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java @@ -46,5 +46,4 @@ public interface Rebalancer { */ IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, final CurrentStateOutput currentStateOutput, final ClusterDataCache clusterData); - } 
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java new file mode 100644 index 0000000..868d207 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java @@ -0,0 +1,754 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.log4j.Logger; + +public class AutoRebalanceStrategy implements RebalanceStrategy { + private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class); + private final ReplicaPlacementScheme _placementScheme; + + private String _resourceName; + private List<String> _partitions; + private LinkedHashMap<String, Integer> _states; + private int _maximumPerNode; + + private Map<String, Node> _nodeMap; + private List<Node> _liveNodesList; + private Map<Integer, String> _stateMap; + + private Map<Replica, Node> _preferredAssignment; + private Map<Replica, Node> _existingPreferredAssignment; + private Map<Replica, Node> _existingNonPreferredAssignment; + private Set<Replica> _orphaned; + + public AutoRebalanceStrategy(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode) { + init(resourceName, partitions, states, maximumPerNode); + _placementScheme = new DefaultPlacementScheme(); + } + + public AutoRebalanceStrategy(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states) { + this(resourceName, partitions, states, Integer.MAX_VALUE); + } + + @Override + public void init(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _states = states; + _maximumPerNode = maximumPerNode; + } + + @Override + public ZNRecord computePartitionAssignment(final 
List<String> allNodes, final List<String> liveNodes, + final Map<String, Map<String, String>> currentMapping, ClusterDataCache clusterData) { + int numReplicas = countStateReplicas(); + ZNRecord znRecord = new ZNRecord(_resourceName); + if (liveNodes.size() == 0) { + return znRecord; + } + int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size(); + int distFloor = (numReplicas * _partitions.size()) / liveNodes.size(); + _nodeMap = new HashMap<String, Node>(); + _liveNodesList = new ArrayList<Node>(); + + for (String id : allNodes) { + Node node = new Node(id); + node.capacity = 0; + node.hasCeilingCapacity = false; + _nodeMap.put(id, node); + } + for (int i = 0; i < liveNodes.size(); i++) { + boolean usingCeiling = false; + int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor; + if (distRemainder > 0 && targetSize < _maximumPerNode) { + targetSize += 1; + distRemainder = distRemainder - 1; + usingCeiling = true; + } + Node node = _nodeMap.get(liveNodes.get(i)); + node.isAlive = true; + node.capacity = targetSize; + node.hasCeilingCapacity = usingCeiling; + _liveNodesList.add(node); + } + + // compute states for all replica ids + _stateMap = generateStateMap(); + + // compute the preferred mapping if all nodes were up + _preferredAssignment = computePreferredPlacement(allNodes); + + // logger.info("preferred mapping:"+ preferredAssignment); + // from current mapping derive the ones in preferred location + // this will update the nodes with their current fill status + _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping); + + // from current mapping derive the ones not in preferred location + _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping); + + // compute orphaned replicas that are not assigned to any node + _orphaned = computeOrphaned(); + if (logger.isInfoEnabled()) { + logger.info("orphan = " + _orphaned); + } + + 
moveNonPreferredReplicasToPreferred(); + + assignOrphans(); + + moveExcessReplicas(); + + prepareResult(znRecord); + return znRecord; + } + + /** + * Move replicas assigned to non-preferred nodes if their current node is at capacity + * and its preferred node is under capacity. + */ + private void moveNonPreferredReplicasToPreferred() { + // iterate through non preferred and see if we can move them to the + // preferred location if the donor has more than it should and stealer has + // enough capacity + Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<Replica, Node> entry = iterator.next(); + Replica replica = entry.getKey(); + Node donor = entry.getValue(); + Node receiver = _preferredAssignment.get(replica); + if (donor.capacity < donor.currentlyAssigned + && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) { + donor.currentlyAssigned = donor.currentlyAssigned - 1; + receiver.currentlyAssigned = receiver.currentlyAssigned + 1; + donor.nonPreferred.remove(replica); + receiver.preferred.add(replica); + donor.newReplicas.remove(replica); + receiver.newReplicas.add(replica); + iterator.remove(); + } + } + } + + /** + * Slot in orphaned partitions randomly so as to maintain even load on live nodes. 
+ */ + private void assignOrphans() { + // now iterate over nodes and remaining orphaned partitions and assign + // partitions randomly + // Better to iterate over orphaned partitions first + Iterator<Replica> it = _orphaned.iterator(); + while (it.hasNext()) { + Replica replica = it.next(); + boolean added = false; + int startIndex = computeRandomStartIndex(replica); + for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) { + Node receiver = _liveNodesList.get(index % _liveNodesList.size()); + if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) { + receiver.currentlyAssigned = receiver.currentlyAssigned + 1; + receiver.nonPreferred.add(replica); + receiver.newReplicas.add(replica); + added = true; + break; + } + } + if (!added) { + // try adding the replica by making room for it + added = assignOrphanByMakingRoom(replica); + } + if (added) { + it.remove(); + } + } + if (_orphaned.size() > 0 && logger.isInfoEnabled()) { + logger.info("could not assign nodes to partitions: " + _orphaned); + } + } + + /** + * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it + * @param replica The replica to assign + * @return true if the assignment succeeded, false otherwise + */ + private boolean assignOrphanByMakingRoom(Replica replica) { + Node capacityDonor = null; + Node capacityAcceptor = null; + int startIndex = computeRandomStartIndex(replica); + for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) { + Node current = _liveNodesList.get(index % _liveNodesList.size()); + if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned + && !current.canAddIfCapacity(replica) && capacityDonor == null) { + // this node has space but cannot accept the node + capacityDonor = current; + } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned + && current.canAddIfCapacity(replica) && capacityAcceptor == null) { + // 
this node would be able to accept the replica if it has ceiling capacity + capacityAcceptor = current; + } + if (capacityDonor != null && capacityAcceptor != null) { + break; + } + } + if (capacityDonor != null && capacityAcceptor != null) { + // transfer ceiling capacity and add the node + capacityAcceptor.steal(capacityDonor, replica); + return true; + } + return false; + } + + /** + * Move replicas from too-full nodes to nodes that can accept the replicas + */ + private void moveExcessReplicas() { + // iterate over nodes and move extra load + Iterator<Replica> it; + for (Node donor : _liveNodesList) { + if (donor.capacity < donor.currentlyAssigned) { + Collections.sort(donor.nonPreferred); + it = donor.nonPreferred.iterator(); + while (it.hasNext()) { + Replica replica = it.next(); + int startIndex = computeRandomStartIndex(replica); + for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) { + Node receiver = _liveNodesList.get(index % _liveNodesList.size()); + if (receiver.canAdd(replica)) { + receiver.currentlyAssigned = receiver.currentlyAssigned + 1; + receiver.nonPreferred.add(replica); + donor.currentlyAssigned = donor.currentlyAssigned - 1; + it.remove(); + break; + } + } + if (donor.capacity >= donor.currentlyAssigned) { + break; + } + } + if (donor.capacity < donor.currentlyAssigned) { + logger.warn("Could not take partitions out of node:" + donor.id); + } + } + } + } + + /** + * Update a ZNRecord with the results of the rebalancing. + * @param znRecord + */ + private void prepareResult(ZNRecord znRecord) { + // The map fields are keyed on partition name to a pair of node and state, i.e. it + // indicates that the partition with given state is served by that node + // + // The list fields are also keyed on partition and list all the nodes serving that partition. + // This is useful to verify that there is no node serving multiple replicas of the same + // partition. 
+ Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>(); + for (String partition : _partitions) { + znRecord.setMapField(partition, new TreeMap<String, String>()); + znRecord.setListField(partition, new ArrayList<String>()); + newPreferences.put(partition, new ArrayList<String>()); + } + + // for preference lists, the rough priority that we want is: + // [existing preferred, existing non-preferred, non-existing preferred, non-existing + // non-preferred] + for (Node node : _liveNodesList) { + for (Replica replica : node.preferred) { + if (node.newReplicas.contains(replica)) { + newPreferences.get(replica.partition).add(node.id); + } else { + znRecord.getListField(replica.partition).add(node.id); + } + } + } + for (Node node : _liveNodesList) { + for (Replica replica : node.nonPreferred) { + if (node.newReplicas.contains(replica)) { + newPreferences.get(replica.partition).add(node.id); + } else { + znRecord.getListField(replica.partition).add(node.id); + } + } + } + normalizePreferenceLists(znRecord.getListFields(), newPreferences); + + // generate preference maps based on the preference lists + for (String partition : _partitions) { + List<String> preferenceList = znRecord.getListField(partition); + int i = 0; + for (String participant : preferenceList) { + znRecord.getMapField(partition).put(participant, _stateMap.get(i)); + i++; + } + } + } + + /** + * Adjust preference lists to reduce the number of same replicas on an instance. This will + * separately normalize two sets of preference lists, and then append the results of the second + * set to those of the first. This basically ensures that existing replicas are automatically + * preferred. 
+ * @param preferenceLists map of (partition --> list of nodes) + * @param newPreferences map containing node preferences not consistent with the current + * assignment + */ + private void normalizePreferenceLists(Map<String, List<String>> preferenceLists, + Map<String, List<String>> newPreferences) { + + Map<String, Map<String, Integer>> nodeReplicaCounts = + new HashMap<String, Map<String, Integer>>(); + for (String partition : preferenceLists.keySet()) { + normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts); + } + for (String partition : newPreferences.keySet()) { + normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts); + preferenceLists.get(partition).addAll(newPreferences.get(partition)); + } + } + + /** + * Adjust a single preference list for replica assignment imbalance + * @param preferenceList list of node names + * @param nodeReplicaCounts map of (node --> state --> count) + */ + private void normalizePreferenceList(List<String> preferenceList, + Map<String, Map<String, Integer>> nodeReplicaCounts) { + List<String> newPreferenceList = new ArrayList<String>(); + int replicas = Math.min(countStateReplicas(), preferenceList.size()); + + // make this a LinkedHashSet to preserve iteration order + Set<String> notAssigned = new LinkedHashSet<String>(preferenceList); + for (int i = 0; i < replicas; i++) { + String state = _stateMap.get(i); + String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts); + newPreferenceList.add(node); + notAssigned.remove(node); + Map<String, Integer> counts = nodeReplicaCounts.get(node); + counts.put(state, counts.get(state) + 1); + } + preferenceList.clear(); + preferenceList.addAll(newPreferenceList); + } + + /** + * Get the node which hosts the fewest of a given replica + * @param state the state + * @param nodes nodes to check + * @param nodeReplicaCounts current assignment of replicas + * @return the node most willing to accept the replica + */ + private String 
getMinimumNodeForReplica(String state, Set<String> nodes, + Map<String, Map<String, Integer>> nodeReplicaCounts) { + String minimalNode = null; + int minimalCount = Integer.MAX_VALUE; + for (String node : nodes) { + int count = getReplicaCountForNode(state, node, nodeReplicaCounts); + if (count < minimalCount) { + minimalCount = count; + minimalNode = node; + } + } + return minimalNode; + } + + /** + * Safe check for the number of replicas of a given id assigned to a node + * @param state the state to assign + * @param node the node to check + * @param nodeReplicaCounts a map of node to replica id and counts + * @return the number of currently assigned replicas of the given id + */ + private int getReplicaCountForNode(String state, String node, + Map<String, Map<String, Integer>> nodeReplicaCounts) { + if (!nodeReplicaCounts.containsKey(node)) { + Map<String, Integer> replicaCounts = new HashMap<String, Integer>(); + replicaCounts.put(state, 0); + nodeReplicaCounts.put(node, replicaCounts); + return 0; + } + Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node); + if (!replicaCounts.containsKey(state)) { + replicaCounts.put(state, 0); + return 0; + } + return replicaCounts.get(state); + } + + /** + * Compute the subset of the current mapping where replicas are not mapped according to their + * preferred assignment. 
+ * @param currentMapping Current mapping of replicas to nodes + * @return The current assignments that do not conform to the preferred assignment + */ + private Map<Replica, Node> computeExistingNonPreferredPlacement( + Map<String, Map<String, String>> currentMapping) { + Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>(); + int count = countStateReplicas(); + for (String partition : currentMapping.keySet()) { + Map<String, String> nodeStateMap = currentMapping.get(partition); + nodeStateMap.keySet().retainAll(_nodeMap.keySet()); + for (String nodeId : nodeStateMap.keySet()) { + Node node = _nodeMap.get(nodeId); + boolean skip = false; + for (Replica replica : node.preferred) { + if (replica.partition.equals(partition)) { + skip = true; + break; + } + } + if (skip) { + continue; + } + // check if its in one of the preferred position + for (int replicaId = 0; replicaId < count; replicaId++) { + Replica replica = new Replica(partition, replicaId); + if (!_preferredAssignment.containsKey(replica)) { + + logger.info("partitions: " + _partitions); + logger.info("currentMapping.keySet: " + currentMapping.keySet()); + throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions"); + } + + if (_preferredAssignment.get(replica).id != node.id + && !_existingPreferredAssignment.containsKey(replica) + && !existingNonPreferredAssignment.containsKey(replica)) { + existingNonPreferredAssignment.put(replica, node); + node.nonPreferred.add(replica); + + break; + } + } + } + } + return existingNonPreferredAssignment; + } + + /** + * Get a live node index to try first for a replica so that each possible start index is + * roughly uniformly assigned. 
+ * @param replica The replica to assign + * @return The starting node index to try + */ + private int computeRandomStartIndex(final Replica replica) { + return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size(); + } + + /** + * Get a set of replicas not currently assigned to any node + * @return Unassigned replicas + */ + private Set<Replica> computeOrphaned() { + Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet()); + for (Replica r : _existingPreferredAssignment.keySet()) { + if (orphanedPartitions.contains(r)) { + orphanedPartitions.remove(r); + } + } + for (Replica r : _existingNonPreferredAssignment.keySet()) { + if (orphanedPartitions.contains(r)) { + orphanedPartitions.remove(r); + } + } + + return orphanedPartitions; + } + + /** + * Determine the replicas already assigned to their preferred nodes + * @param currentMapping Current assignment of replicas to nodes + * @return Assignments that conform to the preferred placement + */ + private Map<Replica, Node> computeExistingPreferredPlacement( + final Map<String, Map<String, String>> currentMapping) { + Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>(); + int count = countStateReplicas(); + for (String partition : currentMapping.keySet()) { + Map<String, String> nodeStateMap = currentMapping.get(partition); + nodeStateMap.keySet().retainAll(_nodeMap.keySet()); + for (String nodeId : nodeStateMap.keySet()) { + Node node = _nodeMap.get(nodeId); + node.currentlyAssigned = node.currentlyAssigned + 1; + // check if its in one of the preferred position + for (int replicaId = 0; replicaId < count; replicaId++) { + Replica replica = new Replica(partition, replicaId); + if (_preferredAssignment.containsKey(replica) + && !existingPreferredAssignment.containsKey(replica) + && _preferredAssignment.get(replica).id == node.id) { + existingPreferredAssignment.put(replica, node); + node.preferred.add(replica); + break; + } + } + } + } + + return 
existingPreferredAssignment; + } + + /** + * Given a predefined set of all possible nodes, compute an assignment of replicas to + * nodes that evenly assigns all replicas to nodes. + * @param allNodes Identifiers to all nodes, live and non-live + * @return Preferred assignment of replicas + */ + private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) { + Map<Replica, Node> preferredMapping; + preferredMapping = new HashMap<Replica, Node>(); + int partitionId = 0; + int numReplicas = countStateReplicas(); + int count = countStateReplicas(); + for (String partition : _partitions) { + for (int replicaId = 0; replicaId < count; replicaId++) { + Replica replica = new Replica(partition, replicaId); + String nodeName = + _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas, + allNodes); + preferredMapping.put(replica, _nodeMap.get(nodeName)); + } + partitionId = partitionId + 1; + } + return preferredMapping; + } + + /** + * Counts the total number of replicas given a state-count mapping + * @return + */ + private int countStateReplicas() { + int total = 0; + for (Integer count : _states.values()) { + total += count; + } + return total; + } + + /** + * Compute a map of replica ids to state names + * @return Map: replica id -> state name + */ + private Map<Integer, String> generateStateMap() { + int replicaId = 0; + Map<Integer, String> stateMap = new HashMap<Integer, String>(); + for (String state : _states.keySet()) { + Integer count = _states.get(state); + for (int i = 0; i < count; i++) { + stateMap.put(replicaId, state); + replicaId++; + } + } + return stateMap; + } + + /** + * A Node is an entity that can serve replicas. It has a capacity and knowledge + * of replicas assigned to it, so it can decide if it can receive additional replicas. 
+ */ + class Node { + public int currentlyAssigned; + public int capacity; + public boolean hasCeilingCapacity; + private final String id; + boolean isAlive; + private final List<Replica> preferred; + private final List<Replica> nonPreferred; + private final Set<Replica> newReplicas; + + public Node(String id) { + preferred = new ArrayList<Replica>(); + nonPreferred = new ArrayList<Replica>(); + newReplicas = new TreeSet<Replica>(); + currentlyAssigned = 0; + isAlive = false; + this.id = id; + } + + /** + * Check if this replica can be legally added to this node + * @param replica The replica to test + * @return true if the assignment can be made, false otherwise + */ + public boolean canAdd(Replica replica) { + if (currentlyAssigned >= capacity) { + return false; + } + return canAddIfCapacity(replica); + } + + /** + * Check if this replica can be legally added to this node, provided that it has enough + * capacity. + * @param replica The replica to test + * @return true if the assignment can be made, false otherwise + */ + public boolean canAddIfCapacity(Replica replica) { + if (!isAlive) { + return false; + } + for (Replica r : preferred) { + if (r.partition.equals(replica.partition)) { + return false; + } + } + for (Replica r : nonPreferred) { + if (r.partition.equals(replica.partition)) { + return false; + } + } + return true; + } + + /** + * Receive a replica by stealing capacity from another Node + * @param donor The node that has excess capacity + * @param replica The replica to receive + */ + public void steal(Node donor, Replica replica) { + donor.hasCeilingCapacity = false; + donor.capacity--; + hasCeilingCapacity = true; + capacity++; + currentlyAssigned++; + nonPreferred.add(replica); + newReplicas.add(replica); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size()) + .append("\nnonpreferred:").append(nonPreferred.size()); + 
return sb.toString(); + } + } + + /** + * A Replica is a combination of a partition of the resource, the state the replica is in + * and an identifier signifying a specific replica of a given partition and state. + */ + class Replica implements Comparable<Replica> { + private String partition; + private int replicaId; // this is a partition-relative id + private String format; + + public Replica(String partition, int replicaId) { + this.partition = partition; + this.replicaId = replicaId; + this.format = this.partition + "|" + this.replicaId; + } + + @Override + public String toString() { + return format; + } + + @Override + public boolean equals(Object that) { + if (that instanceof Replica) { + return this.format.equals(((Replica) that).format); + } + return false; + } + + @Override + public int hashCode() { + return this.format.hashCode(); + } + + @Override + public int compareTo(Replica that) { + if (that instanceof Replica) { + return this.format.compareTo(that.format); + } + return -1; + } + } + + /** + * Interface for providing a custom approach to computing a replica's affinity to a node. 
+ */ + public interface ReplicaPlacementScheme { + /** + * Initialize global state + * @param manager The instance to which this placement is associated + */ + public void init(final HelixManager manager); + + /** + * Given properties of this replica, determine the node it would prefer to be served by + * @param partitionId The current partition + * @param replicaId The current replica with respect to the current partition + * @param numPartitions The total number of partitions + * @param numReplicas The total number of replicas per partition + * @param nodeNames A list of identifiers of all nodes, live and non-live + * @return The name of the node that would prefer to serve this replica + */ + public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas, + final List<String> nodeNames); + } + + /** + * Compute preferred placements based on a default strategy that assigns replicas to nodes as + * evenly as possible while avoiding placing two replicas of the same partition on any node. 
+ */ + public static class DefaultPlacementScheme implements ReplicaPlacementScheme { + @Override + public void init(final HelixManager manager) { + // do nothing since this is independent of the manager + } + + @Override + public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas, + final List<String> nodeNames) { + int index; + if (nodeNames.size() > numPartitions) { + // assign replicas in partition order in case there are more nodes than partitions + index = (partitionId + replicaId * numPartitions) % nodeNames.size(); + } else if (nodeNames.size() == numPartitions) { + // need a replica offset in case the sizes of these sets are the same + index = + ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId) + % nodeNames.size(); + } else { + // in all other cases, assigning a replica at a time for each partition is reasonable + index = (partitionId + replicaId) % nodeNames.size(); + } + return nodeNames.get(index); + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java new file mode 100644 index 0000000..a8fe107 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java @@ -0,0 +1,174 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm; +import org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash; +import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.model.InstanceConfig; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * CRUSH-based partition mapping strategy. + */ +public class CrushRebalanceStrategy implements RebalanceStrategy { + private String _resourceName; + private List<String> _partitions; + private Topology _clusterTopo; + private int _replicas; + + @Override + public void init(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _replicas = countStateReplicas(states); + } + + /** + * Compute the preference lists and (optional partition-state mapping) for the given resource. 
+ * + * @param allNodes All instances + * @param liveNodes List of live instances + * @param currentMapping current replica mapping + * @param clusterData cluster data + * @return + * @throws HelixException if a map can not be found + */ + @Override + public ZNRecord computePartitionAssignment(final List<String> allNodes, + final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, + ClusterDataCache clusterData) throws HelixException { + Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap(); + _clusterTopo = + new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig()); + Node topNode = _clusterTopo.getRootNode(); + + Map<String, List<String>> newPreferences = new HashMap<String, List<String>>(); + for (int i = 0; i < _partitions.size(); i++) { + String partitionName = _partitions.get(i); + long data = partitionName.hashCode(); + + // apply the placement rules + List<Node> selected = select(topNode, data, _replicas); + + List<String> nodeList = new ArrayList<String>(); + for (int j = 0; j < selected.size(); j++) { + nodeList.add(selected.get(j).getName()); + } + + newPreferences.put(partitionName, nodeList); + } + + ZNRecord result = new ZNRecord(_resourceName); + result.setListFields(newPreferences); + + return result; + } + + /** + * Number of retries for finding an appropriate instance for a replica. + */ + private static final int MAX_RETRY = 100; + private final JenkinsHash hashFun = new JenkinsHash(); + private CRUSHPlacementAlgorithm placementAlgorithm = new CRUSHPlacementAlgorithm(); + + /** + * Enforce isolation on the specified fault zone. + * The caller will either get the expected number of selected nodes as a result, or an exception will be thrown. 
+ */ + private List<Node> select(Node topNode, long data, int rf) + throws HelixException { + List<Node> nodes = new ArrayList<Node>(rf); + Set<Node> selectedZones = new HashSet<Node>(); + long input = data; + int count = rf; + int tries = 0; + while (nodes.size() < rf) { + doSelect(topNode, input, count, nodes, selectedZones); + count = rf - nodes.size(); + if (count > 0) { + input = hashFun.hash(input); // create a different hash value for retrying + tries++; + if (tries >= MAX_RETRY) { + throw new HelixException( + String.format("could not find all mappings after %d tries", tries)); + } + } + } + return nodes; + } + + private void doSelect(Node topNode, long input, int rf, List<Node> selectedNodes, + Set<Node> selectedZones) { + String zoneType = _clusterTopo.getFaultZoneType(); + String endNodeType = _clusterTopo.getEndNodeType(); + + if (!zoneType.equals(endNodeType)) { + // pick fault zones first + List<Node> zones = placementAlgorithm + .select(topNode, input, rf, zoneType, nodeAlreadySelected(selectedZones)); + // add the racks to the selected racks + selectedZones.addAll(zones); + // pick one end node from each fault zone. + for (Node zone : zones) { + List<Node> endNode = placementAlgorithm.select(zone, input, 1, endNodeType); + selectedNodes.addAll(endNode); + } + } else { + // pick end node directly + List<Node> nodes = placementAlgorithm.select(topNode, input, rf, endNodeType, + nodeAlreadySelected(new HashSet(selectedNodes))); + selectedNodes.addAll(nodes); + } + } + + /** + * Use the predicate to reject already selected zones or nodes. 
+ */ + private Predicate<Node> nodeAlreadySelected(Set<Node> selectedNodes) { + return Predicates.not(Predicates.in(selectedNodes)); + } + + /** + * Counts the total number of replicas given a state-count mapping + * @return + */ + private int countStateReplicas(Map<String, Integer> stateCountMap) { + int total = 0; + for (Integer count : stateCountMap.values()) { + total += count; + } + return total; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java new file mode 100644 index 0000000..a3c7e94 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java @@ -0,0 +1,57 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.stages.ClusterDataCache; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Assignment strategy interface that computes the assignment of partition->instance. + */ +public interface RebalanceStrategy { + String DEFAULT_REBALANCE_STRATEGY = "DEFAULT"; + + /** + * Perform the necessary initialization for the rebalance strategy object. + * + * @param resourceName + * @param partitions + * @param states + * @param maximumPerNode + */ + void init(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode); + + /** + * Compute the preference lists and (optional partition-state mapping) for the given resource. + * + * @param liveNodes + * @param currentMapping + * @param allNodes + * @return + */ + ZNRecord computePartitionAssignment(final List<String> allNodes, + final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, + ClusterDataCache clusterData); +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java new file mode 100644 index 0000000..870656c --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java @@ -0,0 +1,316 @@ +/** + * Copyright 2013 Twitter, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.helix.controller.rebalancer.strategy.crushMapping; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.controller.rebalancer.topology.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The transcription of the CRUSH placement algorithm from the Weil paper. This is a fairly simple + * adaptation, but a couple of important changes have been made to work with the crunch mapping. + */ +public class CRUSHPlacementAlgorithm { + /** + * In case the select() method fails to select after looping back to the origin of selection after + * so many tries, we stop the search. This constant denotes the maximum number of retries after + * looping back to the origin. It is expected that in most cases the selection will either succeed + * with a small number of tries, or it will never succeed. So a reasonably large number to + * distinguish these two cases should be sufficient. + */ + private static final int MAX_LOOPBACK_COUNT = 50; + private static final Logger logger = LoggerFactory.getLogger(CRUSHPlacementAlgorithm.class); + + private final boolean keepOffset; + private final Map<Long,Integer> roundOffset; + + /** + * Creates the crush placement object. 
+ */ + public CRUSHPlacementAlgorithm() { + this(false); + } + + /** + * Creates the crush placement algorithm with the indication whether the round offset should be + * kept for the duration of this object for successive selection of the same input. + */ + public CRUSHPlacementAlgorithm(boolean keepOffset) { + this.keepOffset = keepOffset; + roundOffset = keepOffset ? new HashMap<Long,Integer>() : null; + } + + /** + * Returns a list of (count) nodes of the desired type. If the count is more than the number of + * available nodes, an exception is thrown. Note that it is possible for this method to return a + * list whose size is smaller than the requested size (count) if it is unable to select all the + * nodes for any reason. Callers should check the size of the returned list and take action if + * needed. + */ + public List<Node> select(Node parent, long input, int count, String type) { + return select(parent, input, count, type, Predicates.<Node>alwaysTrue()); + } + + public List<Node> select(Node parent, long input, int count, String type, + Predicate<Node> nodePredicate) { + int childCount = parent.getChildrenCount(type); + if (childCount < count) { + throw new IllegalArgumentException(count + " nodes of type " + type + + " were requested but the tree has only " + childCount + " nodes!"); + } + + List<Node> selected = new ArrayList<Node>(count); + // use the index stored in the map + Integer offset; + if (keepOffset) { + offset = roundOffset.get(input); + if (offset == null) { + offset = 0; + roundOffset.put(input, offset); + } + } else { + offset = 0; + } + + int rPrime = 0; + for (int r = 1; r <= count; r++) { + int failure = 0; + // number of times we had to loop back to the origin + int loopbackCount = 0; + boolean escape = false; + boolean retryOrigin; + Node out = null; + do { + retryOrigin = false; // initialize at the outset + Node in = parent; + Set<Node> rejected = new HashSet<Node>(); + boolean retryNode; + do { + retryNode = false; // initialize 
at the outset + rPrime = r + offset + failure; + logger.trace("{}.select({}, {})", new Object[] {in, input, rPrime}); + Selector selector = new Selector(in); + out = selector.select(input, rPrime); + if (!out.getType().equalsIgnoreCase(type)) { + logger.trace("selected output {} for data {} didn't match the type {}: walking down " + + "the hierarchy...", new Object[] {out, input, type}); + in = out; // walk down the hierarchy + retryNode = true; // stay within the node and walk down the tree + } else { // type matches + boolean predicateRejected = !nodePredicate.apply(out); + if (selected.contains(out) || predicateRejected) { + if (predicateRejected) { + logger.trace("{} was rejected by the node predicate for data {}: rejecting and " + + "increasing rPrime", out, input); + rejected.add(out); + } else { // already selected + logger.trace("{} was already selected for data {}: rejecting and increasing rPrime", + out, input); + } + + // we need to see if we have selected all possible nodes from this parent, in which + // case we should loop back to the origin and start over + if (allChildNodesEliminated(in, selected, rejected)) { + logger.trace("all child nodes of {} have been eliminated", in); + if (loopbackCount == MAX_LOOPBACK_COUNT) { + // we looped back the maximum times we specified; we give up search, and exit + escape = true; + break; + } + loopbackCount++; + logger.trace("looping back to the original parent node ({})", parent); + retryOrigin = true; + } else { + retryNode = true; // go back and reselect on the same parent + } + failure++; + } else if (nodeIsOut(out)) { + logger.trace("{} is marked as out (failed or over the maximum assignment) for data " + + "{}! 
looping back to the original parent node", out, input); + failure++; + if (loopbackCount == MAX_LOOPBACK_COUNT) { + // we looped back the maximum times we specified; we give up search, and exit + escape = true; + break; + } + loopbackCount++; + // re-selection on the same parent is detrimental in case of node failure: loop back + // to the origin + retryOrigin = true; + } else { + // we got a successful selection + break; + } + } + } while (retryNode); + } while (retryOrigin); + + if (escape) { + // cannot find a node under this parent; return a smaller set than was intended + logger.debug("we could not select a node for data {} under parent {}; a smaller data set " + + "than is requested will be returned", input, parent); + continue; + } + + logger.trace("{} was selected for data {}", out, input); + selected.add(out); + } + if (keepOffset) { + roundOffset.put(input, rPrime); + } + return selected; + } + + + private boolean nodeIsOut(Node node) { + if (node.isLeaf() && node.isFailed()) { + return true; + } + return false; + } + + /** + * Examines the immediate child nodes of the given parent node, and sees if all of the children + * that can be selected (i.e. not failed) are already selected. This is used to determine whether + * this parent node should no longer be used in the selection. + */ + private boolean allChildNodesEliminated(Node parent, List<Node> selected, Set<Node> rejected) { + List<Node> children = parent.getChildren(); + if (children != null) { + for (Node child: children) { + if (!nodeIsOut(child) && !selected.contains(child) && !rejected.contains(child)) { + return false; + } + } + } + return true; + } + + /** + * Selection algorithm based on the "straw" bucket type as described in the CRUSH algorithm. 
+ */ + private class Selector { + private final Map<Node,Long> straws = new HashMap<Node,Long>(); + private final JenkinsHash hashFunction; + + public Selector(Node node) { + if (!node.isLeaf()) { + // create a map from the nodes to their values + List<Node> sortedNodes = sortNodes(node.getChildren()); // do a reverse sort by weight + + int numLeft = sortedNodes.size(); + float straw = 1.0f; + float wbelow = 0.0f; + float lastw = 0.0f; + int i = 0; + final int length = sortedNodes.size(); + while (i < length) { + Node current = sortedNodes.get(i); + if (current.getWeight() == 0) { + straws.put(current, 0L); + i++; + continue; + } + straws.put(current, (long)(straw*0x10000)); + i++; + if (i == length) { + break; + } + + current = sortedNodes.get(i); + Node previous = sortedNodes.get(i-1); + if (current.getWeight() == previous.getWeight()) { + continue; + } + wbelow += (float)(previous.getWeight() - lastw)*numLeft; + for (int j = i; j < length; j++) { + if (sortedNodes.get(j).getWeight() == current.getWeight()) { + numLeft--; + } else { + break; + } + } + float wnext = (float)(numLeft * (current.getWeight() - previous.getWeight())); + float pbelow = wbelow/(wbelow + wnext); + straw *= Math.pow(1.0/pbelow, 1.0/numLeft); + lastw = previous.getWeight(); + } + } + hashFunction = new JenkinsHash(); + } + + /** + * Returns a new list that's sorted in the reverse order of the weight. + */ + private List<Node> sortNodes(List<Node> nodes) { + List<Node> ret = new ArrayList<Node>(nodes); + sortNodesInPlace(ret); + return ret; + } + + /** + * Sorts the list in place in the reverse order of the weight. + */ + private void sortNodesInPlace(List<Node> nodes) { + Collections.sort(nodes, new Comparator<Node>() { + public int compare(Node n1, Node n2) { + if (n2.getWeight() == n1.getWeight()) { + return 0; + } + return (n2.getWeight() - n1.getWeight() > 0) ? 
1 : -1; + // sort by weight only in the reverse order + } + }); + } + + public Node select(long input, long round) { + Node selected = null; + long hiScore = -1; + for (Map.Entry<Node,Long> e: straws.entrySet()) { + Node child = e.getKey(); + long straw = e.getValue(); + long score = weightedScore(child, straw, input, round); + if (score > hiScore) { + selected = child; + hiScore = score; + } + } + if (selected == null) { + throw new IllegalStateException(); + } + return selected; + } + + private long weightedScore(Node child, long straw, long input, long round) { + long hash = hashFunction.hash(input, child.getId(), round); + hash = hash&0xffff; + long weightedScore = hash*straw; + return weightedScore; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java new file mode 100644 index 0000000..66566f8 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java @@ -0,0 +1,140 @@ +/** + * Copyright 2013 Twitter, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.helix.controller.rebalancer.strategy.crushMapping; + +public class JenkinsHash { + // max value to limit it to 4 bytes + private static final long MAX_VALUE = 0xFFFFFFFFL; + private static final long CRUSH_HASH_SEED = 1315423911L; + + /** + * Convert a byte into a long value without making it negative. + */ + private static long byteToLong(byte b) { + long val = b & 0x7F; + if ((b & 0x80) != 0) { + val += 128; + } + return val; + } + + /** + * Do addition and turn into 4 bytes. + */ + private static long add(long val, long add) { + return (val + add) & MAX_VALUE; + } + + /** + * Do subtraction and turn into 4 bytes. + */ + private static long subtract(long val, long subtract) { + return (val - subtract) & MAX_VALUE; + } + + /** + * Do exclusive-or and turn into 4 bytes. + */ + private static long xor(long val, long xor) { + return (val ^ xor) & MAX_VALUE; + } + + /** + * Left shift val by shift bits. Cut down to 4 bytes. + */ + private static long leftShift(long val, int shift) { + return (val << shift) & MAX_VALUE; + } + + /** + * Convert 4 bytes from the buffer at offset into a long value. + */ + private static long fourByteToLong(byte[] bytes, int offset) { + return (byteToLong(bytes[offset + 0]) + + (byteToLong(bytes[offset + 1]) << 8) + + (byteToLong(bytes[offset + 2]) << 16) + + (byteToLong(bytes[offset + 3]) << 24)); + } + + /** + * Mix up the values in the hash function. 
+ */ + private static Triple hashMix(Triple t) { + long a = t.a; long b = t.b; long c = t.c; + a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13); + b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8)); + c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13)); + a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12)); + b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16)); + c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5)); + a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3)); + b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10)); + c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15)); + return new Triple(a, b, c); + } + + private static class Triple { + long a; + long b; + long c; + + public Triple(long a, long b, long c) { + this.a = a; this.b = b; this.c = c; + } + } + + public long hash(long a) { + long hash = xor(CRUSH_HASH_SEED, a); + long b = a; + long x = 231232L; + long y = 1232L; + Triple val = hashMix(new Triple(b, x, hash)); + b = val.a; x = val.b; hash = val.c; + val = hashMix(new Triple(y, a, hash)); + hash = val.c; + return hash; + } + + public long hash(long a, long b) { + long hash = xor(xor(CRUSH_HASH_SEED, a), b); + long x = 231232L; + long y = 1232L; + Triple val = hashMix(new Triple(a, b, hash)); + a = val.a; b = val.b; hash = val.c; + val = hashMix(new Triple(x, a, hash)); + x = val.a; a = val.b; hash = val.c; + val = hashMix(new Triple(b, y, hash)); + hash = val.c; + return hash; + } + + public long hash(long a, long b, long c) { + long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c); + long x = 231232L; + long y = 1232L; + Triple val = hashMix(new Triple(a, b, hash)); + a = val.a; b = val.b; hash = val.c; + val = hashMix(new Triple(c, x, hash)); + c = val.a; x = val.b; hash = val.c; + val = hashMix(new Triple(y, a, hash)); + y = val.a; a = val.b; hash = val.c; + val = hashMix(new Triple(b, x, hash)); + b = val.a; x = val.b; hash = val.c; + val 
= hashMix(new Triple(y, c, hash)); + hash = val.c; + return hash; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java new file mode 100644 index 0000000..3a52a21 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java @@ -0,0 +1,208 @@ +package org.apache.helix.controller.rebalancer.topology; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; + +public class Node implements Comparable<Node> { + private String _name; + private String _type; + private long _id; + private long _weight; + + private LinkedHashMap<String, Node> _children = new LinkedHashMap<String, Node>(); + private Node _parent; + + private boolean _failed; + + public Node() { + + } + + public Node(Node node) { + _name = node.getName(); + _type = node.getType(); + _id = node.getId(); + _weight = node.getWeight(); + _failed = node.isFailed(); + } + + public String getName() { + return _name; + } + + public void setName(String name) { + _name = name; + } + + public String getType() { + return _type; + } + + public void setType(String type) { + _type = type; + } + + public long getId() { + return _id; + } + + public void setId(long id) { + _id = id; + } + + public long getWeight() { + return _weight; + } + + public void setWeight(long weight) { + _weight = weight; + } + + public void addWeight(long weight) { _weight += weight; } + + public boolean isFailed() { + return _failed; + } + + public void setFailed(boolean failed) { + if (!isLeaf()) { + throw new UnsupportedOperationException("you cannot set failed on a non-leaf!"); + } + _failed = failed; + } + + public List<Node> getChildren() { + return new ArrayList<Node>(_children.values()); + } + + /** + * Add a child, if there exists a child with the same name, will replace it. + * + * @param child + */ + public void addChild(Node child) { + _children.put(child.getName(), child); + } + + /** + * Has child with given name. + * @param name + * @return + */ + public boolean hasChild(String name) { + return _children.containsKey(name); + } + + /** + * Get child node with given name. 
+ * + * @param name + * @return + */ + public Node getChild(String name) { + return _children.get(name); + } + + public boolean isLeaf() { + return _children == null || _children.isEmpty(); + } + + public Node getParent() { + return _parent; + } + + public void setParent(Node parent) { + _parent = parent; + } + + /** + * Returns all child nodes that match the type. Returns itself if this node matches it. If no + * child matches the type, an empty list is returned. + */ + protected List<Node> findChildren(String type) { + List<Node> nodes = new ArrayList<Node>(); + if (_type.equalsIgnoreCase(type)) { + nodes.add(this); + } else if (!isLeaf()) { + for (Node child: _children.values()) { + nodes.addAll(child.findChildren(type)); + } + } + return nodes; + } + + /** + * Returns the number of all child nodes that match the type. Returns 1 if this node matches it. + * Returns 0 if no child matches the type. + */ + public int getChildrenCount(String type) { + int count = 0; + if (_type.equalsIgnoreCase(type)) { + count++; + } else if (!isLeaf()) { + for (Node child: _children.values()) { + count += child.getChildrenCount(type); + } + } + return count; + } + + /** + * Returns the top-most ("root") node from this node. If this node itself does not have a parent, + * returns itself. 
+ */ + public Node getRoot() { + Node node = this; + while (node.getParent() != null) { + node = node.getParent(); + } + return node; + } + + @Override + public String toString() { + return _name + ":" + _id; + } + + @Override + public int hashCode() { + return _name.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof Node)) { + return false; + } + Node that = (Node)obj; + return _name.equals(that.getName()); + } + + @Override + public int compareTo(Node o) { + return _name.compareTo(o.getName()); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java new file mode 100644 index 0000000..1057fad --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -0,0 +1,295 @@ +package org.apache.helix.controller.rebalancer.topology; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.log4j.Logger; + + +/** + * Topology represents the structure of a cluster (the hierarchy of the nodes, its fault boundary, etc). + * This class is intended for topology-aware partition placement. + */ +public class Topology { + private static Logger logger = Logger.getLogger(Topology.class); + public enum Types { + ROOT, + ZONE, + INSTANCE + } + private static final int DEFAULT_NODE_WEIGHT = 1000; + + private final MessageDigest _md; + private Node _root; // root of the tree structure of all nodes; + private List<String> _allInstances; + private List<String> _liveInstances; + private Map<String, InstanceConfig> _instanceConfigMap; + private HelixProperty _clusterConfig; + private String _faultZoneType; + private String _endNodeType; + private boolean _useDefaultTopologyDef; + private LinkedHashSet<String> _types; + + /* default names for domain paths, if value is not specified for a domain path, the default one is used */ + // TODO: default values can be defined in clusterConfig. 
+ private Map<String, String> _defaultDomainPathValues = new HashMap<String, String>(); + + public Topology(final List<String> allNodes, final List<String> liveNodes, + final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) { + try { + _md = MessageDigest.getInstance("SHA-1"); + _allInstances = allNodes; + _liveInstances = liveNodes; + _instanceConfigMap = instanceConfigMap; + _clusterConfig = clusterConfig; + _types = new LinkedHashSet<String>(); + + String topologyDef = _clusterConfig.getRecord() + .getSimpleField(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name()); + if (topologyDef != null) { + // Customized cluster topology definition is configured. + String[] types = topologyDef.trim().split("/"); + for (int i = 0; i < types.length; i++) { + if (types[i].length() != 0) { + _types.add(types[i]); + } + } + if (_types.size() == 0) { + logger.error("Invalid cluster topology definition " + topologyDef); + throw new HelixException("Invalid cluster topology definition " + topologyDef); + } else { + String lastType = null; + for (String type : _types) { + _defaultDomainPathValues.put(type, "Helix_default_" + type); + lastType = type; + } + _endNodeType = lastType; + _faultZoneType = _clusterConfig.getRecord() + .getStringField(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), + _endNodeType); + if (!_types.contains(_faultZoneType)) { + throw new HelixException(String + .format("Invalid fault zone type %s, not present in topology definition %s.", + _faultZoneType, topologyDef)); + } + _useDefaultTopologyDef = false; + } + } else { + // Use default cluster topology definition, i,e. 
/root/zone/instance + _types.add(Types.ZONE.name()); + _types.add(Types.INSTANCE.name()); + _endNodeType = Types.INSTANCE.name(); + _faultZoneType = Types.ZONE.name(); + _useDefaultTopologyDef = true; + } + } catch (NoSuchAlgorithmException ex) { + throw new IllegalArgumentException(ex); + } + if (_useDefaultTopologyDef) { + _root = createClusterTreeWithDefaultTopologyDef(); + } else { + _root = createClusterTreeWithCustomizedTopology(); + } + } + + public String getEndNodeType() { + return _endNodeType; + } + + public String getFaultZoneType() { + return _faultZoneType; + } + + public Node getRootNode() { + return _root; + } + + public List<Node> getFaultZones() { + if (_root != null) { + return _root.findChildren(getFaultZoneType()); + } + return Collections.emptyList(); + } + + /** + * Creates a tree representing the cluster structure using default cluster topology definition + * (i,e no topology definition given and no domain id set). + */ + private Node createClusterTreeWithDefaultTopologyDef() { + // root + Node root = new Node(); + root.setName("root"); + root.setId(computeId("root")); + root.setType(Types.ROOT.name()); + + for (String ins : _allInstances) { + InstanceConfig config = _instanceConfigMap.get(ins); + if (config == null) { + throw new HelixException(String.format("Config for instance %s is not found!", ins)); + } + String zone = config.getZoneId(); + if (zone == null) { + //TODO: we should allow non-rack cluster for back-compatible. This should be solved once + // we have the hierarchy style of domain id for instance. 
+ throw new HelixException(String + .format("ZONE_ID for instance %s is not set, failed the topology-aware placement!", + ins)); + } + Map<String, String> pathValueMap = new HashMap<String, String>(); + pathValueMap.put(Types.ZONE.name(), zone); + pathValueMap.put(Types.INSTANCE.name(), ins); + + int weight = config.getWeight(); + if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) { + weight = DEFAULT_NODE_WEIGHT; + } + addEndNode(root, ins, pathValueMap, weight, _liveInstances); + } + + return root; + } + + /** + * Creates a tree representing the cluster structure using default cluster topology definition + * (i,e no topology definition given and no domain id set). + */ + private Node createClusterTreeWithCustomizedTopology() { + // root + Node root = new Node(); + root.setName("root"); + root.setId(computeId("root")); + root.setType(Types.ROOT.name()); + + for (String ins : _allInstances) { + InstanceConfig insConfig = _instanceConfigMap.get(ins); + if (insConfig == null) { + throw new HelixException(String.format("Config for instance %s is not found!", ins)); + } + String domain = insConfig.getDomain(); + if (domain == null) { + throw new HelixException(String + .format("Domain for instance %s is not set, failed the topology-aware placement!", + ins)); + } + + String[] pathPairs = domain.trim().split(","); + Map<String, String> pathValueMap = new HashMap<String, String>(); + for (String pair : pathPairs) { + String[] values = pair.trim().split("="); + if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) { + throw new HelixException(String.format( + "Domain-Value pair %s for instance %s is not valid, failed the topology-aware placement!", + pair, ins)); + } + String type = values[0]; + String value = values[1]; + + if (!_types.contains(type)) { + logger.warn(String + .format("Path %s defined in domain of instance %s not recognized, ignored!", pair, + ins)); + continue; + } + pathValueMap.put(type, value); + } + + int weight = 
insConfig.getWeight(); + if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) { + weight = DEFAULT_NODE_WEIGHT; + } + + root = addEndNode(root, ins, pathValueMap, weight, _liveInstances); + } + + return root; + } + + + /** + * Add an end node to the tree, create all the paths to the leaf node if not present. + */ + private Node addEndNode(Node root, String instanceName, Map<String, String> pathNameMap, + int instanceWeight, List<String> liveInstances) { + Node current = root; + List<Node> pathNodes = new ArrayList<Node>(); + for (String path : _types) { + String pathValue = pathNameMap.get(path); + if (pathValue == null || pathValue.isEmpty()) { + pathValue = _defaultDomainPathValues.get(path); + } + pathNodes.add(current); + if (!current.hasChild(pathValue)) { + Node n = new Node(); + n.setName(pathValue); + n.setId(computeId(pathValue)); + n.setType(path); + n.setParent(current); + + // if it is leaf node. + if (path.equals(_endNodeType)) { + if (liveInstances.contains(instanceName)) { + // node is alive + n.setWeight(instanceWeight); + // add instance weight to all of its parent nodes. 
+ for (Node node : pathNodes) { + node.addWeight(instanceWeight); + } + } else { + n.setFailed(true); + n.setWeight(0); + } + } + current.addChild(n); + } + current = current.getChild(pathValue); + } + return root; + } + + private long computeId(String name) { + byte[] h = _md.digest(name.getBytes()); + return bstrTo32bit(h); + } + + private long bstrTo32bit(byte[] bstr) { + if (bstr.length < 4) { + throw new IllegalArgumentException("hashed is less than 4 bytes!"); + } + // need to "simulate" unsigned int + return (long) (((ord(bstr[0]) << 24) | (ord(bstr[1]) << 16) | (ord(bstr[2]) << 8) | (ord( + bstr[3])))) & 0xffffffffL; + } + + private int ord(byte b) { + return b & 0xff; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index cb5bda8..dacf98d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -32,6 +32,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; import org.apache.helix.model.CurrentState; @@ -56,6 +57,7 @@ public class ClusterDataCache { private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!"; + private ClusterConfig _clusterConfig; Map<String, LiveInstance> _liveInstanceMap; Map<String, LiveInstance> _liveInstanceCacheMap; Map<String, IdealState> _idealStateMap; @@ -200,11 +202,11 @@ 
public class ClusterDataCache { _currentStateMap = Collections.unmodifiableMap(allCurStateMap); _idealStateRuleMap = Maps.newHashMap(); - HelixProperty clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); - if (clusterConfig != null) { - for (String simpleKey : clusterConfig.getRecord().getSimpleFields().keySet()) { + _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); + if (_clusterConfig != null) { + for (String simpleKey : _clusterConfig.getRecord().getSimpleFields().keySet()) { if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) { - String simpleValue = clusterConfig.getRecord().getSimpleField(simpleKey); + String simpleValue = _clusterConfig.getRecord().getSimpleField(simpleKey); String[] rules = simpleValue.split("(?<!\\\\),"); Map<String, String> singleRule = Maps.newHashMap(); for (String rule : rules) { @@ -232,6 +234,10 @@ public class ClusterDataCache { return true; } + public ClusterConfig getClusterConfig() { + return _clusterConfig; + } + /** * Retrieves the idealstates for all resources * @return
