Repository: hadoop Updated Branches: refs/heads/HDFS-8707 457fe0870 -> fbba87017
HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ec414600 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ec414600 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ec414600 Branch: refs/heads/HDFS-8707 Commit: ec414600ede8e305c584818565b50e055ea5d2b5 Parents: 88beb46 Author: Lei Xu <l...@apache.org> Authored: Tue Nov 3 14:17:11 2015 -0800 Committer: Lei Xu <l...@apache.org> Committed: Wed Nov 4 10:22:17 2015 -0800 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop/hdfs/server/balancer/Dispatcher.java | 65 ++----- .../blockmanagement/BlockPlacementPolicy.java | 53 ++++-- .../BlockPlacementPolicyDefault.java | 57 ++++--- .../BlockPlacementPolicyWithNodeGroup.java | 35 ++-- .../BlockPlacementPolicyWithUpgradeDomain.java | 84 +++++++-- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 9 +- .../hdfs/server/balancer/TestBalancer.java | 103 ++++++++++- .../blockmanagement/TestBlockManager.java | 13 +- .../blockmanagement/TestReplicationPolicy.java | 93 +++++++--- .../TestReplicationPolicyWithNodeGroup.java | 6 +- .../TestReplicationPolicyWithUpgradeDomain.java | 171 +++++++++++++++---- .../hdfs/server/namenode/ha/TestDNFencing.java | 10 +- 13 files changed, 503 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f2d8296..fd560d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1618,6 +1618,8 @@ Release 2.8.0 - UNRELEASED HDFS-9339. 
Extend full test of KMS ACLs. (Daniel Templeton via zhz) + HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 5b3eb36..9f9cdc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; @@ -124,6 +125,7 @@ public class Dispatcher { private final int ioFileBufferSize; private final boolean connectToDnViaHostname; + private BlockPlacementPolicies placementPolicies; static class Allocator { private final int max; @@ -949,6 +951,7 @@ public class Dispatcher { this.connectToDnViaHostname = conf.getBoolean( HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + 
placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null); } public DistributedFileSystem getDistributedFileSystem() { @@ -1166,66 +1169,24 @@ public class Dispatcher { } } - if (cluster.isNodeGroupAware() - && isOnSameNodeGroupWithReplicas(source, target, block)) { - return false; - } - if (reduceNumOfRacks(source, target, block)) { + if (!isGoodBlockCandidateForPlacementPolicy(source, target, block)) { return false; } return true; } - /** - * Determine whether moving the given block replica from source to target - * would reduce the number of racks of the block replicas. - */ - private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target, - DBlock block) { - final DatanodeInfo sourceDn = source.getDatanodeInfo(); - if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) { - // source and target are on the same rack - return false; - } - boolean notOnSameRack = true; + // Check if the move will violate the block placement policy. + private boolean isGoodBlockCandidateForPlacementPolicy(StorageGroup source, + StorageGroup target, DBlock block) { + List<DatanodeInfo> datanodeInfos = new ArrayList<>(); synchronized (block) { - for (StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) { - notOnSameRack = false; - break; - } - } - } - if (notOnSameRack) { - // target is not on the same rack as any replica - return false; - } - for (StorageGroup g : block.getLocations()) { - if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) { - // source is on the same rack of another replica - return false; + for (StorageGroup loc : block.locations) { + datanodeInfos.add(loc.getDatanodeInfo()); } + datanodeInfos.add(target.getDatanodeInfo()); } - return true; - } - - /** - * Check if there are any replica (other than source) on the same node group - * with target. 
If true, then target is not a good candidate for placing - * specific replica as we don't want 2 replicas under the same nodegroup. - * - * @return true if there are any replica (other than source) on the same node - * group with target - */ - private boolean isOnSameNodeGroupWithReplicas(StorageGroup source, - StorageGroup target, DBlock block) { - final DatanodeInfo targetDn = target.getDatanodeInfo(); - for (StorageGroup g : block.getLocations()) { - if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) { - return true; - } - } - return false; + return placementPolicies.getPolicy(false).isMovable( + datanodeInfos, source.getDatanodeInfo(), target.getDatanodeInfo()); } /** Reset all fields in order to prepare for the next iteration */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 526a5d7..8478387 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; @@ -141,6 +142,17 @@ public abstract class BlockPlacementPolicy { Host2NodesMap host2datanodeMap); /** + * Check if the move is allowed. Used by balancer and other tools. 
+ * + * @param candidates all replicas including source and target + * @param source source replica of the move + * @param target target replica of the move + * @return true if the move does not violate the block placement policy + */ + abstract public boolean isMovable(Collection<DatanodeInfo> candidates, + DatanodeInfo source, DatanodeInfo target); + + /** * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur. * * @param rackMap a map from rack to replica @@ -172,6 +184,20 @@ public abstract class BlockPlacementPolicy { } } + protected <T> DatanodeInfo getDatanodeInfo(T datanode) { + Preconditions.checkArgument( + datanode instanceof DatanodeInfo || + datanode instanceof DatanodeStorageInfo, + "class " + datanode.getClass().getName() + " not allowed"); + if (datanode instanceof DatanodeInfo) { + return ((DatanodeInfo)datanode); + } else if (datanode instanceof DatanodeStorageInfo) { + return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor(); + } else { + return null; + } + } + /** * Get rack string from a data node * @return rack of data node @@ -179,33 +205,33 @@ public abstract class BlockPlacementPolicy { protected String getRack(final DatanodeInfo datanode) { return datanode.getNetworkLocation(); } - + /** * Split data nodes into two sets, one set includes nodes on rack with * more than one replica, the other set contains the remaining nodes.
* - * @param dataNodes datanodes to be split into two sets + * @param storagesOrDataNodes DatanodeStorageInfo/DatanodeInfo to be split + * into two sets * @param rackMap a map from rack to datanodes * @param moreThanOne contains nodes on rack with more than one replica * @param exactlyOne remains contains the remaining nodes */ - public void splitNodesWithRack( - final Iterable<DatanodeStorageInfo> storages, - final Map<String, List<DatanodeStorageInfo>> rackMap, - final List<DatanodeStorageInfo> moreThanOne, - final List<DatanodeStorageInfo> exactlyOne) { - for(DatanodeStorageInfo s: storages) { - final String rackName = getRack(s.getDatanodeDescriptor()); - List<DatanodeStorageInfo> storageList = rackMap.get(rackName); + public <T> void splitNodesWithRack( + final Iterable<T> storagesOrDataNodes, + final Map<String, List<T>> rackMap, + final List<T> moreThanOne, + final List<T> exactlyOne) { + for(T s: storagesOrDataNodes) { + final String rackName = getRack(getDatanodeInfo(s)); + List<T> storageList = rackMap.get(rackName); if (storageList == null) { - storageList = new ArrayList<DatanodeStorageInfo>(); + storageList = new ArrayList<T>(); rackMap.put(rackName, storageList); } storageList.add(s); } - // split nodes into two sets - for(List<DatanodeStorageInfo> storageList : rackMap.values()) { + for(List<T> storageList : rackMap.values()) { if (storageList.size() == 1) { // exactlyOne contains nodes on rack with only one replica exactlyOne.add(storageList.get(0)); @@ -215,5 +241,4 @@ public abstract class BlockPlacementPolicy { } } } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 2723ed9..56ebc35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -881,7 +881,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { minRacks = Math.min(minRacks, numberOfReplicas); // 1. Check that all locations are different. // 2. Count locations on different racks. - Set<String> racks = new TreeSet<String>(); + Set<String> racks = new TreeSet<>(); for (DatanodeInfo dn : locs) racks.add(dn.getNetworkLocation()); return new BlockPlacementStatusDefault(racks.size(), minRacks); @@ -889,8 +889,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { /** * Decide whether deleting the specified replica of the block still makes * the block conform to the configured block placement policy. - * @param replicationFactor The required number of replicas for this block - * @param moreThanone The replica locations of this block that are present + * @param moreThanOne The replica locations of this block that are present * on more than one unique racks. * @param exactlyOne Replica locations of this block that are present * on exactly one unique racks. 
@@ -900,8 +899,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { * @return the replica that is the best candidate for deletion */ @VisibleForTesting - public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, - Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne, + public DatanodeStorageInfo chooseReplicaToDelete( + Collection<DatanodeStorageInfo> moreThanOne, + Collection<DatanodeStorageInfo> exactlyOne, final List<StorageType> excessTypes) { long oldestHeartbeat = monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier; @@ -911,7 +911,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // Pick the node with the oldest heartbeat or with the least free space, // if all hearbeats are within the tolerable heartbeat interval - for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) { + for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne, + exactlyOne)) { if (!excessTypes.contains(storage.getStorageType())) { continue; } @@ -972,13 +973,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) { final DatanodeStorageInfo cur; - if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, - moreThanOne, excessTypes)) { + if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage, + moreThanOne, exactlyOne, excessTypes)) { cur = delNodeHintStorage; } else { // regular excessive replica removal - cur = - chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne, - excessTypes); + cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes); } firstOne = false; if (cur == null) { @@ -997,26 +996,40 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { /** Check if we can use delHint. 
*/ @VisibleForTesting - static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, - DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, + boolean useDelHint(DatanodeStorageInfo delHint, + DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThanOne, + Collection<DatanodeStorageInfo> exactlyOne, List<StorageType> excessTypes) { - if (!isFirst) { - return false; // only consider delHint for the first case - } else if (delHint == null) { + if (delHint == null) { return false; // no delHint } else if (!excessTypes.contains(delHint.getStorageType())) { return false; // delHint storage type is not an excess type } else { // check if removing delHint reduces the number of racks - if (moreThan1Racks.contains(delHint)) { - return true; // delHint and some other nodes are under the same rack - } else if (added != null && !moreThan1Racks.contains(added)) { - return true; // the added node adds a new rack - } - return false; // removing delHint reduces the number of racks; + return notReduceNumOfGroups(moreThanOne, delHint, added); } } + // Check that moving from source to target will not reduce the number of + // groups. The groups could be based on racks or upgrade domains. + <T> boolean notReduceNumOfGroups(List<T> moreThanOne, T source, T target) { + if (moreThanOne.contains(source)) { + return true; // source and some other nodes are under the same group. + } else if (target != null && !moreThanOne.contains(target)) { + return true; // target adds a new group. + } + return false; // moving source reduces the number of groups.
+ } + + @Override + public boolean isMovable(Collection<DatanodeInfo> locs, + DatanodeInfo source, DatanodeInfo target) { + final Map<String, List<DatanodeInfo>> rackMap = new HashMap<>(); + final List<DatanodeInfo> moreThanOne = new ArrayList<>(); + final List<DatanodeInfo> exactlyOne = new ArrayList<>(); + splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne); + return notReduceNumOfGroups(moreThanOne, source, target); + } /** * Pick up replica node set for deleting replica as over-replicated. * First set contains replica nodes on rack with more than one http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index 187d8d6..7710654 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -39,11 +39,6 @@ import org.apache.hadoop.net.NodeBase; */ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault { - protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap, DatanodeManager datanodeManager) { - initialize(conf, stats, clusterMap, host2datanodeMap); - } - protected BlockPlacementPolicyWithNodeGroup() { } @@ -345,22 +340,21 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau // Split data nodes in the first set into two sets, // moreThanOne 
contains nodes on nodegroup with more than one replica // exactlyOne contains the remaining nodes - Map<String, List<DatanodeStorageInfo>> nodeGroupMap = - new HashMap<String, List<DatanodeStorageInfo>>(); + Map<String, List<DatanodeStorageInfo>> nodeGroupMap = new HashMap<>(); for(DatanodeStorageInfo storage : first) { final String nodeGroupName = NetworkTopology.getLastHalf( storage.getDatanodeDescriptor().getNetworkLocation()); List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName); if (storageList == null) { - storageList = new ArrayList<DatanodeStorageInfo>(); + storageList = new ArrayList<>(); nodeGroupMap.put(nodeGroupName, storageList); } storageList.add(storage); } - final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>(); - final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); + final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); // split nodes into two sets for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) { if (datanodeList.size() == 1 ) { @@ -374,5 +368,24 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau return moreThanOne.isEmpty()? exactlyOne : moreThanOne; } - + + /** + * Check if there are any replica (other than source) on the same node group + * with target. If true, then target is not a good candidate for placing + * specific replica as we don't want 2 replicas under the same nodegroup. 
+ * + * @return false if any replica (other than source and target) is on the same + * node group as target, true otherwise + */ + @Override + public boolean isMovable(Collection<DatanodeInfo> locs, + DatanodeInfo source, DatanodeInfo target) { + for (DatanodeInfo dn : locs) { + if (dn != source && dn != target && + clusterMap.isOnSameNodeGroup(dn, target)) { + return false; + } + } + return true; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java index 3241908..8d6b13c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java @@ -30,6 +30,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.net.NetworkTopology; @@ -117,13 +118,13 @@ public class BlockPlacementPolicyWithUpgradeDomain extends return upgradeDomains; } - private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap( - DatanodeStorageInfo[] storageInfos) { - Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>(); - for(DatanodeStorageInfo storage : storageInfos) { + private <T> Map<String, List<T>> getUpgradeDomainMap( + Collection<T> storagesOrDataNodes) { + Map<String, List<T>>
upgradeDomainMap = new HashMap<>(); + for(T storage : storagesOrDataNodes) { String upgradeDomain = getUpgradeDomainWithDefaultValue( - storage.getDatanodeDescriptor()); - List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain); + getDatanodeInfo(storage)); + List<T> storages = upgradeDomainMap.get(upgradeDomain); if (storages == null) { storages = new ArrayList<>(); upgradeDomainMap.put(upgradeDomain, storages); @@ -156,6 +157,19 @@ public class BlockPlacementPolicyWithUpgradeDomain extends return getShareUDSet; } + private Collection<DatanodeStorageInfo> combine( + Collection<DatanodeStorageInfo> moreThanOne, + Collection<DatanodeStorageInfo> exactlyOne) { + List<DatanodeStorageInfo> all = new ArrayList<>(); + if (moreThanOne != null) { + all.addAll(moreThanOne); + } + if (exactlyOne != null) { + all.addAll(exactlyOne); + } + return all; + } + /* * The policy to pick the replica set for deleting the over-replicated * replica which meet the rack and upgrade domain requirements. @@ -231,20 +245,11 @@ public class BlockPlacementPolicyWithUpgradeDomain extends protected Collection<DatanodeStorageInfo> pickupReplicaSet( Collection<DatanodeStorageInfo> moreThanOne, Collection<DatanodeStorageInfo> exactlyOne) { - List<DatanodeStorageInfo> all = new ArrayList<>(); - if (moreThanOne != null) { - all.addAll(moreThanOne); - } - if (exactlyOne != null) { - all.addAll(exactlyOne); - } - - Map<String, List<DatanodeStorageInfo>> upgradeDomains = - getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()])); - // shareUDSet includes DatanodeStorageInfo that share same upgrade // domain with another DatanodeStorageInfo. 
- List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains); + Collection<DatanodeStorageInfo> all = combine(moreThanOne, exactlyOne); + List<DatanodeStorageInfo> shareUDSet = getShareUDSet( + getUpgradeDomainMap(all)); // shareRackAndUDSet contains those DatanodeStorageInfo that // share rack and upgrade domain with another DatanodeStorageInfo. List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>(); @@ -260,4 +265,47 @@ public class BlockPlacementPolicyWithUpgradeDomain extends } return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet; } + + @Override + boolean useDelHint(DatanodeStorageInfo delHint, + DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThanOne, + Collection<DatanodeStorageInfo> exactlyOne, + List<StorageType> excessTypes) { + if (!super.useDelHint(delHint, added, moreThanOne, exactlyOne, + excessTypes)) { + // If BlockPlacementPolicyDefault doesn't allow useDelHint, there is no + // point checking with upgrade domain policy. + return false; + } + return isMovableBasedOnUpgradeDomain(combine(moreThanOne, exactlyOne), + delHint, added); + } + + // Check if moving from source to target will preserve the upgrade domain + // policy. + private <T> boolean isMovableBasedOnUpgradeDomain(Collection<T> all, + T source, T target) { + Map<String, List<T>> udMap = getUpgradeDomainMap(all); + // shareUDSet includes datanodes that share same upgrade + // domain with another datanode. + List<T> shareUDSet = getShareUDSet(udMap); + // check if removing source reduces the number of upgrade domains + if (notReduceNumOfGroups(shareUDSet, source, target)) { + return true; + } else if (udMap.size() > upgradeDomainFactor) { + return true; // existing number of upgrade domain exceeds the limit. + } else { + return false; // removing source reduces the number of UDs. 
+ } + } + + @Override + public boolean isMovable(Collection<DatanodeInfo> locs, + DatanodeInfo source, DatanodeInfo target) { + if (super.isMovable(locs, source, target)) { + return isMovableBasedOnUpgradeDomain(locs, source, target); + } else { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 80c89e9..d6a7b93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1156,7 +1156,7 @@ public class DFSTestUtil { final StorageType type = (types != null && i < types.length) ? 
types[i] : StorageType.DEFAULT; storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname, - type); + type, null); } return storages; } @@ -1164,16 +1164,19 @@ public class DFSTestUtil { public static DatanodeStorageInfo createDatanodeStorageInfo( String storageID, String ip, String rack, String hostname) { return createDatanodeStorageInfo(storageID, ip, rack, hostname, - StorageType.DEFAULT); + StorageType.DEFAULT, null); } public static DatanodeStorageInfo createDatanodeStorageInfo( String storageID, String ip, String rack, String hostname, - StorageType type) { + StorageType type, String upgradeDomain) { final DatanodeStorage storage = new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, type); final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor( ip, rack, storage, hostname); + if (upgradeDomain != null) { + dn.setUpgradeDomain(upgradeDomain); + } return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index dd54345..362c34a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -77,7 +78,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; -import org.apache.hadoop.hdfs.server.balancer.BalancerParameters; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; @@ -409,7 +413,102 @@ public class TestBalancer { int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); } - + + /** + * Verify balancer won't violate the default block placement policy. + * @throws Exception + */ + @Test(timeout=100000) + public void testRackPolicyAfterBalance() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + long[] capacities = new long[] { CAPACITY, CAPACITY }; + String[] hosts = {"host0", "host1"}; + String[] racks = { RACK0, RACK1 }; + runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks, + null, CAPACITY, "host2", RACK1, null); + } + + /** + * Verify balancer won't violate upgrade domain block placement policy. 
+ * @throws Exception + */ + @Test(timeout=100000) + public void testUpgradeDomainPolicyAfterBalance() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyWithUpgradeDomain.class, + BlockPlacementPolicy.class); + long[] capacities = new long[] { CAPACITY, CAPACITY, CAPACITY }; + String[] hosts = {"host0", "host1", "host2"}; + String[] racks = { RACK0, RACK1, RACK1 }; + String[] UDs = { "ud0", "ud1", "ud2" }; + runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks, + UDs, CAPACITY, "host3", RACK2, "ud2"); + } + + private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf, + long[] capacities, String[] hosts, String[] racks, String[] UDs, + long newCapacity, String newHost, String newRack, String newUD) + throws Exception { + int numOfDatanodes = capacities.length; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) + .hosts(hosts).racks(racks).simulatedCapacities(capacities).build(); + DatanodeManager dm = cluster.getNamesystem().getBlockManager(). + getDatanodeManager(); + if (UDs != null) { + for(int i = 0; i < UDs.length; i++) { + DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId(); + dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]); + } + } + + try { + cluster.waitActive(); + client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); + + // fill up the cluster to be 80% full + long totalCapacity = sum(capacities); + long totalUsedSpace = totalCapacity * 8 / 10; + + final long fileSize = totalUsedSpace / numOfDatanodes; + DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, + fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false); + + // start up an empty node with the same capacity on the same rack as the + // pinned host. 
+ cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, + new String[] { newHost }, new long[] { newCapacity }); + if (newUD != null) { + DatanodeID newId = cluster.getDataNodes().get( + numOfDatanodes).getDatanodeId(); + dm.getDatanode(newId).setUpgradeDomain(newUD); + } + totalCapacity += newCapacity; + + // run balancer and validate results + waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); + + // start rebalancing + Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); + Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); + BlockPlacementPolicy placementPolicy = + cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy(); + List<LocatedBlock> locatedBlocks = client. + getBlockLocations(fileName, 0, fileSize).getLocatedBlocks(); + for (LocatedBlock locatedBlock : locatedBlocks) { + BlockPlacementStatus status = placementPolicy.verifyBlockPlacement( + locatedBlock.getLocations(), numOfDatanodes); + assertTrue(status.isPlacementPolicySatisfied()); + } + } finally { + cluster.shutdown(); + } + } + /** * Wait until balanced: each datanode gives utilization within * BALANCE_ALLOWED_VARIANCE of average http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 16d482e..9b7ba4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -821,14 +821,15 @@ public class TestBlockManager { 
DatanodeStorageInfo delHint = new DatanodeStorageInfo( DFSTestUtil.getLocalDatanodeDescriptor(), new DatanodeStorage("id")); List<DatanodeStorageInfo> moreThan1Racks = Arrays.asList(delHint); - List<StorageType> excessTypes = new ArrayList<StorageType>(); - + List<StorageType> excessTypes = new ArrayList<>(); + BlockPlacementPolicyDefault policyDefault = + (BlockPlacementPolicyDefault) bm.getBlockPlacementPolicy(); excessTypes.add(StorageType.DEFAULT); - Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint, - null, moreThan1Racks, excessTypes)); + Assert.assertTrue(policyDefault.useDelHint(delHint, null, moreThan1Racks, + null, excessTypes)); excessTypes.remove(0); excessTypes.add(StorageType.SSD); - Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint, - null, moreThan1Racks, excessTypes)); + Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks, + null, excessTypes)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 37fcf34..c3fe466 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import 
org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; @@ -971,11 +972,11 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // test returning null excessTypes.add(StorageType.SSD); assertNull(((BlockPlacementPolicyDefault) replicator) - .chooseReplicaToDelete((short) 3, first, second, excessTypes)); + .chooseReplicaToDelete(first, second, excessTypes)); } excessTypes.add(StorageType.DEFAULT); DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) - .chooseReplicaToDelete((short) 3, first, second, excessTypes); + .chooseReplicaToDelete(first, second, excessTypes); // Within first set, storages[1] with less free space assertEquals(chosen, storages[1]); @@ -985,25 +986,25 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Within second set, storages[5] with less free space excessTypes.add(StorageType.DEFAULT); chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( - (short)2, first, second, excessTypes); + first, second, excessTypes); assertEquals(chosen, storages[5]); } @Test public void testChooseReplicasToDelete() throws Exception { - Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>(); + Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>(); nonExcess.add(storages[0]); nonExcess.add(storages[1]); nonExcess.add(storages[2]); nonExcess.add(storages[3]); - List<DatanodeStorageInfo> excessReplicas = new ArrayList<>(); + List<DatanodeStorageInfo> excessReplicas; BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite .createDefaultSuite(); BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy(); DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo( "Storage-excess-SSD-ID", "localhost", 
storages[0].getDatanodeDescriptor().getNetworkLocation(), - "foo.com", StorageType.SSD); + "foo.com", StorageType.SSD, null); updateHeartbeatWithUsage(excessSSD.getDatanodeDescriptor(), 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, @@ -1016,14 +1017,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { DatanodeStorageInfo.toStorageTypes(nonExcess)); excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); - assertTrue(excessReplicas.size() > 0); + assertTrue(excessReplicas.size() == 1); assertTrue(excessReplicas.contains(storages[0])); // Excess type deletion DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo( "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(), - "foo.com", StorageType.ARCHIVE); + "foo.com", StorageType.ARCHIVE, null); nonExcess.add(excessStorage); excessTypes = storagePolicy.chooseExcess((short) 3, DatanodeStorageInfo.toStorageTypes(nonExcess)); @@ -1057,32 +1058,70 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { @Test public void testUseDelHint() throws Exception { - List<StorageType> excessTypes = new ArrayList<StorageType>(); + List<StorageType> excessTypes = new ArrayList<>(); excessTypes.add(StorageType.ARCHIVE); - // only consider delHint for the first case - assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null, - null)); + BlockPlacementPolicyDefault policyDefault = + (BlockPlacementPolicyDefault) replicator; // no delHint - assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null, - null)); + assertFalse(policyDefault.useDelHint(null, null, null, null, null)); // delHint storage type is not an excess type - assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null, - null, excessTypes)); + assertFalse(policyDefault.useDelHint(storages[0], null, 
null, null, + excessTypes)); // check if removing delHint reduces the number of racks - List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>(); - chosenNodes.add(storages[0]); - chosenNodes.add(storages[2]); + List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); + moreThanOne.add(storages[0]); + moreThanOne.add(storages[1]); + List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); + exactlyOne.add(storages[3]); + exactlyOne.add(storages[5]); + excessTypes.add(StorageType.DEFAULT); - assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null, - chosenNodes, excessTypes)); + assertTrue(policyDefault.useDelHint(storages[0], null, moreThanOne, + exactlyOne, excessTypes)); // the added node adds a new rack - assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3], - storages[5], chosenNodes, excessTypes)); + assertTrue(policyDefault.useDelHint(storages[3], storages[5], moreThanOne, + exactlyOne, excessTypes)); // removing delHint reduces the number of racks; - assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], - storages[0], chosenNodes, excessTypes)); - assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null, - chosenNodes, excessTypes)); + assertFalse(policyDefault.useDelHint(storages[3], storages[0], moreThanOne, + exactlyOne, excessTypes)); + assertFalse(policyDefault.useDelHint(storages[3], null, moreThanOne, + exactlyOne, excessTypes)); + } + + @Test + public void testIsMovable() throws Exception { + List<DatanodeInfo> candidates = new ArrayList<>(); + + // after the move, the number of racks remains 2. + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[3]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3])); + + // after the move, the number of racks remains 3. 
+ candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[4]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[1])); + + // after the move, the number of racks changes from 2 to 3. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[4]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4])); + + // the move would have reduced the number of racks from 3 to 2. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[3]); + candidates.add(dataNodes[4]); + assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[3])); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index 0ff7770..367faea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -544,7 +544,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes List<StorageType> excessTypes = new ArrayList<>(); excessTypes.add(StorageType.DEFAULT); DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) - .chooseReplicaToDelete((short) 3, first, second, excessTypes); + 
.chooseReplicaToDelete(first, second, excessTypes); // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]}, // dataNodes[0] and dataNodes[1] are in the same nodegroup, // but dataNodes[1] is chosen as less free space @@ -557,7 +557,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes // as less free space excessTypes.add(StorageType.DEFAULT); chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( - (short) 2, first, second, excessTypes); + first, second, excessTypes); assertEquals(chosen, storages[2]); replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen); @@ -566,7 +566,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes // Within second set, dataNodes[5] with less free space excessTypes.add(StorageType.DEFAULT); chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( - (short) 1, first, second, excessTypes); + first, second, excessTypes); assertEquals(chosen, storages[5]); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java index b5caebf..608817f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import 
java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -190,41 +193,6 @@ public class TestReplicationPolicyWithUpgradeDomain } /** - * Verify the correct replica is chosen to satisfy both rack and upgrade - * domain policy. - * @throws Exception - */ - @Test - public void testChooseReplicaToDelete() throws Exception { - BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy = - (BlockPlacementPolicyWithUpgradeDomain)replicator; - List<DatanodeStorageInfo> first = new ArrayList<>(); - List<DatanodeStorageInfo> second = new ArrayList<>(); - List<StorageType> excessTypes = new ArrayList<>(); - excessTypes.add(StorageType.DEFAULT); - first.add(storages[0]); - first.add(storages[1]); - second.add(storages[4]); - second.add(storages[8]); - DatanodeStorageInfo chosenStorage = - upgradeDomainPolicy.chooseReplicaToDelete( - (short)3, first, second, excessTypes); - assertEquals(chosenStorage, storages[1]); - first.clear(); - second.clear(); - - excessTypes.add(StorageType.DEFAULT); - first.add(storages[0]); - first.add(storages[1]); - first.add(storages[4]); - first.add(storages[5]); - chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete( - (short)3, first, second, excessTypes); - assertTrue(chosenStorage.equals(storages[1]) || - chosenStorage.equals(storages[4])); - } - - /** * Test the scenario where not enough replicas can't satisfy the policy. 
* @throws Exception */ @@ -248,7 +216,7 @@ public class TestReplicationPolicyWithUpgradeDomain } /** - * Test the scenario where not enough replicas can't satisfy the policy. + * Test block placement verification. * @throws Exception */ @Test @@ -341,6 +309,137 @@ public class TestReplicationPolicyWithUpgradeDomain assertFalse(status.isPlacementPolicySatisfied()); } + /** + * Verify the correct replica is chosen to satisfy both rack and upgrade + * domain policy. + * @throws Exception + */ + @Test + public void testChooseReplicasToDelete() throws Exception { + Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[2]); + nonExcess.add(storages[3]); + List<DatanodeStorageInfo> excessReplicas; + BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite + .createDefaultSuite(); + BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy(); + + // delete hint accepted. + DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor(); + List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); + assertTrue(excessReplicas.size() == 1); + assertTrue(excessReplicas.contains(storages[0])); + + // delete hint rejected because deleting storages[1] would have + // cause only two upgrade domains left. 
+ delHintNode = storages[1].getDatanodeDescriptor(); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); + assertTrue(excessReplicas.size() == 1); + assertTrue(excessReplicas.contains(storages[0])); + + // no delete hint, case 1 + nonExcess.clear(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[4]); + nonExcess.add(storages[8]); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[8].getDatanodeDescriptor(), null); + assertTrue(excessReplicas.size() == 1); + assertTrue(excessReplicas.contains(storages[1])); + + // no delete hint, case 2 + nonExcess.clear(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[4]); + nonExcess.add(storages[5]); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[8].getDatanodeDescriptor(), null); + assertTrue(excessReplicas.size() == 1); + assertTrue(excessReplicas.contains(storages[1]) || + excessReplicas.contains(storages[4])); + + // No delete hint, different excess type deletion + nonExcess.clear(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[2]); + nonExcess.add(storages[3]); + DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo( + "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(), + "foo.com", StorageType.ARCHIVE, delHintNode.getUpgradeDomain()); + nonExcess.add(excessStorage); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = 
replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), null); + assertTrue(excessReplicas.size() == 2); + assertTrue(excessReplicas.contains(storages[0])); + assertTrue(excessReplicas.contains(excessStorage)); + } + + @Test + public void testIsMovable() throws Exception { + List<DatanodeInfo> candidates = new ArrayList<>(); + + // after the move, the number of racks changes from 1 to 2. + // and number of upgrade domains remains 3. + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[3]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3])); + + // the move would have changed the number of racks from 1 to 2. + // and the number of UDs from 3 to 2. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[4]); + assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[4])); + + // after the move, the number of racks remains 2. + // the number of UDs remains 3. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[4]); + candidates.add(dataNodes[5]); + candidates.add(dataNodes[6]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[6])); + + // after the move, the number of racks remains 2. + // the number of UDs remains 2. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[3]); + candidates.add(dataNodes[4]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4])); + + // the move would have changed the number of racks from 2 to 3. + // and the number of UDs from 2 to 1. 
+ candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[3]); + candidates.add(dataNodes[4]); + candidates.add(dataNodes[6]); + assertFalse(replicator.isMovable(candidates, dataNodes[4], dataNodes[6])); + } + private Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) { HashSet<String> upgradeDomains = new HashSet<>(); for (DatanodeStorageInfo node : nodes) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 143665a..9df4399 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -629,11 +629,13 @@ public class TestDNFencing { } @Override - public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, - Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second, + public DatanodeStorageInfo chooseReplicaToDelete( + Collection<DatanodeStorageInfo> moreThanOne, + Collection<DatanodeStorageInfo> exactlyOne, List<StorageType> excessTypes) { - - Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second; + + Collection<DatanodeStorageInfo> chooseFrom = !moreThanOne.isEmpty() ? + moreThanOne : exactlyOne; List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom); return l.get(ThreadLocalRandom.current().nextInt(l.size()));