Repository: hadoop
Updated Branches:
  refs/heads/branch-2 e2f027ddd -> 39e7548cd
HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy. (Brahma Reddy Battula via mingma) (cherry picked from commit e27c2ae8bafc94f18eb38f5d839dcef5652d424e) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/39e7548c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/39e7548c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/39e7548c Branch: refs/heads/branch-2 Commit: 39e7548cd9d12e7be8c721a8bba56475cc8fcc23 Parents: e2f027d Author: Ming Ma <[email protected]> Authored: Wed Oct 21 08:06:58 2015 -0700 Committer: Ming Ma <[email protected]> Committed: Wed Oct 21 08:24:02 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/net/NetworkTopology.java | 34 +++- .../net/NetworkTopologyWithNodeGroup.java | 2 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/blockmanagement/BlockManager.java | 182 +++++-------------- .../blockmanagement/BlockPlacementPolicy.java | 50 +++-- .../BlockPlacementPolicyDefault.java | 112 ++++++++++-- .../BlockPlacementPolicyRackFaultTolerant.java | 18 ++ .../BlockPlacementPolicyWithUpgradeDomain.java | 11 +- .../server/blockmanagement/DatanodeManager.java | 8 - .../hdfs/server/namenode/NamenodeFsck.java | 2 +- .../hdfs/server/balancer/TestBalancer.java | 9 +- .../blockmanagement/TestBlockManager.java | 8 +- .../blockmanagement/TestReplicationPolicy.java | 79 +++++++- .../TestReplicationPolicyWithNodeGroup.java | 12 +- .../TestReplicationPolicyWithUpgradeDomain.java | 25 ++- .../hdfs/server/namenode/ha/TestDNFencing.java | 8 +- 16 files changed, 328 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java ---------------------------------------------------------------------- diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index fe6e439..b637da1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -54,9 +54,9 @@ import com.google.common.collect.Lists; public class NetworkTopology { public final static String DEFAULT_RACK = "/default-rack"; public final static int DEFAULT_HOST_LEVEL = 2; - public static final Log LOG = + public static final Log LOG = LogFactory.getLog(NetworkTopology.class); - + public static class InvalidTopologyException extends RuntimeException { private static final long serialVersionUID = 1L; public InvalidTopologyException(String msg) { @@ -379,6 +379,13 @@ public class NetworkTopology { private int depthOfAllLeaves = -1; /** rack counter */ protected int numOfRacks = 0; + + /** + * Whether or not this cluster has ever consisted of more than 1 rack, + * according to the NetworkTopology. + */ + private boolean clusterEverBeenMultiRack = false; + /** the lock used to manage access */ protected ReadWriteLock netlock = new ReentrantReadWriteLock(); @@ -417,7 +424,7 @@ public class NetworkTopology { if (clusterMap.add(node)) { LOG.info("Adding a new node: "+NodeBase.getPath(node)); if (rack == null) { - numOfRacks++; + incrementRacks(); } if (!(node instanceof InnerNode)) { if (depthOfAllLeaves == -1) { @@ -432,7 +439,14 @@ public class NetworkTopology { netlock.writeLock().unlock(); } } - + + protected void incrementRacks() { + numOfRacks++; + if (!clusterEverBeenMultiRack && numOfRacks > 1) { + clusterEverBeenMultiRack = true; + } + } + /** * Return a reference to the node given its string representation. * Default implementation delegates to {@link #getNode(String)}. 
@@ -540,10 +554,18 @@ public class NetworkTopology { netlock.readLock().unlock(); } } - + + /** + * @return true if this cluster has ever consisted of multiple racks, even if + * it is not now a multi-rack cluster. + */ + public boolean hasClusterEverBeenMultiRack() { + return clusterEverBeenMultiRack; + } + /** Given a string representation of a rack for a specific network * location - * + * * To be overridden in subclasses for specific NetworkTopology * implementations, as alternative to overriding the full * {@link #getRack(String)} method. http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java index 3de49dc..72031aa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java @@ -205,7 +205,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology { LOG.info("Adding a new node: " + NodeBase.getPath(node)); if (rack == null) { // We only track rack number here - numOfRacks++; + incrementRacks(); } } if(LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6c2709c..f17f460 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -712,6 +712,9 @@ Release 2.8.0 - UNRELEASED 
HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid explicitly creating Files in the tests code. (lei) + HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy. + (Brahma Reddy Battula via mingma) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index de13ff3..b58c759 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -239,9 +239,6 @@ public class BlockManager implements BlockStatsMXBean { final float blocksInvalidateWorkPct; final int blocksReplWorkMultiplier; - /** variable to enable check for enough racks */ - final boolean shouldCheckForEnoughRacks; - // whether or not to issue block encryption keys. final boolean encryptDataTransfer; @@ -339,10 +336,6 @@ public class BlockManager implements BlockStatsMXBean { conf.getInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT); - this.shouldCheckForEnoughRacks = - conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null - ? 
false : true; - this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf); this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf); @@ -366,7 +359,6 @@ public class BlockManager implements BlockStatsMXBean { LOG.info("maxReplication = " + maxReplication); LOG.info("minReplication = " + minReplication); LOG.info("maxReplicationStreams = " + maxReplicationStreams); - LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks); LOG.info("replicationRecheckInterval = " + replicationRecheckInterval); LOG.info("encryptDataTransfer = " + encryptDataTransfer); LOG.info("maxNumBlocksToLog = " + maxNumBlocksToLog); @@ -1432,7 +1424,7 @@ public class BlockManager implements BlockStatsMXBean { NumberReplicas numReplicas, int pendingReplicaNum, int required) { int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum; return (numEffectiveReplicas >= required) && - (pendingReplicaNum > 0 || blockHasEnoughRacks(block)); + (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block)); } private ReplicationWork scheduleReplication(BlockInfo block, int priority) { @@ -1512,7 +1504,7 @@ public class BlockManager implements BlockStatsMXBean { DatanodeStorageInfo[] targets = rw.getTargets(); if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block)) ) { + (!isPlacementPolicySatisfied(block)) ) { if (rw.getSrcNode().getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case @@ -2897,7 +2889,7 @@ public class BlockManager implements BlockStatsMXBean { * If there are any extras, call chooseExcessReplicates() to * mark them in the excessReplicateMap. 
*/ - private void processOverReplicatedBlock(final Block block, + private void processOverReplicatedBlock(final BlockInfo block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { assert namesystem.hasWriteLock(); @@ -2930,110 +2922,48 @@ public class BlockManager implements BlockStatsMXBean { } } chooseExcessReplicates(nonExcess, block, replication, - addedNode, delNodeHint, blockplacement); + addedNode, delNodeHint); } - - /** - * We want "replication" replicates for the block, but we now have too many. - * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that: - * - * srcNodes.size() - dstNodes.size() == replication - * - * We pick node that make sure that replicas are spread across racks and - * also try hard to pick one with least free space. - * The algorithm is first to pick a node with least free space from nodes - * that are on a rack holding more than one replicas of the block. - * So removing such a replica won't remove a rack. 
- * If no such a node is available, - * then pick a node with least free space - */ - private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, - Block b, short replication, - DatanodeDescriptor addedNode, - DatanodeDescriptor delNodeHint, - BlockPlacementPolicy replicator) { + private void chooseExcessReplicates( + final Collection<DatanodeStorageInfo> nonExcess, + BlockInfo storedBlock, short replication, + DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint) { assert namesystem.hasWriteLock(); // first form a rack to datanodes map and - BlockInfo bi = getStoredBlock(b); - BlockCollection bc = getBlockCollection(bi); - final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID()); + BlockCollection bc = getBlockCollection(storedBlock); + final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy( + bc.getStoragePolicyID()); final List<StorageType> excessTypes = storagePolicy.chooseExcess( replication, DatanodeStorageInfo.toStorageTypes(nonExcess)); - - - final Map<String, List<DatanodeStorageInfo>> rackMap - = new HashMap<String, List<DatanodeStorageInfo>>(); - final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>(); - final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); - - // split nodes into two sets - // moreThanOne contains nodes on rack with more than one replica - // exactlyOne contains the remaining nodes - replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne); - - // pick one node to delete that favors the delete hint - // otherwise pick one with least space from priSet if it is not empty - // otherwise one node with least space from remains - boolean firstOne = true; - final DatanodeStorageInfo delNodeHintStorage - = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint); - final DatanodeStorageInfo addedNodeStorage - = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode); 
- while (nonExcess.size() - replication > 0) { - final DatanodeStorageInfo cur; - if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, - moreThanOne, excessTypes)) { - cur = delNodeHintStorage; - } else { // regular excessive replica removal - cur = replicator.chooseReplicaToDelete(bc, b, replication, - moreThanOne, exactlyOne, excessTypes); - } - firstOne = false; - - // adjust rackmap, moreThanOne, and exactlyOne - replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, - exactlyOne, cur); - - nonExcess.remove(cur); - addToExcessReplicate(cur.getDatanodeDescriptor(), b); - - // - // The 'excessblocks' tracks blocks until we get confirmation - // that the datanode has deleted them; the only way we remove them - // is when we get a "removeBlock" message. - // - // The 'invalidate' list is used to inform the datanode the block - // should be deleted. Items are removed from the invalidate list - // upon giving instructions to the namenode. - // - addToInvalidates(b, cur.getDatanodeDescriptor()); - blockLog.debug("BLOCK* chooseExcessReplicates: " - +"({}, {}) is added to invalidated blocks set", cur, b); - } - } - - /** Check if we can use delHint */ - static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, - DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, - List<StorageType> excessTypes) { - if (!isFirst) { - return false; // only consider delHint for the first case - } else if (delHint == null) { - return false; // no delHint - } else if (!excessTypes.contains(delHint.getStorageType())) { - return false; // delHint storage type is not an excess type - } else { - // check if removing delHint reduces the number of racks - if (moreThan1Racks.contains(delHint)) { - return true; // delHint and some other nodes are under the same rack - } else if (added != null && !moreThan1Racks.contains(added)) { - return true; // the added node adds a new rack - } - return false; // removing delHint reduces the number of racks; + 
List<DatanodeStorageInfo> replicasToDelete = blockplacement + .chooseReplicasToDelete(nonExcess, replication, excessTypes, + addedNode, delNodeHint); + for (DatanodeStorageInfo choosenReplica : replicasToDelete) { + processChosenExcessReplica(nonExcess, choosenReplica, storedBlock); } } + private void processChosenExcessReplica( + final Collection<DatanodeStorageInfo> nonExcess, + final DatanodeStorageInfo chosen, BlockInfo storedBlock) { + nonExcess.remove(chosen); + addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock); + // + // The 'excessblocks' tracks blocks until we get confirmation + // that the datanode has deleted them; the only way we remove them + // is when we get a "removeBlock" message. + // + // The 'invalidate' list is used to inform the datanode the block + // should be deleted. Items are removed from the invalidate list + // upon giving instructions to the datanodes. + // + addToInvalidates(storedBlock, chosen.getDatanodeDescriptor()); + blockLog.debug("BLOCK* chooseExcessReplicates: " + + "({}, {}) is added to invalidated blocks set", chosen, storedBlock); + } + private void addToExcessReplicate(DatanodeInfo dn, Block block) { assert namesystem.hasWriteLock(); LightWeightHashSet<Block> excessBlocks = excessReplicateMap.get( @@ -3560,36 +3490,20 @@ public class BlockManager implements BlockStatsMXBean { return toInvalidate.size(); } - boolean blockHasEnoughRacks(BlockInfo b) { - if (!this.shouldCheckForEnoughRacks) { - return true; - } - boolean enoughRacks = false;; - Collection<DatanodeDescriptor> corruptNodes = - corruptReplicas.getNodes(b); - int numExpectedReplicas = getReplication(b); - String rackName = null; - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + boolean isPlacementPolicySatisfied(BlockInfo storedBlock) { + List<DatanodeDescriptor> liveNodes = new ArrayList<>(); + Collection<DatanodeDescriptor> corruptNodes = corruptReplicas + .getNodes(storedBlock); + for (DatanodeStorageInfo storage : 
blocksMap.getStorages(storedBlock)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); - if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { - if ((corruptNodes == null ) || !corruptNodes.contains(cur)) { - if (numExpectedReplicas == 1 || - (numExpectedReplicas > 1 && - !datanodeManager.hasClusterEverBeenMultiRack())) { - enoughRacks = true; - break; - } - String rackNameNew = cur.getNetworkLocation(); - if (rackName == null) { - rackName = rackNameNew; - } else if (!rackName.equals(rackNameNew)) { - enoughRacks = true; - break; - } - } + if (!cur.isDecommissionInProgress() && !cur.isDecommissioned() + && ((corruptNodes == null) || !corruptNodes.contains(cur))) { + liveNodes.add(cur); } } - return enoughRacks; + DatanodeInfo[] locs = liveNodes.toArray(new DatanodeInfo[liveNodes.size()]); + return blockplacement.verifyBlockPlacement(locs, + storedBlock.getReplication()).isPlacementPolicySatisfied(); } /** @@ -3598,7 +3512,7 @@ public class BlockManager implements BlockStatsMXBean { */ boolean isNeededReplication(BlockInfo storedBlock, int current) { int expected = storedBlock.getReplication(); - return current < expected || !blockHasEnoughRacks(storedBlock); + return current < expected || !isPlacementPolicySatisfied(storedBlock); } public short getExpectedReplicaNum(BlockInfo block) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 698fbc5..69a49aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -30,9 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.ReflectionUtils; @@ -103,37 +101,33 @@ public abstract class BlockPlacementPolicy { * Verify if the block's placement meets requirement of placement policy, * i.e. replicas are placed on no less than minRacks racks in the system. * - * @param srcPath the full pathname of the file to be verified - * @param lBlk block with locations + * @param locs block with locations * @param numOfReplicas replica number of file to be verified * @return the result of verification */ - abstract public BlockPlacementStatus verifyBlockPlacement(String srcPath, - LocatedBlock lBlk, - int numOfReplicas); + abstract public BlockPlacementStatus verifyBlockPlacement( + DatanodeInfo[] locs, int numOfReplicas); + /** - * Decide whether deleting the specified replica of the block still makes - * the block conform to the configured block placement policy. - * - * @param srcBC block collection of file to which block-to-be-deleted belongs - * @param block The block to be deleted - * @param replicationFactor The required number of replicas for this block - * @param moreThanOne The replica locations of this block that are present - * on more than one unique racks. - * @param exactlyOne Replica locations of this block that are present - * on exactly one unique racks. - * @param excessTypes The excess {@link StorageType}s according to the - * {@link BlockStoragePolicy}. 
- * @return the replica that is the best candidate for deletion + * Select the excess replica storages for deletion based on either + * delNodehint/Excess storage types. + * + * @param candidates + * available replicas + * @param expectedNumOfReplicas + * The required number of replicas for this block + * @param excessTypes + * type of the storagepolicy + * @param addedNode + * New replica reported + * @param delNodeHint + * Hint for excess storage selection + * @return Returns the list of excess replicas chosen for deletion */ - abstract public DatanodeStorageInfo chooseReplicaToDelete( - BlockCollection srcBC, - Block block, - short replicationFactor, - Collection<DatanodeStorageInfo> moreThanOne, - Collection<DatanodeStorageInfo> exactlyOne, - List<StorageType> excessTypes); - + abstract public List<DatanodeStorageInfo> chooseReplicasToDelete( + Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas, + List<StorageType> excessTypes, DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint); /** * Used to setup a BlockPlacementPolicy object. This should be defined by * all implementations of a BlockPlacementPolicy. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index ad399d6..ad1a739 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -26,9 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; @@ -859,16 +857,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } @Override - public BlockPlacementStatus verifyBlockPlacement(String srcPath, - LocatedBlock lBlk, int numberOfReplicas) { - DatanodeInfo[] locs = lBlk.getLocations(); + public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, + int numberOfReplicas) { if (locs == null) locs = DatanodeDescriptor.EMPTY_ARRAY; - int numRacks = clusterMap.getNumOfRacks(); - if(numRacks <= 1) // only one rack - return new BlockPlacementStatusDefault( - Math.min(numRacks, numberOfReplicas), numRacks); - int minRacks = Math.min(2, numberOfReplicas); + if (!clusterMap.hasClusterEverBeenMultiRack()) { + // only 
one rack + return new BlockPlacementStatusDefault(1, 1); + } + int minRacks = 2; + minRacks = Math.min(minRacks, numberOfReplicas); // 1. Check that all locations are different. // 2. Count locations on different racks. Set<String> racks = new TreeSet<String>(); @@ -876,12 +874,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { racks.add(dn.getNetworkLocation()); return new BlockPlacementStatusDefault(racks.size(), minRacks); } - - @Override - public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc, - Block block, short replicationFactor, - Collection<DatanodeStorageInfo> first, - Collection<DatanodeStorageInfo> second, + /** + * Decide whether deleting the specified replica of the block still makes + * the block conform to the configured block placement policy. + * @param replicationFactor The required number of replicas for this block + * @param moreThanone The replica locations of this block that are present + * on more than one unique racks. + * @param exactlyOne Replica locations of this block that are present + * on exactly one unique racks. + * @param excessTypes The excess {@link StorageType}s according to the + * {@link BlockStoragePolicy}. 
+ * + * @return the replica that is the best candidate for deletion + */ + @VisibleForTesting + public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, + Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne, final List<StorageType> excessTypes) { long oldestHeartbeat = monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier; @@ -891,7 +899,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // Pick the node with the oldest heartbeat or with the least free space, // if all hearbeats are within the tolerable heartbeat interval - for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) { + for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) { if (!excessTypes.contains(storage.getStorageType())) { continue; } @@ -921,6 +929,76 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return storage; } + @Override + public List<DatanodeStorageInfo> chooseReplicasToDelete( + Collection<DatanodeStorageInfo> candidates, + int expectedNumOfReplicas, + List<StorageType> excessTypes, + DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint) { + + List<DatanodeStorageInfo> excessReplicas = new ArrayList<>(); + + final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>(); + + final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); + + // split nodes into two sets + // moreThanOne contains nodes on rack with more than one replica + // exactlyOne contains the remaining nodes + splitNodesWithRack(candidates, rackMap, moreThanOne, exactlyOne); + + // pick one node to delete that favors the delete hint + // otherwise pick one with least space from priSet if it is not empty + // otherwise one node with least space from remains + boolean firstOne = true; + final DatanodeStorageInfo delNodeHintStorage = + 
DatanodeStorageInfo.getDatanodeStorageInfo(candidates, delNodeHint); + final DatanodeStorageInfo addedNodeStorage = + DatanodeStorageInfo.getDatanodeStorageInfo(candidates, addedNode); + + while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) { + final DatanodeStorageInfo cur; + if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, + moreThanOne, excessTypes)) { + cur = delNodeHintStorage; + } else { // regular excessive replica removal + cur = + chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne, + excessTypes); + } + firstOne = false; + + // adjust rackmap, moreThanOne, and exactlyOne + adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur); + excessReplicas.add(cur); + } + return excessReplicas; + } + + /** Check if we can use delHint. */ + @VisibleForTesting + static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, + DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, + List<StorageType> excessTypes) { + if (!isFirst) { + return false; // only consider delHint for the first case + } else if (delHint == null) { + return false; // no delHint + } else if (!excessTypes.contains(delHint.getStorageType())) { + return false; // delHint storage type is not an excess type + } else { + // check if removing delHint reduces the number of racks + if (moreThan1Racks.contains(delHint)) { + return true; // delHint and some other nodes are under the same rack + } else if (added != null && !moreThan1Racks.contains(added)) { + return true; // the added node adds a new rack + } + return false; // removing delHint reduces the number of racks; + } + } + /** * Pick up replica node set for deleting replica as over-replicated. 
* First set contains replica nodes on rack with more than one http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java index f25fb15..8ca0d2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; @@ -151,4 +152,21 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD maxNodesPerRack, results, avoidStaleNodes, storageTypes); return writer; } + + @Override + public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, + int numberOfReplicas) { + if (locs == null) + locs = DatanodeDescriptor.EMPTY_ARRAY; + if (!clusterMap.hasClusterEverBeenMultiRack()) { + // only one rack + return new BlockPlacementStatusDefault(1, 1); + } + // 1. Check that all locations are different. + // 2. Count locations on different racks. 
+ Set<String> racks = new TreeSet<String>(); + for (DatanodeInfo dn : locs) + racks.add(dn.getNetworkLocation()); + return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java index 71c02b8..3241908 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java @@ -32,7 +32,6 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.net.NetworkTopology; /** @@ -135,13 +134,13 @@ public class BlockPlacementPolicyWithUpgradeDomain extends } @Override - public BlockPlacementStatus verifyBlockPlacement(String srcPath, - LocatedBlock lBlk, int numberOfReplicas) { - BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(srcPath, - lBlk, numberOfReplicas); + public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, + int numberOfReplicas) { + BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(locs, + numberOfReplicas); BlockPlacementStatusWithUpgradeDomain upgradeDomainStatus = new BlockPlacementStatusWithUpgradeDomain(defaultStatus, - 
getUpgradeDomainsFromNodes(lBlk.getLocations()), + getUpgradeDomainsFromNodes(locs), numberOfReplicas, upgradeDomainFactor); return upgradeDomainStatus; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 5bf09de..77bb0c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1152,14 +1152,6 @@ public class DatanodeManager { } /** - * @return true if this cluster has ever consisted of multiple racks, even if - * it is not now a multi-rack cluster. - */ - boolean hasClusterEverBeenMultiRack() { - return hasClusterEverBeenMultiRack; - } - - /** * Check if the cluster now consists of multiple racks. If it does, and this * is the first time it's consisted of multiple racks, then process blocks * that may now be misreplicated. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 1d14daa..42c2144 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -611,7 +611,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { // count mis replicated blocks BlockPlacementStatus blockPlacementStatus = bpPolicy - .verifyBlockPlacement(path, lBlk, targetFileReplication); + .verifyBlockPlacement(lBlk.getLocations(), targetFileReplication); if (!blockPlacementStatus.isPlacementPolicySatisfied()) { res.numMisReplicatedBlocks++; misReplicatedPerFile++; http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 42bbce3..095241d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -341,12 +341,12 @@ public class TestBalancer { conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true); long[] capacities = new long[] { CAPACITY, CAPACITY }; + String[] hosts = 
{"host0", "host1"}; String[] racks = { RACK0, RACK1 }; int numOfDatanodes = capacities.length; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) - .hosts(new String[]{"localhost", "localhost"}) - .racks(racks).simulatedCapacities(capacities).build(); + .hosts(hosts).racks(racks).simulatedCapacities(capacities).build(); try { cluster.waitActive(); @@ -358,7 +358,10 @@ public class TestBalancer { long totalUsedSpace = totalCapacity * 8 / 10; InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes]; for (int i = 0; i < favoredNodes.length; i++) { - favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress(); + // DFSClient will attempt reverse lookup. In case it resolves + // "127.0.0.1" to "localhost", we manually specify the hostname. + int port = cluster.getDataNodes().get(i).getXferAddress().getPort(); + favoredNodes[i] = new InetSocketAddress(hosts[i], port); } DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 4a07e74..9de62c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -823,11 +823,11 @@ public class TestBlockManager { List<StorageType> excessTypes = new ArrayList<StorageType>(); excessTypes.add(StorageType.DEFAULT); - Assert.assertTrue(BlockManager.useDelHint(true, delHint, null, - moreThan1Racks, 
excessTypes)); + Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint, + null, moreThan1Racks, excessTypes)); excessTypes.remove(0); excessTypes.add(StorageType.SSD); - Assert.assertFalse(BlockManager.useDelHint(true, delHint, null, - moreThan1Racks, excessTypes)); + Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint, + null, moreThan1Racks, excessTypes)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 23d9017..e321864 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.LogVerificationAppender; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; @@ -968,12 +970,12 @@ public class TestReplicationPolicy 
extends BaseReplicationPolicyTest { { // test returning null excessTypes.add(StorageType.SSD); - assertNull(replicator.chooseReplicaToDelete( - null, null, (short)3, first, second, excessTypes)); + assertNull(((BlockPlacementPolicyDefault) replicator) + .chooseReplicaToDelete((short) 3, first, second, excessTypes)); } excessTypes.add(StorageType.DEFAULT); - DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete( - null, null, (short)3, first, second, excessTypes); + DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) + .chooseReplicaToDelete((short) 3, first, second, excessTypes); // Within first set, storages[1] with less free space assertEquals(chosen, storages[1]); @@ -982,11 +984,76 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { assertEquals(3, second.size()); // Within second set, storages[5] with less free space excessTypes.add(StorageType.DEFAULT); - chosen = replicator.chooseReplicaToDelete( - null, null, (short)2, first, second, excessTypes); + chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( + (short)2, first, second, excessTypes); assertEquals(chosen, storages[5]); } + @Test + public void testChooseReplicasToDelete() throws Exception { + Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[2]); + nonExcess.add(storages[3]); + List<DatanodeStorageInfo> excessReplicas = new ArrayList<>(); + BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite + .createDefaultSuite(); + BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy(); + + // use delete hint case. 
+ + DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor(); + List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); + assertTrue(excessReplicas.size() > 0); + assertTrue(excessReplicas.contains(storages[0])); + + // Excess type deletion + + DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo( + "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(), + "foo.com", StorageType.ARCHIVE); + nonExcess.add(excessStorage); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), null); + assertTrue(excessReplicas.contains(excessStorage)); + } + + @Test + public void testUseDelHint() throws Exception { + List<StorageType> excessTypes = new ArrayList<StorageType>(); + excessTypes.add(StorageType.ARCHIVE); + // only consider delHint for the first case + assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null, + null)); + // no delHint + assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null, + null)); + // delHint storage type is not an excess type + assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null, + null, excessTypes)); + // check if removing delHint reduces the number of racks + List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>(); + chosenNodes.add(storages[0]); + chosenNodes.add(storages[2]); + excessTypes.add(StorageType.DEFAULT); + assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null, + chosenNodes, excessTypes)); + // the added node adds a new rack + assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3], + storages[5], chosenNodes, 
excessTypes)); + // removing delHint reduces the number of racks; + assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], + storages[0], chosenNodes, excessTypes)); + assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null, + chosenNodes, excessTypes)); + } + /** * This testcase tests whether the default value returned by * DFSUtil.getInvalidateWorkPctPerIteration() is positive, http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index 86f10a8..528021d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -533,8 +533,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes assertEquals(1, second.size()); List<StorageType> excessTypes = new ArrayList<>(); excessTypes.add(StorageType.DEFAULT); - DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete( - null, null, (short)3, first, second, excessTypes); + DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) + .chooseReplicaToDelete((short) 3, first, second, excessTypes); // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]}, // dataNodes[0] and dataNodes[1] are in the same nodegroup, // but dataNodes[1] is chosen as less free space @@ -546,8 +546,8 @@ public class TestReplicationPolicyWithNodeGroup extends 
BaseReplicationPolicyTes // Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen // as less free space excessTypes.add(StorageType.DEFAULT); - chosen = replicator.chooseReplicaToDelete( - null, null, (short)2, first, second, excessTypes); + chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( + (short) 2, first, second, excessTypes); assertEquals(chosen, storages[2]); replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen); @@ -555,8 +555,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes assertEquals(2, second.size()); // Within second set, dataNodes[5] with less free space excessTypes.add(StorageType.DEFAULT); - chosen = replicator.chooseReplicaToDelete( - null, null, (short)1, first, second, excessTypes); + chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( + (short) 1, first, second, excessTypes); assertEquals(chosen, storages[5]); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java index feb2b79..b5caebf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java @@ -208,7 +208,7 @@ public class TestReplicationPolicyWithUpgradeDomain second.add(storages[8]); DatanodeStorageInfo chosenStorage = 
upgradeDomainPolicy.chooseReplicaToDelete( - null, null, (short)3, first, second, excessTypes); + (short)3, first, second, excessTypes); assertEquals(chosenStorage, storages[1]); first.clear(); second.clear(); @@ -219,7 +219,7 @@ public class TestReplicationPolicyWithUpgradeDomain first.add(storages[4]); first.add(storages[5]); chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete( - null, null, (short)3, first, second, excessTypes); + (short)3, first, second, excessTypes); assertTrue(chosenStorage.equals(storages[1]) || chosenStorage.equals(storages[4])); } @@ -265,7 +265,8 @@ public class TestReplicationPolicyWithUpgradeDomain set.add(storages[4]); locatedBlock = BlockManager.newLocatedBlock(b, set.toArray(new DatanodeStorageInfo[set.size()]), 0, false); - status = replicator.verifyBlockPlacement("", locatedBlock, set.size()); + status = replicator.verifyBlockPlacement(locatedBlock.getLocations(), + set.size()); assertFalse(status.isPlacementPolicySatisfied()); // 3 upgrade domains (enough), 2 racks (enough) @@ -275,7 +276,8 @@ public class TestReplicationPolicyWithUpgradeDomain set.add(storages[5]); locatedBlock = BlockManager.newLocatedBlock(b, set.toArray(new DatanodeStorageInfo[set.size()]), 0, false); - status = replicator.verifyBlockPlacement("", locatedBlock, set.size()); + status = replicator.verifyBlockPlacement(locatedBlock.getLocations(), + set.size()); assertTrue(status.isPlacementPolicySatisfied()); // 3 upgrade domains (enough), 1 rack (not enough) @@ -285,7 +287,8 @@ public class TestReplicationPolicyWithUpgradeDomain set.add(storages[2]); locatedBlock = BlockManager.newLocatedBlock(b, set.toArray(new DatanodeStorageInfo[set.size()]), 0, false); - status = replicator.verifyBlockPlacement("", locatedBlock, set.size()); + status = replicator.verifyBlockPlacement(locatedBlock.getLocations(), + set.size()); assertFalse(status.isPlacementPolicySatisfied()); assertFalse(status.getErrorDescription().contains("upgrade domain")); @@ -296,7 +299,8 @@ 
public class TestReplicationPolicyWithUpgradeDomain set.add(storages[8]); locatedBlock = BlockManager.newLocatedBlock(b, set.toArray(new DatanodeStorageInfo[set.size()]), 0, false); - status = replicator.verifyBlockPlacement("", locatedBlock, set.size()); + status = replicator.verifyBlockPlacement(locatedBlock.getLocations(), + set.size()); assertFalse(status.isPlacementPolicySatisfied()); assertTrue(status.getErrorDescription().contains("upgrade domain")); @@ -307,7 +311,8 @@ public class TestReplicationPolicyWithUpgradeDomain set.add(storages[8]); locatedBlock = BlockManager.newLocatedBlock(b, set.toArray(new DatanodeStorageInfo[set.size()]), 0, false); - status = replicator.verifyBlockPlacement("", locatedBlock, set.size()); + status = replicator.verifyBlockPlacement(locatedBlock.getLocations(), + set.size()); assertTrue(status.isPlacementPolicySatisfied()); @@ -319,7 +324,8 @@ public class TestReplicationPolicyWithUpgradeDomain set.add(storages[8]); locatedBlock = BlockManager.newLocatedBlock(b, set.toArray(new DatanodeStorageInfo[set.size()]), 0, false); - status = replicator.verifyBlockPlacement("", locatedBlock, set.size()); + status = replicator.verifyBlockPlacement(locatedBlock.getLocations(), + set.size()); assertTrue(status.isPlacementPolicySatisfied()); // 2 upgrade domains (not enough), 3 racks (enough), 4 replicas @@ -330,7 +336,8 @@ public class TestReplicationPolicyWithUpgradeDomain set.add(storages[8]); locatedBlock = BlockManager.newLocatedBlock(b, set.toArray(new DatanodeStorageInfo[set.size()]), 0, false); - status = replicator.verifyBlockPlacement("", locatedBlock, set.size()); + status = replicator.verifyBlockPlacement(locatedBlock.getLocations(), + set.size()); assertFalse(status.isPlacementPolicySatisfied()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/39e7548c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 04b7b94..143665a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -40,10 +40,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; @@ -631,10 +629,8 @@ public class TestDNFencing { } @Override - public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection inode, - Block block, short replicationFactor, - Collection<DatanodeStorageInfo> first, - Collection<DatanodeStorageInfo> second, + public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, + Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second, List<StorageType> excessTypes) { Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
