Repository: hadoop Updated Branches: refs/heads/branch-2.7 740450611 -> 1ae8a8342
HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy. Contributed by Brahma Reddy Battula. Branch-2.7 patch contributed by Xiao Chen. Change-Id: I1b32627bc2a2f30f5debeaba7663fb2777958079 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ae8a834 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ae8a834 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ae8a834 Branch: refs/heads/branch-2.7 Commit: 1ae8a8342edc18fa40ad1170a7cc4d41e7adeaf7 Parents: 7404506 Author: Zhe Zhang <[email protected]> Authored: Thu Dec 3 10:29:48 2015 -0800 Committer: Zhe Zhang <[email protected]> Committed: Thu Dec 3 10:30:48 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/net/NetworkTopology.java | 34 +++- .../net/NetworkTopologyWithNodeGroup.java | 2 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/blockmanagement/BlockManager.java | 168 ++++++------------- .../blockmanagement/BlockPlacementPolicy.java | 50 +++--- .../BlockPlacementPolicyDefault.java | 112 +++++++++++-- .../server/blockmanagement/DatanodeManager.java | 8 - .../hdfs/server/namenode/NamenodeFsck.java | 2 +- .../hdfs/server/balancer/TestBalancer.java | 9 +- .../blockmanagement/TestBlockManager.java | 14 +- .../blockmanagement/TestReplicationPolicy.java | 79 ++++++++- .../TestReplicationPolicyWithNodeGroup.java | 12 +- .../hdfs/server/namenode/ha/TestDNFencing.java | 8 +- 13 files changed, 291 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index b729bda..573c534 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -53,9 +53,9 @@ import com.google.common.collect.Lists; public class NetworkTopology { public final static String DEFAULT_RACK = "/default-rack"; public final static int DEFAULT_HOST_LEVEL = 2; - public static final Log LOG = + public static final Log LOG = LogFactory.getLog(NetworkTopology.class); - + public static class InvalidTopologyException extends RuntimeException { private static final long serialVersionUID = 1L; public InvalidTopologyException(String msg) { @@ -379,6 +379,13 @@ public class NetworkTopology { private int depthOfAllLeaves = -1; /** rack counter */ protected int numOfRacks = 0; + + /** + * Whether or not this cluster has ever consisted of more than 1 rack, + * according to the NetworkTopology. + */ + private boolean clusterEverBeenMultiRack = false; + /** the lock used to manage access */ protected ReadWriteLock netlock = new ReentrantReadWriteLock(); @@ -418,7 +425,7 @@ public class NetworkTopology { if (clusterMap.add(node)) { LOG.info("Adding a new node: "+NodeBase.getPath(node)); if (rack == null) { - numOfRacks++; + incrementRacks(); } if (!(node instanceof InnerNode)) { if (depthOfAllLeaves == -1) { @@ -433,7 +440,14 @@ public class NetworkTopology { netlock.writeLock().unlock(); } } - + + protected void incrementRacks() { + numOfRacks++; + if (!clusterEverBeenMultiRack && numOfRacks > 1) { + clusterEverBeenMultiRack = true; + } + } + /** * Return a reference to the node given its string representation. * Default implementation delegates to {@link #getNode(String)}. @@ -541,10 +555,18 @@ public class NetworkTopology { netlock.readLock().unlock(); } } - + + /** + * @return true if this cluster has ever consisted of multiple racks, even if + * it is not now a multi-rack cluster. + */ + public boolean hasClusterEverBeenMultiRack() { + return clusterEverBeenMultiRack; + } + /** Given a string representation of a rack for a specific network * location - * + * * To be overridden in subclasses for specific NetworkTopology * implementations, as alternative to overriding the full * {@link #getRack(String)} method. http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java index 3de49dc..72031aa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java @@ -205,7 +205,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology { LOG.info("Adding a new node: " + NodeBase.getPath(node)); if (rack == null) { // We only track rack number here - numOfRacks++; + incrementRacks(); } } if(LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 074e779..ea831c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -8,6 +8,9 @@ Release 2.7.3 - UNRELEASED IMPROVEMENTS + HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy. + (Brahma Reddy Battula via mingma) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 81367ea..db50e44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1364,7 +1364,7 @@ public class BlockManager { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (isPlacementPolicySatisfied(block)) ) { neededReplications.remove(block, priority); // remove from neededReplications neededReplications.decrementReplicationIndex(priority); blockLog.info("BLOCK* Removing {} from neededReplications as" + @@ -1435,7 +1435,7 @@ public class BlockManager { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (isPlacementPolicySatisfied(block)) ) { neededReplications.remove(block, priority); // remove from neededReplications neededReplications.decrementReplicationIndex(priority); rw.targets = null; @@ -1446,7 +1446,7 @@ public class BlockManager { } if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block)) ) { + (!isPlacementPolicySatisfied(block)) ) { if (rw.srcNode.getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case @@ -2902,109 +2902,48 @@ public class BlockManager { } } chooseExcessReplicates(nonExcess, block, replication, - addedNode, delNodeHint, blockplacement); + addedNode, delNodeHint); } - - /** - * We want "replication" replicates for the block, but we now have too many. - * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that: - * - * srcNodes.size() - dstNodes.size() == replication - * - * We pick node that make sure that replicas are spread across racks and - * also try hard to pick one with least free space. - * The algorithm is first to pick a node with least free space from nodes - * that are on a rack holding more than one replicas of the block. - * So removing such a replica won't remove a rack. - * If no such a node is available, - * then pick a node with least free space - */ - private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, - Block b, short replication, - DatanodeDescriptor addedNode, - DatanodeDescriptor delNodeHint, - BlockPlacementPolicy replicator) { + private void chooseExcessReplicates( + final Collection<DatanodeStorageInfo> nonExcess, + Block b, short replication, + DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint) { assert namesystem.hasWriteLock(); // first form a rack to datanodes map and BlockCollection bc = getBlockCollection(b); - final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID()); + final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy( + bc.getStoragePolicyID()); final List<StorageType> excessTypes = storagePolicy.chooseExcess( replication, DatanodeStorageInfo.toStorageTypes(nonExcess)); - - - final Map<String, List<DatanodeStorageInfo>> rackMap - = new HashMap<String, List<DatanodeStorageInfo>>(); - final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>(); - final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); - - // split nodes into two sets - // moreThanOne contains nodes on rack with more than one replica - // exactlyOne contains the remaining nodes - replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne); - - // pick one node to delete that favors the delete hint - // otherwise pick one with least space from priSet if it is not empty - // otherwise one node with least space from remains - boolean firstOne = true; - final DatanodeStorageInfo delNodeHintStorage - = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint); - final DatanodeStorageInfo addedNodeStorage - = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode); - while (nonExcess.size() - replication > 0) { - final DatanodeStorageInfo cur; - if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, - moreThanOne, excessTypes)) { - cur = delNodeHintStorage; - } else { // regular excessive replica removal - cur = replicator.chooseReplicaToDelete(bc, b, replication, - moreThanOne, exactlyOne, excessTypes); - } - firstOne = false; - - // adjust rackmap, moreThanOne, and exactlyOne - replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, - exactlyOne, cur); - - nonExcess.remove(cur); - addToExcessReplicate(cur.getDatanodeDescriptor(), b); - - // - // The 'excessblocks' tracks blocks until we get confirmation - // that the datanode has deleted them; the only way we remove them - // is when we get a "removeBlock" message. - // - // The 'invalidate' list is used to inform the datanode the block - // should be deleted. Items are removed from the invalidate list - // upon giving instructions to the namenode. - // - addToInvalidates(b, cur.getDatanodeDescriptor()); - blockLog.info("BLOCK* chooseExcessReplicates: " - +"({}, {}) is added to invalidated blocks set", cur, b); - } - } - - /** Check if we can use delHint */ - static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, - DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, - List<StorageType> excessTypes) { - if (!isFirst) { - return false; // only consider delHint for the first case - } else if (delHint == null) { - return false; // no delHint - } else if (!excessTypes.contains(delHint.getStorageType())) { - return false; // delHint storage type is not an excess type - } else { - // check if removing delHint reduces the number of racks - if (moreThan1Racks.contains(delHint)) { - return true; // delHint and some other nodes are under the same rack - } else if (added != null && !moreThan1Racks.contains(added)) { - return true; // the added node adds a new rack - } - return false; // removing delHint reduces the number of racks; + List<DatanodeStorageInfo> replicasToDelete = blockplacement + .chooseReplicasToDelete(nonExcess, replication, excessTypes, + addedNode, delNodeHint); + for (DatanodeStorageInfo choosenReplica : replicasToDelete) { + processChosenExcessReplica(nonExcess, choosenReplica, b); } } + private void processChosenExcessReplica( + final Collection<DatanodeStorageInfo> nonExcess, + final DatanodeStorageInfo chosen, Block b) { + nonExcess.remove(chosen); + addToExcessReplicate(chosen.getDatanodeDescriptor(), b); + // + // The 'excessblocks' tracks blocks until we get confirmation + // that the datanode has deleted them; the only way we remove them + // is when we get a "removeBlock" message. + // + // The 'invalidate' list is used to inform the datanode the block + // should be deleted. Items are removed from the invalidate list + // upon giving instructions to the datanodes. + // + addToInvalidates(b, chosen.getDatanodeDescriptor()); + blockLog.debug("BLOCK* chooseExcessReplicates: " + + "({}, {}) is added to invalidated blocks set", chosen, b); + } + private void addToExcessReplicate(DatanodeInfo dn, Block block) { assert namesystem.hasWriteLock(); LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid()); @@ -3484,33 +3423,20 @@ public class BlockManager { return toInvalidate.size(); } - boolean blockHasEnoughRacks(Block b) { - boolean enoughRacks = false;; - Collection<DatanodeDescriptor> corruptNodes = - corruptReplicas.getNodes(b); - int numExpectedReplicas = getReplication(b); - String rackName = null; - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + boolean isPlacementPolicySatisfied(Block b) { + List<DatanodeDescriptor> liveNodes = new ArrayList<>(); + Collection<DatanodeDescriptor> corruptNodes = corruptReplicas + .getNodes(b); + for (DatanodeStorageInfo storage : blocksMap.getStorages(b)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); - if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { - if ((corruptNodes == null ) || !corruptNodes.contains(cur)) { - if (numExpectedReplicas == 1 || - (numExpectedReplicas > 1 && - !datanodeManager.hasClusterEverBeenMultiRack())) { - enoughRacks = true; - break; - } - String rackNameNew = cur.getNetworkLocation(); - if (rackName == null) { - rackName = rackNameNew; - } else if (!rackName.equals(rackNameNew)) { - enoughRacks = true; - break; - } - } + if (!cur.isDecommissionInProgress() && !cur.isDecommissioned() + && ((corruptNodes == null) || !corruptNodes.contains(cur))) { + liveNodes.add(cur); } } - return enoughRacks; + DatanodeInfo[] locs = liveNodes.toArray(new DatanodeInfo[liveNodes.size()]); + return blockplacement.verifyBlockPlacement(locs, + getReplication(b)).isPlacementPolicySatisfied(); } /** @@ -3518,7 +3444,7 @@ public class BlockManager { * or if it does not have enough racks. */ boolean isNeededReplication(Block b, int expected, int current) { - return current < expected || !blockHasEnoughRacks(b); + return current < expected || !isPlacementPolicySatisfied(b); } public long getMissingBlocksCount() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 698fbc5..69a49aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -30,9 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.ReflectionUtils; @@ -103,37 +101,33 @@ public abstract class BlockPlacementPolicy { * Verify if the block's placement meets requirement of placement policy, * i.e. replicas are placed on no less than minRacks racks in the system. * - * @param srcPath the full pathname of the file to be verified - * @param lBlk block with locations + * @param locs block with locations * @param numOfReplicas replica number of file to be verified * @return the result of verification */ - abstract public BlockPlacementStatus verifyBlockPlacement(String srcPath, - LocatedBlock lBlk, - int numOfReplicas); + abstract public BlockPlacementStatus verifyBlockPlacement( + DatanodeInfo[] locs, int numOfReplicas); + /** - * Decide whether deleting the specified replica of the block still makes - * the block conform to the configured block placement policy. - * - * @param srcBC block collection of file to which block-to-be-deleted belongs - * @param block The block to be deleted - * @param replicationFactor The required number of replicas for this block - * @param moreThanOne The replica locations of this block that are present - * on more than one unique racks. - * @param exactlyOne Replica locations of this block that are present - * on exactly one unique racks. - * @param excessTypes The excess {@link StorageType}s according to the - * {@link BlockStoragePolicy}. - * @return the replica that is the best candidate for deletion + * Select the excess replica storages for deletion based on either + * delNodehint/Excess storage types. + * + * @param candidates + * available replicas + * @param expectedNumOfReplicas + * The required number of replicas for this block + * @param excessTypes + * type of the storagepolicy + * @param addedNode + * New replica reported + * @param delNodeHint + * Hint for excess storage selection + * @return Returns the list of excess replicas chosen for deletion */ - abstract public DatanodeStorageInfo chooseReplicaToDelete( - BlockCollection srcBC, - Block block, - short replicationFactor, - Collection<DatanodeStorageInfo> moreThanOne, - Collection<DatanodeStorageInfo> exactlyOne, - List<StorageType> excessTypes); - + abstract public List<DatanodeStorageInfo> chooseReplicasToDelete( + Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas, + List<StorageType> excessTypes, DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint); /** * Used to setup a BlockPlacementPolicy object. This should be defined by * all implementations of a BlockPlacementPolicy. http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 93056e9..ecbbcbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -27,10 +27,8 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; @@ -869,16 +867,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } @Override - public BlockPlacementStatus verifyBlockPlacement(String srcPath, - LocatedBlock lBlk, int numberOfReplicas) { - DatanodeInfo[] locs = lBlk.getLocations(); + public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, + int numberOfReplicas) { if (locs == null) locs = DatanodeDescriptor.EMPTY_ARRAY; - int numRacks = clusterMap.getNumOfRacks(); - if(numRacks <= 1) // only one rack - return new BlockPlacementStatusDefault( - Math.min(numRacks, numberOfReplicas), numRacks); - int minRacks = Math.min(2, numberOfReplicas); + if (!clusterMap.hasClusterEverBeenMultiRack()) { + // only one rack + return new BlockPlacementStatusDefault(1, 1); + } + int minRacks = 2; + minRacks = Math.min(minRacks, numberOfReplicas); // 1. Check that all locations are different. // 2. Count locations on different racks. Set<String> racks = new TreeSet<String>(); @@ -886,12 +884,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { racks.add(dn.getNetworkLocation()); return new BlockPlacementStatusDefault(racks.size(), minRacks); } - - @Override - public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc, - Block block, short replicationFactor, - Collection<DatanodeStorageInfo> first, - Collection<DatanodeStorageInfo> second, + /** + * Decide whether deleting the specified replica of the block still makes + * the block conform to the configured block placement policy. + * @param replicationFactor The required number of replicas for this block + * @param moreThanone The replica locations of this block that are present + * on more than one unique racks. + * @param exactlyOne Replica locations of this block that are present + * on exactly one unique racks. + * @param excessTypes The excess {@link StorageType}s according to the + * {@link BlockStoragePolicy}. + * + * @return the replica that is the best candidate for deletion + */ + @VisibleForTesting + public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, + Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne, final List<StorageType> excessTypes) { long oldestHeartbeat = monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier; @@ -901,7 +909,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // Pick the node with the oldest heartbeat or with the least free space, // if all hearbeats are within the tolerable heartbeat interval - for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) { + for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) { if (!excessTypes.contains(storage.getStorageType())) { continue; } @@ -931,6 +939,76 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return storage; } + @Override + public List<DatanodeStorageInfo> chooseReplicasToDelete( + Collection<DatanodeStorageInfo> candidates, + int expectedNumOfReplicas, + List<StorageType> excessTypes, + DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint) { + + List<DatanodeStorageInfo> excessReplicas = new ArrayList<>(); + + final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>(); + + final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); + + // split nodes into two sets + // moreThanOne contains nodes on rack with more than one replica + // exactlyOne contains the remaining nodes + splitNodesWithRack(candidates, rackMap, moreThanOne, exactlyOne); + + // pick one node to delete that favors the delete hint + // otherwise pick one with least space from priSet if it is not empty + // otherwise one node with least space from remains + boolean firstOne = true; + final DatanodeStorageInfo delNodeHintStorage = + DatanodeStorageInfo.getDatanodeStorageInfo(candidates, delNodeHint); + final DatanodeStorageInfo addedNodeStorage = + DatanodeStorageInfo.getDatanodeStorageInfo(candidates, addedNode); + + while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) { + final DatanodeStorageInfo cur; + if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, + moreThanOne, excessTypes)) { + cur = delNodeHintStorage; + } else { // regular excessive replica removal + cur = + chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne, + excessTypes); + } + firstOne = false; + + // adjust rackmap, moreThanOne, and exactlyOne + adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur); + excessReplicas.add(cur); + } + return excessReplicas; + } + + /** Check if we can use delHint. */ + @VisibleForTesting + static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, + DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, + List<StorageType> excessTypes) { + if (!isFirst) { + return false; // only consider delHint for the first case + } else if (delHint == null) { + return false; // no delHint + } else if (!excessTypes.contains(delHint.getStorageType())) { + return false; // delHint storage type is not an excess type + } else { + // check if removing delHint reduces the number of racks + if (moreThan1Racks.contains(delHint)) { + return true; // delHint and some other nodes are under the same rack + } else if (added != null && !moreThan1Racks.contains(added)) { + return true; // the added node adds a new rack + } + return false; // removing delHint reduces the number of racks; + } + } + /** * Pick up replica node set for deleting replica as over-replicated. * First set contains replica nodes on rack with more than one http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index c774d0b..9caca16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1184,14 +1184,6 @@ public class DatanodeManager { } /** - * @return true if this cluster has ever consisted of multiple racks, even if - * it is not now a multi-rack cluster. - */ - boolean hasClusterEverBeenMultiRack() { - return hasClusterEverBeenMultiRack; - } - - /** * Check if the cluster now consists of multiple racks. If it does, and this * is the first time it's consisted of multiple racks, then process blocks * that may now be misreplicated. http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 5074e41..f66c84a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -542,7 +542,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { // count mis replicated blocks BlockPlacementStatus blockPlacementStatus = bpPolicy - .verifyBlockPlacement(path, lBlk, targetFileReplication); + .verifyBlockPlacement(lBlk.getLocations(), targetFileReplication); if (!blockPlacementStatus.isPlacementPolicySatisfied()) { res.numMisReplicatedBlocks++; misReplicatedPerFile++; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 5ee73d4..1d8eb16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -323,12 +323,12 @@ public class TestBalancer { conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true); long[] capacities = new long[] { CAPACITY, CAPACITY }; + String[] hosts = {"host0", "host1"}; String[] racks = { RACK0, RACK1 }; int numOfDatanodes = capacities.length; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) - .hosts(new String[]{"localhost", "localhost"}) - .racks(racks).simulatedCapacities(capacities).build(); + .hosts(hosts).racks(racks).simulatedCapacities(capacities).build(); try { cluster.waitActive(); @@ -340,7 +340,10 @@ public class TestBalancer { long totalUsedSpace = totalCapacity * 8 / 10; InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes]; for (int i = 0; i < favoredNodes.length; i++) { - favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress(); + // DFSClient will attempt reverse lookup. In case it resolves + // "127.0.0.1" to "localhost", we manually specify the hostname. + int port = cluster.getDataNodes().get(i).getXferAddress().getPort(); + favoredNodes[i] = new InetSocketAddress(hosts[i], port); } DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index e026a53..920a18d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -809,16 +809,16 @@ public class TestBlockManager { List<StorageType> excessTypes = new ArrayList<StorageType>(); excessTypes.add(StorageType.DEFAULT); - Assert.assertTrue(BlockManager.useDelHint(true, delHint, null, - moreThan1Racks, excessTypes)); + Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint, + null, moreThan1Racks, excessTypes)); excessTypes.remove(0); excessTypes.add(StorageType.SSD); - Assert.assertFalse(BlockManager.useDelHint(true, delHint, null, - moreThan1Racks, excessTypes)); + Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint, + null, moreThan1Racks, excessTypes)); } /** - * {@link BlockManager#blockHasEnoughRacks(BlockInfo)} should return false + * {@link BlockManager#isPlacementPolicySatisfied(Block)} should return false * if all the replicas are on the same rack and shouldn't be dependent on * CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY * @throws Exception @@ -837,7 +837,7 @@ public class TestBlockManager { BlockInfoContiguous blockInfo = addBlockOnNodes(1, rackA); // Since the network toppolgy is multi-rack, the blockHasEnoughRacks // should return false. - assertFalse("Replicas for block is not stored on enough racks", - bm.blockHasEnoughRacks(blockInfo)); + assertFalse("Replicas for block is not stored on enough racks", + bm.isPlacementPolicySatisfied(blockInfo)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 203e738..0ab6739 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -50,6 +51,7 @@ import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -1040,12 +1042,12 @@ public class TestReplicationPolicy { { // test returning null excessTypes.add(StorageType.SSD); - assertNull(replicator.chooseReplicaToDelete( - null, null, (short)3, first, second, excessTypes)); + assertNull(((BlockPlacementPolicyDefault) replicator) + .chooseReplicaToDelete((short) 3, first, second, excessTypes)); } excessTypes.add(StorageType.DEFAULT); - DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete( - null, null, (short)3, first, second, excessTypes); + DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) + .chooseReplicaToDelete((short) 3, first, second, excessTypes); // Within first set, storages[1] with less free space assertEquals(chosen, storages[1]); @@ -1054,11 +1056,76 @@ public class TestReplicationPolicy { assertEquals(3, second.size()); // Within second set, storages[5] with less free space excessTypes.add(StorageType.DEFAULT); - chosen = replicator.chooseReplicaToDelete( - null, null, (short)2, first, second, excessTypes); + chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( + (short)2, first, second, excessTypes); assertEquals(chosen, storages[5]); } + @Test + public void testChooseReplicasToDelete() throws Exception { + Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[2]); + nonExcess.add(storages[3]); + List<DatanodeStorageInfo> excessReplicas = new ArrayList<>(); + BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite + .createDefaultSuite(); + BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy(); + + // use delete hint case. + + DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor(); + List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); + assertTrue(excessReplicas.size() > 0); + assertTrue(excessReplicas.contains(storages[0])); + + // Excess type deletion + + DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo( + "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(), + "foo.com", StorageType.ARCHIVE); + nonExcess.add(excessStorage); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), null); + assertTrue(excessReplicas.contains(excessStorage)); + } + + @Test + public void testUseDelHint() throws Exception { + List<StorageType> excessTypes = new ArrayList<StorageType>(); + excessTypes.add(StorageType.ARCHIVE); + // only consider delHint for the first case + assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null, + null)); + // no delHint + assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null, + null)); + // delHint storage type is not an excess type + assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null, + null, excessTypes)); + // check if removing delHint reduces the number of racks + List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>(); + chosenNodes.add(storages[0]); + chosenNodes.add(storages[2]); + excessTypes.add(StorageType.DEFAULT); + assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null, + chosenNodes, excessTypes)); + // the added node adds a new rack + assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3], + storages[5], chosenNodes, excessTypes)); + // removing delHint reduces the number of racks; + assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], + storages[0], chosenNodes, excessTypes)); + assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null, + chosenNodes, excessTypes)); + } + /** * This testcase tests whether the default value returned by * DFSUtil.getInvalidateWorkPctPerIteration() is positive, http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index 7708ddc..278f5a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -615,8 +615,8 @@ public class TestReplicationPolicyWithNodeGroup { assertEquals(1, second.size()); List<StorageType> excessTypes = new ArrayList<StorageType>(); excessTypes.add(StorageType.DEFAULT); - DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete( - null, null, (short)3, first, second, excessTypes); + DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) + .chooseReplicaToDelete((short) 3, first, second, excessTypes); // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]}, // dataNodes[0] and dataNodes[1] are in the same nodegroup, // but dataNodes[1] is chosen as less free space @@ -628,8 +628,8 @@ public class TestReplicationPolicyWithNodeGroup { // Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen // as less free space excessTypes.add(StorageType.DEFAULT); - chosen = replicator.chooseReplicaToDelete( - null, null, (short)2, first, second, excessTypes); + chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( + (short) 2, first, second, excessTypes); assertEquals(chosen, storages[2]); replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen); @@ -637,8 +637,8 @@ public class TestReplicationPolicyWithNodeGroup { assertEquals(2, second.size()); // Within second set, dataNodes[5] with less free space excessTypes.add(StorageType.DEFAULT); - chosen = replicator.chooseReplicaToDelete( - null, null, (short)1, first, second, excessTypes); + chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( + (short) 1, first, second, excessTypes); assertEquals(chosen, storages[5]); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ae8a834/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 42bf46f..3311588 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -40,10 +40,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; @@ -631,10 +629,8 @@ public class TestDNFencing { } @Override - public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection inode, - Block block, short replicationFactor, - Collection<DatanodeStorageInfo> first, - Collection<DatanodeStorageInfo> second, + public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, + Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second, List<StorageType> excessTypes) { Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
