Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 457fe0870 -> fbba87017


HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ec414600
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ec414600
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ec414600

Branch: refs/heads/HDFS-8707
Commit: ec414600ede8e305c584818565b50e055ea5d2b5
Parents: 88beb46
Author: Lei Xu <l...@apache.org>
Authored: Tue Nov 3 14:17:11 2015 -0800
Committer: Lei Xu <l...@apache.org>
Committed: Wed Nov 4 10:22:17 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  65 ++-----
 .../blockmanagement/BlockPlacementPolicy.java   |  53 ++++--
 .../BlockPlacementPolicyDefault.java            |  57 ++++---
 .../BlockPlacementPolicyWithNodeGroup.java      |  35 ++--
 .../BlockPlacementPolicyWithUpgradeDomain.java  |  84 +++++++--
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   9 +-
 .../hdfs/server/balancer/TestBalancer.java      | 103 ++++++++++-
 .../blockmanagement/TestBlockManager.java       |  13 +-
 .../blockmanagement/TestReplicationPolicy.java  |  93 +++++++---
 .../TestReplicationPolicyWithNodeGroup.java     |   6 +-
 .../TestReplicationPolicyWithUpgradeDomain.java | 171 +++++++++++++++----
 .../hdfs/server/namenode/ha/TestDNFencing.java  |  10 +-
 13 files changed, 503 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f2d8296..fd560d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1618,6 +1618,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-9339. Extend full test of KMS ACLs. (Daniel Templeton via zhz)
 
+    HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 5b3eb36..9f9cdc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -124,6 +125,7 @@ public class Dispatcher {
   private final int ioFileBufferSize;
 
   private final boolean connectToDnViaHostname;
+  private BlockPlacementPolicies placementPolicies;
 
   static class Allocator {
     private final int max;
@@ -949,6 +951,7 @@ public class Dispatcher {
     this.connectToDnViaHostname = conf.getBoolean(
         HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
         HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null);
   }
 
   public DistributedFileSystem getDistributedFileSystem() {
@@ -1166,66 +1169,24 @@ public class Dispatcher {
       }
     }
 
-    if (cluster.isNodeGroupAware()
-        && isOnSameNodeGroupWithReplicas(source, target, block)) {
-      return false;
-    }
-    if (reduceNumOfRacks(source, target, block)) {
+    if (!isGoodBlockCandidateForPlacementPolicy(source, target, block)) {
       return false;
     }
     return true;
   }
 
-  /**
-   * Determine whether moving the given block replica from source to target
-   * would reduce the number of racks of the block replicas.
-   */
-  private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target,
-      DBlock block) {
-    final DatanodeInfo sourceDn = source.getDatanodeInfo();
-    if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
-      // source and target are on the same rack
-      return false;
-    }
-    boolean notOnSameRack = true;
+  // Check whether the move satisfies the block placement policy.
+  private boolean isGoodBlockCandidateForPlacementPolicy(StorageGroup source,
+     StorageGroup target, DBlock block) {
+    List<DatanodeInfo> datanodeInfos = new ArrayList<>();
     synchronized (block) {
-      for (StorageGroup loc : block.getLocations()) {
-        if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) {
-          notOnSameRack = false;
-          break;
-        }
-      }
-    }
-    if (notOnSameRack) {
-      // target is not on the same rack as any replica
-      return false;
-    }
-    for (StorageGroup g : block.getLocations()) {
-      if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) {
-        // source is on the same rack of another replica
-        return false;
+      for (StorageGroup loc : block.getLocations()) {
+        datanodeInfos.add(loc.getDatanodeInfo());
       }
+      datanodeInfos.add(target.getDatanodeInfo());
     }
-    return true;
-  }
-
-  /**
-   * Check if there are any replica (other than source) on the same node group
-   * with target. If true, then target is not a good candidate for placing
-   * specific replica as we don't want 2 replicas under the same nodegroup.
-   *
-   * @return true if there are any replica (other than source) on the same node
-   *         group with target
-   */
-  private boolean isOnSameNodeGroupWithReplicas(StorageGroup source,
-      StorageGroup target, DBlock block) {
-    final DatanodeInfo targetDn = target.getDatanodeInfo();
-    for (StorageGroup g : block.getLocations()) {
-      if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
-        return true;
-      }
-    }
-    return false;
+    return placementPolicies.getPolicy(false).isMovable(
+        datanodeInfos, source.getDatanodeInfo(), target.getDatanodeInfo());
   }
 
   /** Reset all fields in order to prepare for the next iteration */

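The core of the fix is visible in this Dispatcher hunk: the hard-coded rack and node-group checks are replaced by a single call into the configured BlockPlacementPolicy, so any policy (including the upgrade-domain one) can veto a move. A minimal sketch of the delegation pattern, using simplified stand-in types rather than the actual Hadoop classes:

    import java.util.*;

    // Simplified stand-ins for the Hadoop types; only the shape of the
    // delegation is real, everything else here is illustrative.
    public class MoveCheckSketch {
      interface PlacementPolicy {
        boolean isMovable(Collection<String> candidates, String source,
            String target);
      }

      // Mirrors isGoodBlockCandidateForPlacementPolicy above: all current
      // replica locations plus the proposed target form the candidate set.
      static boolean isGoodCandidate(PlacementPolicy policy,
          List<String> replicaLocations, String source, String target) {
        List<String> candidates = new ArrayList<>(replicaLocations);
        candidates.add(target);
        return policy.isMovable(candidates, source, target);
      }

      public static void main(String[] args) {
        // Toy policy: after the move the source's rack (the prefix before
        // the colon) must still hold a replica -- a rough approximation of
        // "don't reduce the rack count".
        PlacementPolicy policy = (candidates, source, target) -> {
          String sourceRack = source.split(":")[0];
          for (String dn : candidates) {
            if (!dn.equals(source) && dn.split(":")[0].equals(sourceRack)) {
              return true;
            }
          }
          return false;
        };
        // true: rack r1 still holds r1:dn1 after moving off r1:dn0
        System.out.println(isGoodCandidate(policy,
            Arrays.asList("r1:dn0", "r1:dn1", "r2:dn2"), "r1:dn0", "r2:dn3"));
      }
    }

The stand-in policy only checks that the source's rack stays covered; the real policies implement isMovable as shown in the hunks below.
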
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 526a5d7..8478387 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -141,6 +142,17 @@ public abstract class BlockPlacementPolicy {
                                      Host2NodesMap host2datanodeMap);
 
   /**
+   * Check if the move is allowed. Used by balancer and other tools.
+   *
+   *
+   * @param candidates all replicas including source and target
+   * @param source source replica of the move
+   * @param target target replica of the move
+   */
+  abstract public boolean isMovable(Collection<DatanodeInfo> candidates,
+      DatanodeInfo source, DatanodeInfo target);
+
+  /**
    * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
    *
    * @param rackMap a map from rack to replica
@@ -172,6 +184,20 @@ public abstract class BlockPlacementPolicy {
     }
   }
 
+  protected <T> DatanodeInfo getDatanodeInfo(T datanode) {
+    Preconditions.checkArgument(
+        datanode instanceof DatanodeInfo ||
+        datanode instanceof DatanodeStorageInfo,
+        "class " + datanode.getClass().getName() + " not allowed");
+    if (datanode instanceof DatanodeInfo) {
+      return ((DatanodeInfo)datanode);
+    } else if (datanode instanceof DatanodeStorageInfo) {
+      return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor();
+    } else {
+      return null;
+    }
+  }
+
   /**
    * Get rack string from a data node
    * @return rack of data node
@@ -179,33 +205,33 @@ public abstract class BlockPlacementPolicy {
   protected String getRack(final DatanodeInfo datanode) {
     return datanode.getNetworkLocation();
   }
-  
+
   /**
    * Split data nodes into two sets, one set includes nodes on rack with
    * more than one  replica, the other set contains the remaining nodes.
    * 
-   * @param dataNodes datanodes to be split into two sets
+   * @param storagesOrDataNodes DatanodeStorageInfo/DatanodeInfo to be split
+   *        into two sets
    * @param rackMap a map from rack to datanodes
    * @param moreThanOne contains nodes on rack with more than one replica
    * @param exactlyOne remains contains the remaining nodes
    */
-  public void splitNodesWithRack(
-      final Iterable<DatanodeStorageInfo> storages,
-      final Map<String, List<DatanodeStorageInfo>> rackMap,
-      final List<DatanodeStorageInfo> moreThanOne,
-      final List<DatanodeStorageInfo> exactlyOne) {
-    for(DatanodeStorageInfo s: storages) {
-      final String rackName = getRack(s.getDatanodeDescriptor());
-      List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
+  public <T> void splitNodesWithRack(
+      final Iterable<T> storagesOrDataNodes,
+      final Map<String, List<T>> rackMap,
+      final List<T> moreThanOne,
+      final List<T> exactlyOne) {
+    for(T s: storagesOrDataNodes) {
+      final String rackName = getRack(getDatanodeInfo(s));
+      List<T> storageList = rackMap.get(rackName);
       if (storageList == null) {
-        storageList = new ArrayList<DatanodeStorageInfo>();
+        storageList = new ArrayList<T>();
         rackMap.put(rackName, storageList);
       }
       storageList.add(s);
     }
-    
     // split nodes into two sets
-    for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
+    for(List<T> storageList : rackMap.values()) {
       if (storageList.size() == 1) {
         // exactlyOne contains nodes on rack with only one replica
         exactlyOne.add(storageList.get(0));
@@ -215,5 +241,4 @@ public abstract class BlockPlacementPolicy {
       }
     }
   }
-
 }

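splitNodesWithRack is generified here from Iterable<DatanodeStorageInfo> to Iterable<T> so the balancer can reuse it with plain DatanodeInfo objects. A self-contained sketch of the partitioning it performs, with a hypothetical rackOf map standing in for getRack(getDatanodeInfo(...)):

    import java.util.*;

    public class RackSplitSketch {
      // Partition nodes by rack: nodes on racks holding more than one
      // replica go to moreThanOne, single-replica racks go to exactlyOne.
      static <T> void splitNodesWithRack(Iterable<T> nodes,
          Map<T, String> rackOf, List<T> moreThanOne, List<T> exactlyOne) {
        Map<String, List<T>> rackMap = new HashMap<>();
        for (T n : nodes) {
          rackMap.computeIfAbsent(rackOf.get(n), k -> new ArrayList<>()).add(n);
        }
        for (List<T> group : rackMap.values()) {
          if (group.size() == 1) {
            exactlyOne.add(group.get(0));
          } else {
            moreThanOne.addAll(group);
          }
        }
      }

      public static void main(String[] args) {
        Map<String, String> rackOf = new LinkedHashMap<>();
        rackOf.put("dn0", "/r1");
        rackOf.put("dn1", "/r1");
        rackOf.put("dn2", "/r2");
        List<String> moreThanOne = new ArrayList<>();
        List<String> exactlyOne = new ArrayList<>();
        splitNodesWithRack(rackOf.keySet(), rackOf, moreThanOne, exactlyOne);
        // prints: moreThanOne=[dn0, dn1] exactlyOne=[dn2]
        System.out.println("moreThanOne=" + moreThanOne
            + " exactlyOne=" + exactlyOne);
      }
    }
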
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 2723ed9..56ebc35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -881,7 +881,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     minRacks = Math.min(minRacks, numberOfReplicas);
     // 1. Check that all locations are different.
     // 2. Count locations on different racks.
-    Set<String> racks = new TreeSet<String>();
+    Set<String> racks = new TreeSet<>();
     for (DatanodeInfo dn : locs)
       racks.add(dn.getNetworkLocation());
     return new BlockPlacementStatusDefault(racks.size(), minRacks);
@@ -889,8 +889,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   /**
    * Decide whether deleting the specified replica of the block still makes
    * the block conform to the configured block placement policy.
-   * @param replicationFactor The required number of replicas for this block
-   * @param moreThanone The replica locations of this block that are present
+   * @param moreThanOne The replica locations of this block that are present
    *                    on more than one unique racks.
    * @param exactlyOne Replica locations of this block that  are present
    *                    on exactly one unique racks.
@@ -900,8 +899,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the replica that is the best candidate for deletion
    */
   @VisibleForTesting
-  public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
-      Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne,
+  public DatanodeStorageInfo chooseReplicaToDelete(
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne,
       final List<StorageType> excessTypes) {
     long oldestHeartbeat =
       monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
@@ -911,7 +911,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all hearbeats are within the tolerable heartbeat interval
-    for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) {
+    for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne,
+        exactlyOne)) {
       if (!excessTypes.contains(storage.getStorageType())) {
         continue;
       }
@@ -972,13 +973,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
     while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) {
       final DatanodeStorageInfo cur;
-      if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
-          moreThanOne, excessTypes)) {
+      if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage,
+          moreThanOne, exactlyOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
-        cur =
-            chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne,
-                excessTypes);
+        cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes);
       }
       firstOne = false;
       if (cur == null) {
@@ -997,26 +996,40 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
   /** Check if we can use delHint. */
   @VisibleForTesting
-  static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
-      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
+  boolean useDelHint(DatanodeStorageInfo delHint,
+      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne,
       List<StorageType> excessTypes) {
-    if (!isFirst) {
-      return false; // only consider delHint for the first case
-    } else if (delHint == null) {
+    if (delHint == null) {
       return false; // no delHint
     } else if (!excessTypes.contains(delHint.getStorageType())) {
       return false; // delHint storage type is not an excess type
     } else {
       // check if removing delHint reduces the number of racks
-      if (moreThan1Racks.contains(delHint)) {
-        return true; // delHint and some other nodes are under the same rack
-      } else if (added != null && !moreThan1Racks.contains(added)) {
-        return true; // the added node adds a new rack
-      }
-      return false; // removing delHint reduces the number of racks;
+      return notReduceNumOfGroups(moreThanOne, delHint, added);
     }
   }
 
+  // Check if moving from source to target will reduce the number of
+  // groups. The groups could be based on racks or upgrade domains.
+  <T> boolean notReduceNumOfGroups(List<T> moreThanOne, T source, T target) {
+    if (moreThanOne.contains(source)) {
+      return true; // source and some other nodes are under the same group.
+    } else if (target != null && !moreThanOne.contains(target)) {
+      return true; // the added node adds a new group.
+    }
+    return false; // removing delHint reduces the number of groups.
+  }
+
+  @Override
+  public boolean isMovable(Collection<DatanodeInfo> locs,
+      DatanodeInfo source, DatanodeInfo target) {
+    final Map<String, List<DatanodeInfo>> rackMap = new HashMap<>();
+    final List<DatanodeInfo> moreThanOne = new ArrayList<>();
+    final List<DatanodeInfo> exactlyOne = new ArrayList<>();
+    splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne);
+    return notReduceNumOfGroups(moreThanOne, source, target);
+  }
   /**
    * Pick up replica node set for deleting replica as over-replicated. 
    * First set contains replica nodes on rack with more than one

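The notReduceNumOfGroups helper extracted here generalizes the old delHint rack test: the same rule now decides whether a deletion or a balancer move shrinks the number of groups, where a group can be a rack or an upgrade domain. A small sketch of the rule with simplified types (moreThanOne is the set of nodes that share their group with another replica):

    import java.util.*;

    public class GroupCountSketch {
      // A move from source to target keeps the group count if either the
      // source's group retains another replica, or the target opens a
      // group that holds no replica yet.
      static <T> boolean notReduceNumOfGroups(List<T> moreThanOne,
          T source, T target) {
        if (moreThanOne.contains(source)) {
          return true;   // source's group keeps at least one replica
        }
        if (target != null && !moreThanOne.contains(target)) {
          return true;   // target adds a new group
        }
        return false;    // removing source shrinks the number of groups
      }

      public static void main(String[] args) {
        // dn0 and dn1 share a group; dn2 is alone in its group.
        List<String> moreThanOne = Arrays.asList("dn0", "dn1");
        System.out.println(notReduceNumOfGroups(moreThanOne, "dn0", "dn3")); // true
        System.out.println(notReduceNumOfGroups(moreThanOne, "dn2", "dn1")); // false
      }
    }
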
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index 187d8d6..7710654 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -39,11 +39,6 @@ import org.apache.hadoop.net.NodeBase;
  */
 public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
 
-  protected BlockPlacementPolicyWithNodeGroup(Configuration conf,  FSClusterStats stats,
-      NetworkTopology clusterMap, DatanodeManager datanodeManager) {
-    initialize(conf, stats, clusterMap, host2datanodeMap);
-  }
-
   protected BlockPlacementPolicyWithNodeGroup() {
   }
 
@@ -345,22 +340,21 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     // Split data nodes in the first set into two sets, 
     // moreThanOne contains nodes on nodegroup with more than one replica
     // exactlyOne contains the remaining nodes
-    Map<String, List<DatanodeStorageInfo>> nodeGroupMap = 
-        new HashMap<String, List<DatanodeStorageInfo>>();
+    Map<String, List<DatanodeStorageInfo>> nodeGroupMap = new HashMap<>();
     
     for(DatanodeStorageInfo storage : first) {
       final String nodeGroupName = NetworkTopology.getLastHalf(
           storage.getDatanodeDescriptor().getNetworkLocation());
       List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
       if (storageList == null) {
-        storageList = new ArrayList<DatanodeStorageInfo>();
+        storageList = new ArrayList<>();
         nodeGroupMap.put(nodeGroupName, storageList);
       }
       storageList.add(storage);
     }
     
-    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
-    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
     // split nodes into two sets
     for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
       if (datanodeList.size() == 1 ) {
@@ -374,5 +368,24 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     
     return moreThanOne.isEmpty()? exactlyOne : moreThanOne;
   }
-  
+
+  /**
+   * Check if there are any replica (other than source) on the same node group
+   * with target. If true, then target is not a good candidate for placing
+   * specific replica as we don't want 2 replicas under the same nodegroup.
+   *
+   * @return true if there are any replica (other than source) on the same node
+   *         group with target
+   */
+  @Override
+  public boolean isMovable(Collection<DatanodeInfo> locs,
+      DatanodeInfo source, DatanodeInfo target) {
+    for (DatanodeInfo dn : locs) {
+      if (dn != source && dn != target &&
+          clusterMap.isOnSameNodeGroup(dn, target)) {
+        return false;
+      }
+    }
+    return true;
+  }
 }

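The node-group rule that previously lived in Dispatcher.isOnSameNodeGroupWithReplicas now sits behind isMovable, shown above. A sketch of the check, with a hypothetical nodeGroupOf map in place of NetworkTopology.isOnSameNodeGroup:

    import java.util.*;

    public class NodeGroupMoveSketch {
      // Reject the move if any replica other than source or target already
      // occupies the target's node group: two replicas must not share one.
      static boolean isMovable(List<String> locs,
          Map<String, String> nodeGroupOf, String source, String target) {
        String targetGroup = nodeGroupOf.get(target);
        for (String dn : locs) {
          if (!dn.equals(source) && !dn.equals(target)
              && targetGroup.equals(nodeGroupOf.get(dn))) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, String> ng = new HashMap<>();
        ng.put("dn0", "g1"); ng.put("dn2", "g2"); ng.put("dn4", "g1");
        // false: dn0 already occupies node group g1, where target dn4 lives
        System.out.println(isMovable(Arrays.asList("dn0", "dn2", "dn4"),
            ng, "dn2", "dn4"));
      }
    }
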
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
index 3241908..8d6b13c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
@@ -30,6 +30,7 @@ import java.util.Set;
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.net.NetworkTopology;
@@ -117,13 +118,13 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
     return upgradeDomains;
   }
 
-  private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap(
-      DatanodeStorageInfo[] storageInfos) {
-    Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>();
-    for(DatanodeStorageInfo storage : storageInfos) {
+  private <T> Map<String, List<T>> getUpgradeDomainMap(
+      Collection<T> storagesOrDataNodes) {
+    Map<String, List<T>> upgradeDomainMap = new HashMap<>();
+    for(T storage : storagesOrDataNodes) {
       String upgradeDomain = getUpgradeDomainWithDefaultValue(
-          storage.getDatanodeDescriptor());
-      List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain);
+          getDatanodeInfo(storage));
+      List<T> storages = upgradeDomainMap.get(upgradeDomain);
       if (storages == null) {
         storages = new ArrayList<>();
         upgradeDomainMap.put(upgradeDomain, storages);
@@ -156,6 +157,19 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
     return getShareUDSet;
   }
 
+  private Collection<DatanodeStorageInfo> combine(
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne) {
+    List<DatanodeStorageInfo> all = new ArrayList<>();
+    if (moreThanOne != null) {
+      all.addAll(moreThanOne);
+    }
+    if (exactlyOne != null) {
+      all.addAll(exactlyOne);
+    }
+    return all;
+  }
+
   /*
    * The policy to pick the replica set for deleting the over-replicated
    * replica which meet the rack and upgrade domain requirements.
@@ -231,20 +245,11 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
   protected Collection<DatanodeStorageInfo> pickupReplicaSet(
       Collection<DatanodeStorageInfo> moreThanOne,
       Collection<DatanodeStorageInfo> exactlyOne) {
-    List<DatanodeStorageInfo> all = new ArrayList<>();
-    if (moreThanOne != null) {
-      all.addAll(moreThanOne);
-    }
-    if (exactlyOne != null) {
-      all.addAll(exactlyOne);
-    }
-
-    Map<String, List<DatanodeStorageInfo>> upgradeDomains =
-        getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()]));
-
     // shareUDSet includes DatanodeStorageInfo that share same upgrade
     // domain with another DatanodeStorageInfo.
-    List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains);
+    Collection<DatanodeStorageInfo> all = combine(moreThanOne, exactlyOne);
+    List<DatanodeStorageInfo> shareUDSet = getShareUDSet(
+        getUpgradeDomainMap(all));
     // shareRackAndUDSet contains those DatanodeStorageInfo that
     // share rack and upgrade domain with another DatanodeStorageInfo.
     List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
@@ -260,4 +265,47 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
     }
     return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet;
   }
+
+  @Override
+  boolean useDelHint(DatanodeStorageInfo delHint,
+      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne,
+      List<StorageType> excessTypes) {
+    if (!super.useDelHint(delHint, added, moreThanOne, exactlyOne,
+        excessTypes)) {
+      // If BlockPlacementPolicyDefault doesn't allow useDelHint, there is no
+      // point checking with upgrade domain policy.
+      return false;
+    }
+    return isMovableBasedOnUpgradeDomain(combine(moreThanOne, exactlyOne),
+        delHint, added);
+  }
+
+  // Check if moving from source to target will preserve the upgrade domain
+  // policy.
+  private <T> boolean isMovableBasedOnUpgradeDomain(Collection<T> all,
+      T source, T target) {
+    Map<String, List<T>> udMap = getUpgradeDomainMap(all);
+    // shareUDSet includes datanodes that share same upgrade
+    // domain with another datanode.
+    List<T> shareUDSet = getShareUDSet(udMap);
+    // check if removing source reduces the number of upgrade domains
+    if (notReduceNumOfGroups(shareUDSet, source, target)) {
+      return true;
+    } else if (udMap.size() > upgradeDomainFactor) {
+      return true; // existing number of upgrade domain exceeds the limit.
+    } else {
+      return false; // removing source reduces the number of UDs.
+    }
+  }
+
+  @Override
+  public boolean isMovable(Collection<DatanodeInfo> locs,
+      DatanodeInfo source, DatanodeInfo target) {
+    if (super.isMovable(locs, source, target)) {
+      return isMovableBasedOnUpgradeDomain(locs, source, target);
+    } else {
+      return false;
+    }
+  }
 }

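The upgrade-domain policy layers its own isMovable on top of the default rack rule: candidates are grouped by upgrade domain, the notReduceNumOfGroups test is applied to the nodes that share a domain, and a move that shrinks the domain count is tolerated only when the block already spans more domains than upgradeDomainFactor. A sketch under those assumptions (udOf is a hypothetical stand-in for getUpgradeDomainWithDefaultValue):

    import java.util.*;

    public class UpgradeDomainMoveSketch {
      static boolean isMovableBasedOnUpgradeDomain(Collection<String> all,
          Map<String, String> udOf, String source, String target,
          int upgradeDomainFactor) {
        // Group candidates (replicas plus target) by upgrade domain.
        Map<String, List<String>> udMap = new HashMap<>();
        for (String dn : all) {
          udMap.computeIfAbsent(udOf.get(dn), k -> new ArrayList<>()).add(dn);
        }
        // Nodes that share their upgrade domain with another candidate.
        List<String> shareUD = new ArrayList<>();
        for (List<String> group : udMap.values()) {
          if (group.size() > 1) {
            shareUD.addAll(group);
          }
        }
        if (shareUD.contains(source)) {
          return true;   // source's domain keeps a replica after the move
        }
        if (target != null && !shareUD.contains(target)) {
          return true;   // target lands in a domain with no other replica
        }
        // Otherwise the move shrinks the domain count; tolerate it only if
        // the block already spans more domains than required.
        return udMap.size() > upgradeDomainFactor;
      }

      public static void main(String[] args) {
        Map<String, String> ud = new HashMap<>();
        ud.put("dn0", "ud0"); ud.put("dn1", "ud1");
        ud.put("dn2", "ud2"); ud.put("dn3", "ud2");
        // false: moving dn0 (the only ud0 replica) to dn3 (ud2, already
        // holding dn2) would drop the upgrade-domain count from 3 to 2.
        System.out.println(isMovableBasedOnUpgradeDomain(
            Arrays.asList("dn0", "dn1", "dn2", "dn3"), ud, "dn0", "dn3", 3));
      }
    }
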
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 80c89e9..d6a7b93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1156,7 +1156,7 @@ public class DFSTestUtil {
       final StorageType type = (types != null && i < types.length) ? types[i]
           : StorageType.DEFAULT;
       storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname,
-          type);
+          type, null);
     }
     return storages;
   }
@@ -1164,16 +1164,19 @@ public class DFSTestUtil {
   public static DatanodeStorageInfo createDatanodeStorageInfo(
       String storageID, String ip, String rack, String hostname) {
     return createDatanodeStorageInfo(storageID, ip, rack, hostname,
-        StorageType.DEFAULT);
+        StorageType.DEFAULT, null);
   }
 
   public static DatanodeStorageInfo createDatanodeStorageInfo(
       String storageID, String ip, String rack, String hostname,
-      StorageType type) {
+      StorageType type, String upgradeDomain) {
     final DatanodeStorage storage = new DatanodeStorage(storageID,
         DatanodeStorage.State.NORMAL, type);
     final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(
         ip, rack, storage, hostname);
+    if (upgradeDomain != null) {
+      dn.setUpgradeDomain(upgradeDomain);
+    }
     return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index dd54345..362c34a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -77,7 +78,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
-import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@@ -409,7 +413,102 @@ public class TestBalancer {
     int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
     assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
   }
-  
+
+  /**
+   * Verify balancer won't violate the default block placement policy.
+   * @throws Exception
+   */
+  @Test(timeout=100000)
+  public void testRackPolicyAfterBalance() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    long[] capacities =  new long[] { CAPACITY, CAPACITY };
+    String[] hosts = {"host0", "host1"};
+    String[] racks = { RACK0, RACK1 };
+    runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
+        null, CAPACITY, "host2", RACK1, null);
+  }
+
+  /**
+   * Verify balancer won't violate upgrade domain block placement policy.
+   * @throws Exception
+   */
+  @Test(timeout=100000)
+  public void testUpgradeDomainPolicyAfterBalance() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyWithUpgradeDomain.class,
+        BlockPlacementPolicy.class);
+    long[] capacities =  new long[] { CAPACITY, CAPACITY, CAPACITY };
+    String[] hosts = {"host0", "host1", "host2"};
+    String[] racks = { RACK0, RACK1, RACK1 };
+    String[] UDs = { "ud0", "ud1", "ud2" };
+    runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
+        UDs, CAPACITY, "host3", RACK2, "ud2");
+  }
+
+  private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
+      long[] capacities, String[] hosts, String[] racks, String[] UDs,
+      long newCapacity, String newHost, String newRack, String newUD)
+          throws Exception {
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+        .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
+    DatanodeManager dm = cluster.getNamesystem().getBlockManager().
+        getDatanodeManager();
+    if (UDs != null) {
+      for(int i = 0; i < UDs.length; i++) {
+        DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
+        dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
+      }
+    }
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+      // fill up the cluster to be 80% full
+      long totalCapacity = sum(capacities);
+      long totalUsedSpace = totalCapacity * 8 / 10;
+
+      final long fileSize = totalUsedSpace / numOfDatanodes;
+      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+          fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);
+
+      // start up an empty node with the same capacity on the same rack as the
+      // pinned host.
+      cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
+          new String[] { newHost }, new long[] { newCapacity });
+      if (newUD != null) {
+        DatanodeID newId = cluster.getDataNodes().get(
+            numOfDatanodes).getDatanodeId();
+        dm.getDatanode(newId).setUpgradeDomain(newUD);
+      }
+      totalCapacity += newCapacity;
+
+      // run balancer and validate results
+      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+      // start rebalancing
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+      BlockPlacementPolicy placementPolicy =
+          cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
+      List<LocatedBlock> locatedBlocks = client.
+          getBlockLocations(fileName, 0, fileSize).getLocatedBlocks();
+      for (LocatedBlock locatedBlock : locatedBlocks) {
+        BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
+            locatedBlock.getLocations(), numOfDatanodes);
+        assertTrue(status.isPlacementPolicySatisfied());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Wait until balanced: each datanode gives utilization within 
    * BALANCE_ALLOWED_VARIANCE of average

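The upgrade-domain test opts into the policy through the same switch a live cluster would use, dfs.block.replicator.classname. A minimal helper mirroring the conf.setClass(...) call from the test above (the helper class and method names are illustrative, not part of the patch; it assumes hadoop-hdfs on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;

    public class EnableUpgradeDomainPolicy {
      // Select the upgrade-domain placement policy; the NameNode (and, with
      // this patch, the Balancer) instantiates the policy from this key.
      public static Configuration withUpgradeDomainPolicy(Configuration conf) {
        conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
            BlockPlacementPolicyWithUpgradeDomain.class,
            BlockPlacementPolicy.class);
        return conf;
      }
    }
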
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 16d482e..9b7ba4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -821,14 +821,15 @@ public class TestBlockManager {
     DatanodeStorageInfo delHint = new DatanodeStorageInfo(
         DFSTestUtil.getLocalDatanodeDescriptor(), new DatanodeStorage("id"));
     List<DatanodeStorageInfo> moreThan1Racks = Arrays.asList(delHint);
-    List<StorageType> excessTypes = new ArrayList<StorageType>();
-
+    List<StorageType> excessTypes = new ArrayList<>();
+    BlockPlacementPolicyDefault policyDefault =
+        (BlockPlacementPolicyDefault) bm.getBlockPlacementPolicy();
     excessTypes.add(StorageType.DEFAULT);
-    Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint,
-        null, moreThan1Racks, excessTypes));
+    Assert.assertTrue(policyDefault.useDelHint(delHint, null, moreThan1Racks,
+        null, excessTypes));
     excessTypes.remove(0);
     excessTypes.add(StorageType.SSD);
-    Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint,
-        null, moreThan1Racks, excessTypes));
+    Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks,
+        null, excessTypes));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 37fcf34..c3fe466 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -971,11 +972,11 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
       // test returning null
       excessTypes.add(StorageType.SSD);
       assertNull(((BlockPlacementPolicyDefault) replicator)
-          .chooseReplicaToDelete((short) 3, first, second, excessTypes));
+          .chooseReplicaToDelete(first, second, excessTypes));
     }
     excessTypes.add(StorageType.DEFAULT);
     DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
-        .chooseReplicaToDelete((short) 3, first, second, excessTypes);
+        .chooseReplicaToDelete(first, second, excessTypes);
     // Within first set, storages[1] with less free space
     assertEquals(chosen, storages[1]);
 
@@ -985,25 +986,25 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     // Within second set, storages[5] with less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        (short)2, first, second, excessTypes);
+        first, second, excessTypes);
     assertEquals(chosen, storages[5]);
   }
 
   @Test
   public void testChooseReplicasToDelete() throws Exception {
-    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
     nonExcess.add(storages[0]);
     nonExcess.add(storages[1]);
     nonExcess.add(storages[2]);
     nonExcess.add(storages[3]);
-    List<DatanodeStorageInfo> excessReplicas = new ArrayList<>();
+    List<DatanodeStorageInfo> excessReplicas;
     BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
         .createDefaultSuite();
     BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
     DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo(
         "Storage-excess-SSD-ID", "localhost",
         storages[0].getDatanodeDescriptor().getNetworkLocation(),
-        "foo.com", StorageType.SSD);
+        "foo.com", StorageType.SSD, null);
     updateHeartbeatWithUsage(excessSSD.getDatanodeDescriptor(),
         2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0,
@@ -1016,14 +1017,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
         DatanodeStorageInfo.toStorageTypes(nonExcess));
     excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
-    assertTrue(excessReplicas.size() > 0);
+    assertTrue(excessReplicas.size() == 1);
     assertTrue(excessReplicas.contains(storages[0]));
 
     // Excess type deletion
 
     DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
         "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
-        "foo.com", StorageType.ARCHIVE);
+        "foo.com", StorageType.ARCHIVE, null);
     nonExcess.add(excessStorage);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
@@ -1057,32 +1058,70 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
 
  @Test
   public void testUseDelHint() throws Exception {
-    List<StorageType> excessTypes = new ArrayList<StorageType>();
+    List<StorageType> excessTypes = new ArrayList<>();
     excessTypes.add(StorageType.ARCHIVE);
-    // only consider delHint for the first case
-    assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null,
-        null));
+   BlockPlacementPolicyDefault policyDefault =
+       (BlockPlacementPolicyDefault) replicator;
     // no delHint
-    assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null,
-        null));
+    assertFalse(policyDefault.useDelHint(null, null, null, null, null));
     // delHint storage type is not an excess type
-    assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
-        null, excessTypes));
+    assertFalse(policyDefault.useDelHint(storages[0], null, null, null,
+        excessTypes));
     // check if removing delHint reduces the number of racks
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
-    chosenNodes.add(storages[0]);
-    chosenNodes.add(storages[2]);
+    List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
+    moreThanOne.add(storages[0]);
+    moreThanOne.add(storages[1]);
+    List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
+    exactlyOne.add(storages[3]);
+    exactlyOne.add(storages[5]);
+
     excessTypes.add(StorageType.DEFAULT);
-    assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
-        chosenNodes, excessTypes));
+    assertTrue(policyDefault.useDelHint(storages[0], null, moreThanOne,
+            exactlyOne, excessTypes));
     // the added node adds a new rack
-    assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
-        storages[5], chosenNodes, excessTypes));
+    assertTrue(policyDefault.useDelHint(storages[3], storages[5], moreThanOne,
+        exactlyOne, excessTypes));
     // removing delHint reduces the number of racks;
-    assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
-        storages[0], chosenNodes, excessTypes));
-    assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null,
-        chosenNodes, excessTypes));
+    assertFalse(policyDefault.useDelHint(storages[3], storages[0], moreThanOne,
+        exactlyOne, excessTypes));
+    assertFalse(policyDefault.useDelHint(storages[3], null, moreThanOne,
+        exactlyOne, excessTypes));
+  }
+
+  @Test
+  public void testIsMovable() throws Exception {
+    List<DatanodeInfo> candidates = new ArrayList<>();
+
+    // after the move, the number of racks remains 2.
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[1]);
+    candidates.add(dataNodes[2]);
+    candidates.add(dataNodes[3]);
+    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
+
+    // after the move, the number of racks remains 3.
+    candidates.clear();
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[1]);
+    candidates.add(dataNodes[2]);
+    candidates.add(dataNodes[4]);
+    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[1]));
+
+    // after the move, the number of racks changes from 2 to 3.
+    candidates.clear();
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[1]);
+    candidates.add(dataNodes[2]);
+    candidates.add(dataNodes[4]);
+    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
+
+    // the move would have reduced the number of racks from 3 to 2.
+    candidates.clear();
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[2]);
+    candidates.add(dataNodes[3]);
+    candidates.add(dataNodes[4]);
+    assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index 0ff7770..367faea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -544,7 +544,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     List<StorageType> excessTypes = new ArrayList<>();
     excessTypes.add(StorageType.DEFAULT);
     DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
-        .chooseReplicaToDelete((short) 3, first, second, excessTypes);
+        .chooseReplicaToDelete(first, second, excessTypes);
     // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]}, 
     // dataNodes[0] and dataNodes[1] are in the same nodegroup, 
     // but dataNodes[1] is chosen as less free space
@@ -557,7 +557,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     // as less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        (short) 2, first, second, excessTypes);
+        first, second, excessTypes);
     assertEquals(chosen, storages[2]);
 
     replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
@@ -566,7 +566,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     // Within second set, dataNodes[5] with less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        (short) 1, first, second, excessTypes);
+        first, second, excessTypes);
     assertEquals(chosen, storages[5]);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
index b5caebf..608817f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -190,41 +193,6 @@ public class TestReplicationPolicyWithUpgradeDomain
   }
 
   /**
-   * Verify the correct replica is chosen to satisfy both rack and upgrade
-   * domain policy.
-   * @throws Exception
-   */
-  @Test
-  public void testChooseReplicaToDelete() throws Exception {
-    BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy =
-        (BlockPlacementPolicyWithUpgradeDomain)replicator;
-    List<DatanodeStorageInfo> first = new ArrayList<>();
-    List<DatanodeStorageInfo> second = new ArrayList<>();
-    List<StorageType> excessTypes = new ArrayList<>();
-    excessTypes.add(StorageType.DEFAULT);
-    first.add(storages[0]);
-    first.add(storages[1]);
-    second.add(storages[4]);
-    second.add(storages[8]);
-    DatanodeStorageInfo chosenStorage =
-        upgradeDomainPolicy.chooseReplicaToDelete(
-            (short)3, first, second, excessTypes);
-    assertEquals(chosenStorage, storages[1]);
-    first.clear();
-    second.clear();
-
-    excessTypes.add(StorageType.DEFAULT);
-    first.add(storages[0]);
-    first.add(storages[1]);
-    first.add(storages[4]);
-    first.add(storages[5]);
-    chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
-        (short)3, first, second, excessTypes);
-    assertTrue(chosenStorage.equals(storages[1]) ||
-        chosenStorage.equals(storages[4]));
-  }
-
-  /**
    * Test the scenario where not enough replicas can't satisfy the policy.
    * @throws Exception
    */
@@ -248,7 +216,7 @@ public class TestReplicationPolicyWithUpgradeDomain
   }
 
   /**
-   * Test the scenario where not enough replicas can't satisfy the policy.
+   * Test block placement verification.
    * @throws Exception
    */
   @Test
@@ -341,6 +309,137 @@ public class TestReplicationPolicyWithUpgradeDomain
     assertFalse(status.isPlacementPolicySatisfied());
   }
 
+  /**
+   * Verify the correct replica is chosen to satisfy both rack and upgrade
+   * domain policy.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseReplicasToDelete() throws Exception {
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
+    nonExcess.add(storages[0]);
+    nonExcess.add(storages[1]);
+    nonExcess.add(storages[2]);
+    nonExcess.add(storages[3]);
+    List<DatanodeStorageInfo> excessReplicas;
+    BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
+        .createDefaultSuite();
+    BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
+
+    // delete hint accepted.
+    DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
+    List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
+    assertTrue(excessReplicas.size() == 1);
+    assertTrue(excessReplicas.contains(storages[0]));
+
+    // delete hint rejected because deleting storages[1] would have
+    // caused only two upgrade domains to be left.
+    delHintNode = storages[1].getDatanodeDescriptor();
+    excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
+    assertTrue(excessReplicas.size() == 1);
+    assertTrue(excessReplicas.contains(storages[0]));
+
+    // no delete hint, case 1
+    nonExcess.clear();
+    nonExcess.add(storages[0]);
+    nonExcess.add(storages[1]);
+    nonExcess.add(storages[4]);
+    nonExcess.add(storages[8]);
+    excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[8].getDatanodeDescriptor(), null);
+    assertTrue(excessReplicas.size() == 1);
+    assertTrue(excessReplicas.contains(storages[1]));
+
+    // no delete hint, case 2
+    nonExcess.clear();
+    nonExcess.add(storages[0]);
+    nonExcess.add(storages[1]);
+    nonExcess.add(storages[4]);
+    nonExcess.add(storages[5]);
+    excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[8].getDatanodeDescriptor(), null);
+    assertTrue(excessReplicas.size() == 1);
+    assertTrue(excessReplicas.contains(storages[1]) ||
+        excessReplicas.contains(storages[4]));
+
+    // No delete hint, different excess type deletion
+    nonExcess.clear();
+    nonExcess.add(storages[0]);
+    nonExcess.add(storages[1]);
+    nonExcess.add(storages[2]);
+    nonExcess.add(storages[3]);
+    DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
+        "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
+        "foo.com", StorageType.ARCHIVE, delHintNode.getUpgradeDomain());
+    nonExcess.add(excessStorage);
+    excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[3].getDatanodeDescriptor(), null);
+    assertTrue(excessReplicas.size() == 2);
+    assertTrue(excessReplicas.contains(storages[0]));
+    assertTrue(excessReplicas.contains(excessStorage));
+  }
+
+  @Test
+  public void testIsMovable() throws Exception {
+    List<DatanodeInfo> candidates = new ArrayList<>();
+
+    // after the move, the number of racks changes from 1 to 2.
+    // and number of upgrade domains remains 3.
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[1]);
+    candidates.add(dataNodes[2]);
+    candidates.add(dataNodes[3]);
+    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
+
+    // the move would have changed the number of racks from 1 to 2.
+    // and the number of UDs from 3 to 2.
+    candidates.clear();
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[1]);
+    candidates.add(dataNodes[2]);
+    candidates.add(dataNodes[4]);
+    assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
+
+    // after the move, the number of racks remains 2.
+    // the number of UDs remains 3.
+    candidates.clear();
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[4]);
+    candidates.add(dataNodes[5]);
+    candidates.add(dataNodes[6]);
+    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[6]));
+
+    // after the move, the number of racks remains 2.
+    // the number of UDs remains 2.
+    candidates.clear();
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[1]);
+    candidates.add(dataNodes[3]);
+    candidates.add(dataNodes[4]);
+    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
+
+    // the move would have changed the number of racks from 2 to 3.
+    // and the number of UDs from 2 to 1.
+    candidates.clear();
+    candidates.add(dataNodes[0]);
+    candidates.add(dataNodes[3]);
+    candidates.add(dataNodes[4]);
+    candidates.add(dataNodes[6]);
+    assertFalse(replicator.isMovable(candidates, dataNodes[4], dataNodes[6]));
+  }
+
   private Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) {
     HashSet<String> upgradeDomains = new HashSet<>();
     for (DatanodeStorageInfo node : nodes) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec414600/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index 143665a..9df4399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -629,11 +629,13 @@ public class TestDNFencing {
     }
 
     @Override
-    public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
-        Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second,
+    public DatanodeStorageInfo chooseReplicaToDelete(
+        Collection<DatanodeStorageInfo> moreThanOne,
+        Collection<DatanodeStorageInfo> exactlyOne,
         List<StorageType> excessTypes) {
-      
-      Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
+
+      Collection<DatanodeStorageInfo> chooseFrom = !moreThanOne.isEmpty() ?
+          moreThanOne : exactlyOne;
 
       List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
       return l.get(ThreadLocalRandom.current().nextInt(l.size()));
