HDFS-9390. Block management for maintenance states.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b61fb267
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b61fb267
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b61fb267

Branch: refs/heads/trunk
Commit: b61fb267b92b2736920b4bd0c673d31e7632ebb9
Parents: f5d9235
Author: Ming Ma <min...@apache.org>
Authored: Mon Oct 17 17:45:41 2016 -0700
Committer: Ming Ma <min...@apache.org>
Committed: Mon Oct 17 17:45:41 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  53 +-
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  11 +-
 .../server/blockmanagement/BlockManager.java    | 249 ++++--
 .../BlockPlacementPolicyDefault.java            |   4 +-
 .../CacheReplicationMonitor.java                |   2 +-
 .../blockmanagement/DatanodeDescriptor.java     |  35 +-
 .../server/blockmanagement/DatanodeManager.java |  47 +-
 .../blockmanagement/DecommissionManager.java    | 142 +++-
 .../blockmanagement/ErasureCodingWork.java      |  16 +-
 .../blockmanagement/HeartbeatManager.java       |  23 +-
 .../blockmanagement/LowRedundancyBlocks.java    |  47 +-
 .../server/blockmanagement/NumberReplicas.java  |  30 +-
 .../blockmanagement/StorageTypeStats.java       |   8 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   9 +-
 .../src/main/resources/hdfs-default.xml         |   7 +
 .../apache/hadoop/hdfs/AdminStatesBaseTest.java |  20 +-
 .../apache/hadoop/hdfs/TestDecommission.java    |   2 +-
 .../hadoop/hdfs/TestMaintenanceState.java       | 775 +++++++++++++++++--
 .../blockmanagement/TestBlockManager.java       |   8 +-
 .../namenode/TestDecommissioningStatus.java     |  57 +-
 .../namenode/TestNamenodeCapacityReport.java    |  78 +-
 .../hadoop/hdfs/util/HostsFileWriter.java       |   1 +
 23 files changed, 1240 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 10c0ad6..d54c109 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -220,6 +220,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys 
{
       "dfs.namenode.reconstruction.pending.timeout-sec";
   public static final int     
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
 
+  public static final String  DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
+      "dfs.namenode.maintenance.replication.min";
+  public static final int     DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT
+      = 1;
+
   public static final String  DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY =
       
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
   public static final int     DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 83870cf..23166e2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -124,48 +124,57 @@ public class DFSUtil {
   }
 
   /**
-   * Compartor for sorting DataNodeInfo[] based on decommissioned states.
-   * Decommissioned nodes are moved to the end of the array on sorting with
-   * this compartor.
+   * Comparator for sorting DataNodeInfo[] based on
+   * decommissioned and entering_maintenance states.
    */
-  public static final Comparator<DatanodeInfo> DECOM_COMPARATOR = 
-    new Comparator<DatanodeInfo>() {
-      @Override
-      public int compare(DatanodeInfo a, DatanodeInfo b) {
-        return a.isDecommissioned() == b.isDecommissioned() ? 0 : 
-          a.isDecommissioned() ? 1 : -1;
+  public static class ServiceComparator implements Comparator<DatanodeInfo> {
+    @Override
+    public int compare(DatanodeInfo a, DatanodeInfo b) {
+      // Decommissioned nodes will still be moved to the end of the list
+      if (a.isDecommissioned()) {
+        return b.isDecommissioned() ? 0 : 1;
+      } else if (b.isDecommissioned()) {
+        return -1;
       }
-    };
 
+      // ENTERING_MAINTENANCE nodes should be after live nodes.
+      if (a.isEnteringMaintenance()) {
+        return b.isEnteringMaintenance() ? 0 : 1;
+      } else if (b.isEnteringMaintenance()) {
+        return -1;
+      } else {
+        return 0;
+      }
+    }
+  }
 
   /**
-   * Comparator for sorting DataNodeInfo[] based on decommissioned/stale 
states.
-   * Decommissioned/stale nodes are moved to the end of the array on sorting
-   * with this comparator.
-   */ 
+   * Comparator for sorting DataNodeInfo[] based on
+   * stale, decommissioned and entering_maintenance states.
+   * Order: live -> stale -> entering_maintenance -> decommissioned
+   */
   @InterfaceAudience.Private 
-  public static class DecomStaleComparator implements Comparator<DatanodeInfo> 
{
+  public static class ServiceAndStaleComparator extends ServiceComparator {
     private final long staleInterval;
 
     /**
-     * Constructor of DecomStaleComparator
+     * Constructor of ServiceAndStaleComparator
      * 
      * @param interval
      *          The time interval for marking datanodes as stale is passed from
      *          outside, since the interval may be changed dynamically
      */
-    public DecomStaleComparator(long interval) {
+    public ServiceAndStaleComparator(long interval) {
       this.staleInterval = interval;
     }
 
     @Override
     public int compare(DatanodeInfo a, DatanodeInfo b) {
-      // Decommissioned nodes will still be moved to the end of the list
-      if (a.isDecommissioned()) {
-        return b.isDecommissioned() ? 0 : 1;
-      } else if (b.isDecommissioned()) {
-        return -1;
+      int ret = super.compare(a, b);
+      if (ret != 0) {
+        return ret;
       }
+
       // Stale nodes will be moved behind the normal nodes
       boolean aStale = a.isStale(staleInterval);
       boolean bStale = b.isStale(staleInterval);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index aea0ae4..e5c5e53 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -989,20 +989,17 @@ public class Dispatcher {
   }
 
   private boolean shouldIgnore(DatanodeInfo dn) {
-    // ignore decommissioned nodes
-    final boolean decommissioned = dn.isDecommissioned();
-    // ignore decommissioning nodes
-    final boolean decommissioning = dn.isDecommissionInProgress();
+    // ignore out-of-service nodes
+    final boolean outOfService = !dn.isInService();
     // ignore nodes in exclude list
     final boolean excluded = Util.isExcluded(excludedNodes, dn);
     // ignore nodes not in the include list (if include list is not empty)
     final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
 
-    if (decommissioned || decommissioning || excluded || notIncluded) {
+    if (outOfService || excluded || notIncluded) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Excluding datanode " + dn
-            + ": decommissioned=" + decommissioned
-            + ", decommissioning=" + decommissioning
+            + ": outOfService=" + outOfService
             + ", excluded=" + excluded
             + ", notIncluded=" + notIncluded);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7b13add..03bdb7a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -126,6 +126,29 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
+ * For block state management, it tries to maintain the  safety
+ * property of "# of live replicas == # of expected redundancy" under
+ * any events such as decommission, namenode failover, datanode failure.
+ *
+ * The motivation of maintenance mode is to allow admins quickly repair nodes
+ * without paying the cost of decommission. Thus with maintenance mode,
+ * # of live replicas doesn't have to be equal to # of expected redundancy.
+ * If any of the replica is in maintenance mode, the safety property
+ * is extended as follows. These property still apply for the case of zero
+ * maintenance replicas, thus we can use these safe property for all scenarios.
+ * a. # of live replicas >= # of min replication for maintenance.
+ * b. # of live replicas <= # of expected redundancy.
+ * c. # of live replicas and maintenance replicas >= # of expected redundancy.
+ *
+ * For regular replication, # of min live replicas for maintenance is 
determined
+ * by DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY. This number has to <=
+ * DFS_NAMENODE_REPLICATION_MIN_KEY.
+ * For erasure encoding, # of min live replicas for maintenance is
+ * BlockInfoStriped#getRealDataBlockNum.
+ *
+ * Another safety property is to satisfy the block placement policy. While the
+ * policy is configurable, the replicas the policy is applied to are the live
+ * replicas + maintenance replicas.
  */
 @InterfaceAudience.Private
 public class BlockManager implements BlockStatsMXBean {
@@ -341,6 +364,11 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final BlockIdManager blockIdManager;
 
+  /** Minimum live replicas needed for the datanode to be transitioned
+   * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
+   */
+  private final short minReplicationToBeInMaintenance;
+
   public BlockManager(final Namesystem namesystem, boolean haEnabled,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -373,13 +401,13 @@ public class BlockManager implements BlockStatsMXBean {
     this.maxCorruptFilesReturned = conf.getInt(
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
-    this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
-                                          
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+    this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
 
-    final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
-                                 DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
+    final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
+        DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
     final int minR = 
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
-                                 
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
     if (minR <= 0)
       throw new IOException("Unexpected configuration parameters: "
           + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
@@ -407,7 +435,7 @@ public class BlockManager implements BlockStatsMXBean {
     this.blocksInvalidateWorkPct = 
DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
 
-    this.replicationRecheckInterval = 
+    this.replicationRecheckInterval =
       conf.getTimeDuration(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
           DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT,
           TimeUnit.SECONDS) * 1000L;
@@ -428,7 +456,7 @@ public class BlockManager implements BlockStatsMXBean {
     this.encryptDataTransfer =
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-    
+
     this.maxNumBlocksToLog =
         conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
             DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -438,6 +466,25 @@ public class BlockManager implements BlockStatsMXBean {
     this.getBlocksMinBlockSize = conf.getLongBytes(
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+
+    final int minMaintenanceR = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);
+
+    if (minMaintenanceR < 0) {
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+          + " = " + minMaintenanceR + " < 0");
+    }
+    if (minMaintenanceR > minR) {
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+          + " = " + minMaintenanceR + " > "
+          + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+          + " = " + minR);
+    }
+    this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
+
     this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
 
     bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -668,7 +715,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
-  
+
   /**
    * Dump the metadata for the given block in a human-readable
    * form.
@@ -697,12 +744,12 @@ public class BlockManager implements BlockStatsMXBean {
       out.print(fileName + ": ");
     }
     // l: == live:, d: == decommissioned c: == corrupt e: == excess
-    out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
+    out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
               " (replicas:" +
               " l: " + numReplicas.liveReplicas() +
               " d: " + numReplicas.decommissionedAndDecommissioning() +
               " c: " + numReplicas.corruptReplicas() +
-              " e: " + numReplicas.excessReplicas() + ") "); 
+              " e: " + numReplicas.excessReplicas() + ") ");
 
     Collection<DatanodeDescriptor> corruptNodes = 
                                   corruptReplicas.getNodes(block);
@@ -750,6 +797,18 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  public short getMinReplicationToBeInMaintenance() {
+    return minReplicationToBeInMaintenance;
+  }
+
+  private short getMinMaintenanceStorageNum(BlockInfo block) {
+    if (block.isStriped()) {
+      return ((BlockInfoStriped) block).getRealDataBlockNum();
+    } else {
+      return minReplicationToBeInMaintenance;
+    }
+  }
+
   public boolean hasMinStorage(BlockInfo block) {
     return countNodes(block).liveReplicas() >= getMinStorageNum(block);
   }
@@ -942,7 +1001,7 @@ public class BlockManager implements BlockStatsMXBean {
     NumberReplicas replicas = countNodes(lastBlock);
     neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
         replicas.readOnlyReplicas(),
-        replicas.decommissionedAndDecommissioning(), getRedundancy(lastBlock));
+        replicas.outOfServiceReplicas(), getExpectedRedundancyNum(lastBlock));
     pendingReconstruction.remove(lastBlock);
 
     // remove this block from the list of pending blocks to be deleted. 
@@ -1078,7 +1137,8 @@ public class BlockManager implements BlockStatsMXBean {
     } else {
       isCorrupt = numCorruptReplicas != 0 && numCorruptReplicas == numNodes;
     }
-    final int numMachines = isCorrupt ? numNodes: numNodes - 
numCorruptReplicas;
+    int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+    numMachines -= numReplicas.maintenanceNotForReadReplicas();
     DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
     int j = 0, i = 0;
@@ -1086,11 +1146,17 @@ public class BlockManager implements BlockStatsMXBean {
       final boolean noCorrupt = (numCorruptReplicas == 0);
       for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
         if (storage.getState() != State.FAILED) {
+          final DatanodeDescriptor d = storage.getDatanodeDescriptor();
+          // Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states.
+          if (d.isInMaintenance()
+              || (d.isEnteringMaintenance() && !d.isAlive())) {
+            continue;
+          }
+
           if (noCorrupt) {
             machines[j++] = storage;
             i = setBlockIndices(blk, blockIndices, i, storage);
           } else {
-            final DatanodeDescriptor d = storage.getDatanodeDescriptor();
             final boolean replicaCorrupt = isReplicaCorrupt(blk, d);
             if (isCorrupt || !replicaCorrupt) {
               machines[j++] = storage;
@@ -1106,7 +1172,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     assert j == machines.length :
-      "isCorrupt: " + isCorrupt + 
+      "isCorrupt: " + isCorrupt +
       " numMachines: " + numMachines +
       " numNodes: " + numNodes +
       " numCorrupt: " + numCorruptNodes +
@@ -1700,8 +1766,11 @@ public class BlockManager implements BlockStatsMXBean {
     return scheduledWork;
   }
 
+  // Check if the number of live + pending replicas satisfies
+  // the expected redundancy.
   boolean hasEnoughEffectiveReplicas(BlockInfo block,
-      NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+      NumberReplicas numReplicas, int pendingReplicaNum) {
+    int required = getExpectedLiveRedundancyNum(block, numReplicas);
     int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
     return (numEffectiveReplicas >= required) &&
         (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
@@ -1716,8 +1785,6 @@ public class BlockManager implements BlockStatsMXBean {
       return null;
     }
 
-    short requiredRedundancy = getExpectedRedundancyNum(block);
-
     // get a source data-node
     List<DatanodeDescriptor> containingNodes = new ArrayList<>();
     List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
@@ -1726,6 +1793,8 @@ public class BlockManager implements BlockStatsMXBean {
     final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
         containingNodes, liveReplicaNodes, numReplicas,
         liveBlockIndices, priority);
+    short requiredRedundancy = getExpectedLiveRedundancyNum(block,
+        numReplicas);
     if(srcNodes == null || srcNodes.length == 0) {
       // block can not be reconstructed from any node
       LOG.debug("Block " + block + " cannot be reconstructed " +
@@ -1738,8 +1807,7 @@ public class BlockManager implements BlockStatsMXBean {
     assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
 
     int pendingNum = pendingReconstruction.getNumReplicas(block);
-    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
-        requiredRedundancy)) {
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
       neededReconstruction.remove(block, priority);
       blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
           " it has enough replicas", block);
@@ -1763,9 +1831,11 @@ public class BlockManager implements BlockStatsMXBean {
 
       // should reconstruct all the internal blocks before scheduling
       // replication task for decommissioning node(s).
-      if (additionalReplRequired - numReplicas.decommissioning() > 0) {
-        additionalReplRequired = additionalReplRequired
-            - numReplicas.decommissioning();
+      if (additionalReplRequired - numReplicas.decommissioning() -
+          numReplicas.liveEnteringMaintenanceReplicas() > 0) {
+        additionalReplRequired = additionalReplRequired -
+            numReplicas.decommissioning() -
+            numReplicas.liveEnteringMaintenanceReplicas();
       }
       byte[] indices = new byte[liveBlockIndices.size()];
       for (int i = 0 ; i < liveBlockIndices.size(); i++) {
@@ -1807,11 +1877,11 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // do not schedule more if enough replicas is already pending
-    final short requiredRedundancy = getExpectedRedundancyNum(block);
     NumberReplicas numReplicas = countNodes(block);
+    final short requiredRedundancy =
+        getExpectedLiveRedundancyNum(block, numReplicas);
     final int pendingNum = pendingReconstruction.getNumReplicas(block);
-    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
-        requiredRedundancy)) {
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
       neededReconstruction.remove(block, priority);
       rw.resetTargets();
       blockLog.debug("BLOCK* Removing {} from neededReplications as" +
@@ -1880,7 +1950,7 @@ public class BlockManager implements BlockStatsMXBean {
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      Set, long, List, BlockStoragePolicy)
+   *      Set, long, List, BlockStoragePolicy, EnumSet)
    */
   public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final int numOfReplicas, final Node client,
@@ -1987,13 +2057,15 @@ public class BlockManager implements BlockStatsMXBean {
         continue;
       }
 
-      // never use already decommissioned nodes or unknown state replicas
-      if (state == null || state == StoredReplicaState.DECOMMISSIONED) {
+      // never use already decommissioned nodes, maintenance node not
+      // suitable for read or unknown state replicas.
+      if (state == null || state == StoredReplicaState.DECOMMISSIONED
+          || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
         continue;
       }
 
       if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
-          && !node.isDecommissionInProgress() 
+          && (!node.isDecommissionInProgress() && 
!node.isEnteringMaintenance())
           && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
         continue; // already reached replication limit
       }
@@ -2045,10 +2117,10 @@ public class BlockManager implements BlockStatsMXBean {
             continue;
           }
           NumberReplicas num = countNodes(timedOutItems[i]);
-          if (isNeededReconstruction(bi, num.liveReplicas())) {
+          if (isNeededReconstruction(bi, num)) {
             neededReconstruction.add(bi, num.liveReplicas(),
-                num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
-                getRedundancy(bi));
+                num.readOnlyReplicas(), num.outOfServiceReplicas(),
+                getExpectedRedundancyNum(bi));
           }
         }
       } finally {
@@ -3014,10 +3086,9 @@ public class BlockManager implements BlockStatsMXBean {
 
     // handle low redundancy/extra redundancy
     short fileRedundancy = getExpectedRedundancyNum(storedBlock);
-    if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
+    if (!isNeededReconstruction(storedBlock, num, pendingNum)) {
       neededReconstruction.remove(storedBlock, numCurrentReplica,
-          num.readOnlyReplicas(),
-          num.decommissionedAndDecommissioning(), fileRedundancy);
+          num.readOnlyReplicas(), num.outOfServiceReplicas(), fileRedundancy);
     } else {
       updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
     }
@@ -3040,6 +3111,10 @@ public class BlockManager implements BlockStatsMXBean {
     return storedBlock;
   }
 
+  // If there is any maintenance replica, we don't have to restore
+  // the condition of live + maintenance == expected. We allow
+  // live + maintenance >= expected. The extra redundancy will be removed
+  // when the maintenance node changes to live.
   private boolean shouldProcessExtraRedundancy(NumberReplicas num,
       int expectedNum) {
     final int numCurrent = num.liveReplicas();
@@ -3255,9 +3330,9 @@ public class BlockManager implements BlockStatsMXBean {
     NumberReplicas num = countNodes(block);
     final int numCurrentReplica = num.liveReplicas();
     // add to low redundancy queue if need to be
-    if (isNeededReconstruction(block, numCurrentReplica)) {
+    if (isNeededReconstruction(block, num)) {
       if (neededReconstruction.add(block, numCurrentReplica,
-          num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
+          num.readOnlyReplicas(), num.outOfServiceReplicas(),
           expectedRedundancy)) {
         return MisReplicationResult.UNDER_REPLICATED;
       }
@@ -3290,9 +3365,9 @@ public class BlockManager implements BlockStatsMXBean {
 
     // update neededReconstruction priority queues
     b.setReplication(newRepl);
+    NumberReplicas num = countNodes(b);
     updateNeededReconstructions(b, 0, newRepl - oldRepl);
-
-    if (oldRepl > newRepl) {
+    if (shouldProcessExtraRedundancy(num, newRepl)) {
       processExtraRedundancyBlock(b, newRepl, null, null);
     }
   }
@@ -3318,14 +3393,14 @@ public class BlockManager implements BlockStatsMXBean {
       }
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (storage.areBlockContentsStale()) {
-        LOG.trace("BLOCK* processOverReplicatedBlock: Postponing {}"
+        LOG.trace("BLOCK* processExtraRedundancyBlock: Postponing {}"
             + " since storage {} does not yet have up-to-date information.",
             block, storage);
         postponeBlock(block);
         return;
       }
       if (!isExcess(cur, block)) {
-        if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+        if (cur.isInService()) {
           // exclude corrupt replicas
           if (corruptNodes == null || !corruptNodes.contains(cur)) {
             nonExcess.add(storage);
@@ -3766,7 +3841,7 @@ public class BlockManager implements BlockStatsMXBean {
     return countNodes(b, false);
   }
 
-  private NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
+  NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
     NumberReplicas numberReplicas = new NumberReplicas();
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     if (b.isStriped()) {
@@ -3797,6 +3872,12 @@ public class BlockManager implements BlockStatsMXBean {
         s = StoredReplicaState.DECOMMISSIONING;
       } else if (node.isDecommissioned()) {
         s = StoredReplicaState.DECOMMISSIONED;
+      } else if (node.isMaintenance()) {
+        if (node.isInMaintenance() || !node.isAlive()) {
+          s = StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
+        } else {
+          s = StoredReplicaState.MAINTENANCE_FOR_READ;
+        }
       } else if (isExcess(node, b)) {
         s = StoredReplicaState.EXCESS;
       } else {
@@ -3868,11 +3949,11 @@ public class BlockManager implements BlockStatsMXBean {
   }
   
   /**
-   * On stopping decommission, check if the node has excess replicas.
+   * On putting the node in service, check if the node has excess replicas.
    * If there are any excess replicas, call processExtraRedundancyBlock().
    * Process extra redundancy blocks only when active NN is out of safe mode.
    */
-  void processExtraRedundancyBlocksOnReCommission(
+  void processExtraRedundancyBlocksOnInService(
       final DatanodeDescriptor srcNode) {
     if (!isPopulatingReplQueues()) {
       return;
@@ -3881,7 +3962,7 @@ public class BlockManager implements BlockStatsMXBean {
     int numExtraRedundancy = 0;
     while(it.hasNext()) {
       final BlockInfo block = it.next();
-      int expectedReplication = this.getRedundancy(block);
+      int expectedReplication = this.getExpectedRedundancyNum(block);
       NumberReplicas num = countNodes(block);
       if (shouldProcessExtraRedundancy(num, expectedReplication)) {
         // extra redundancy block
@@ -3891,14 +3972,15 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
     LOG.info("Invalidated " + numExtraRedundancy
-        + " extra redundancy blocks on " + srcNode + " during 
recommissioning");
+        + " extra redundancy blocks on " + srcNode + " after it is in 
service");
   }
 
   /**
-   * Returns whether a node can be safely decommissioned based on its 
-   * liveness. Dead nodes cannot always be safely decommissioned.
+   * Returns whether a node can be safely decommissioned or in maintenance
+   * based on its liveness. Dead nodes cannot always be safely decommissioned
+   * or in maintenance.
    */
-  boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
+  boolean isNodeHealthyForDecommissionOrMaintenance(DatanodeDescriptor node) {
     if (!node.checkBlockReportReceived()) {
       LOG.info("Node {} hasn't sent its first block report.", node);
       return false;
@@ -3912,17 +3994,18 @@ public class BlockManager implements BlockStatsMXBean {
     if (pendingReconstructionBlocksCount == 0 &&
         lowRedundancyBlocksCount == 0) {
       LOG.info("Node {} is dead and there are no low redundancy" +
-          " blocks or blocks pending reconstruction. Safe to decommission.",
-          node);
+          " blocks or blocks pending reconstruction. Safe to decommission or",
+          " put in maintenance.", node);
       return true;
     }
 
     LOG.warn("Node {} is dead " +
-        "while decommission is in progress. Cannot be safely " +
-        "decommissioned since there is risk of reduced " +
-        "data durability or data loss. Either restart the failed node or" +
-        " force decommissioning by removing, calling refreshNodes, " +
-        "then re-adding to the excludes files.", node);
+        "while in {}. Cannot be safely " +
+        "decommissioned or be in maintenance since there is risk of reduced " +
+        "data durability or data loss. Either restart the failed node or " +
+        "force decommissioning or maintenance by removing, calling " +
+        "refreshNodes, then re-adding to the excludes or host config files.",
+        node, node.getAdminState());
     return false;
   }
 
@@ -3990,17 +4073,16 @@ public class BlockManager implements BlockStatsMXBean {
       }
       NumberReplicas repl = countNodes(block);
       int pendingNum = pendingReconstruction.getNumReplicas(block);
-      int curExpectedReplicas = getRedundancy(block);
-      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
-          curExpectedReplicas)) {
+      int curExpectedReplicas = getExpectedRedundancyNum(block);
+      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum)) {
         neededReconstruction.update(block, repl.liveReplicas() + pendingNum,
-            repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+            repl.readOnlyReplicas(), repl.outOfServiceReplicas(),
             curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
       } else {
         int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
         neededReconstruction.remove(block, oldReplicas, 
repl.readOnlyReplicas(),
-            repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
+            repl.outOfServiceReplicas(), oldExpectedReplicas);
       }
     } finally {
       namesystem.writeUnlock();
@@ -4018,24 +4100,15 @@ public class BlockManager implements BlockStatsMXBean {
       short expected = getExpectedRedundancyNum(block);
       final NumberReplicas n = countNodes(block);
       final int pending = pendingReconstruction.getNumReplicas(block);
-      if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
+      if (!hasEnoughEffectiveReplicas(block, n, pending)) {
         neededReconstruction.add(block, n.liveReplicas() + pending,
-            n.readOnlyReplicas(),
-            n.decommissionedAndDecommissioning(), expected);
+            n.readOnlyReplicas(), n.outOfServiceReplicas(), expected);
       } else if (shouldProcessExtraRedundancy(n, expected)) {
         processExtraRedundancyBlock(block, expected, null, null);
       }
     }
   }
 
-  /** 
-   * @return 0 if the block is not found;
-   *         otherwise, return the replication factor of the block.
-   */
-  private int getRedundancy(BlockInfo block) {
-    return getExpectedRedundancyNum(block);
-  }
-
   /**
    * Get blocks to invalidate for <i>nodeId</i>
    * in {@link #invalidateBlocks}.
@@ -4088,6 +4161,8 @@ public class BlockManager implements BlockStatsMXBean {
         .getNodes(storedBlock);
     for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+      // Nodes under maintenance should be counted as valid replicas from
+      // rack policy point of view.
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
           && ((corruptNodes == null) || !corruptNodes.contains(cur))) {
         liveNodes.add(cur);
@@ -4102,14 +4177,36 @@ public class BlockManager implements BlockStatsMXBean {
         .isPlacementPolicySatisfied();
   }
 
+  boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock,
+      NumberReplicas numberReplicas) {
+    return storedBlock.isComplete() && (numberReplicas.liveReplicas() <
+        getMinMaintenanceStorageNum(storedBlock) ||
+        !isPlacementPolicySatisfied(storedBlock));
+  }
+
+  boolean isNeededReconstruction(BlockInfo storedBlock,
+      NumberReplicas numberReplicas) {
+    return isNeededReconstruction(storedBlock, numberReplicas, 0);
+  }
+
   /**
    * A block needs reconstruction if the number of redundancies is less than
    * expected or if it does not have enough racks.
    */
-  boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
-    int expected = getExpectedRedundancyNum(storedBlock);
-    return storedBlock.isComplete()
-        && (current < expected || !isPlacementPolicySatisfied(storedBlock));
+  boolean isNeededReconstruction(BlockInfo storedBlock,
+      NumberReplicas numberReplicas, int pending) {
+    return storedBlock.isComplete() &&
+        !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
+  }
+
+  // Exclude maintenance, but make sure it has minimal live replicas
+  // to satisfy the maintenance requirement.
+  public short getExpectedLiveRedundancyNum(BlockInfo block,
+      NumberReplicas numberReplicas) {
+    final short expectedRedundancy = getExpectedRedundancyNum(block);
+    return (short)Math.max(expectedRedundancy -
+        numberReplicas.maintenanceReplicas(),
+        getMinMaintenanceStorageNum(block));
   }
 
   public short getExpectedRedundancyNum(BlockInfo block) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 3958c73..0390546 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -833,8 +833,8 @@ public class BlockPlacementPolicyDefault extends 
BlockPlacementPolicy {
                          List<DatanodeStorageInfo> results,
                          boolean avoidStaleNodes) {
     // check if the node is (being) decommissioned
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      logNodeIsNotChosen(node, "the node is (being) decommissioned ");
+    if (!node.isInService()) {
+      logNodeIsNotChosen(node, "the node isn't in service.");
       return false;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index ca8d72a..8563cf3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -682,7 +682,7 @@ public class CacheReplicationMonitor extends Thread 
implements Closeable {
       if (datanode == null) {
         continue;
       }
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+      if (!datanode.isInService()) {
         continue;
       }
       if (corrupt != null && corrupt.contains(datanode)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 6d163ec..f7da52a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -146,8 +146,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   // Stores status of decommissioning.
   // If node is not decommissioning, do not use this object for anything.
-  public final DecommissioningStatus decommissioningStatus =
-      new DecommissioningStatus();
+  private final LeavingServiceStatus leavingServiceStatus =
+      new LeavingServiceStatus();
 
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
@@ -276,6 +276,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.needKeyUpdate = needKeyUpdate;
   }
 
+  public LeavingServiceStatus getLeavingServiceStatus() {
+    return leavingServiceStatus;
+  }
+
   @VisibleForTesting
   public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {
@@ -729,51 +733,54 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return (this == obj) || super.equals(obj);
   }
 
-  /** Decommissioning status */
-  public class DecommissioningStatus {
+  /** Leaving service status. */
+  public class LeavingServiceStatus {
     private int underReplicatedBlocks;
-    private int decommissionOnlyReplicas;
+    private int outOfServiceOnlyReplicas;
     private int underReplicatedInOpenFiles;
     private long startTime;
     
     synchronized void set(int underRep,
         int onlyRep, int underConstruction) {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return;
       }
       underReplicatedBlocks = underRep;
-      decommissionOnlyReplicas = onlyRep;
+      outOfServiceOnlyReplicas = onlyRep;
       underReplicatedInOpenFiles = underConstruction;
     }
 
     /** @return the number of under-replicated blocks */
     public synchronized int getUnderReplicatedBlocks() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
       return underReplicatedBlocks;
     }
-    /** @return the number of decommission-only replicas */
-    public synchronized int getDecommissionOnlyReplicas() {
-      if (!isDecommissionInProgress()) {
+    /** @return the number of blocks with out-of-service-only replicas */
+    public synchronized int getOutOfServiceOnlyReplicas() {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
-      return decommissionOnlyReplicas;
+      return outOfServiceOnlyReplicas;
     }
     /** @return the number of under-replicated blocks in open files */
     public synchronized int getUnderReplicatedInOpenFiles() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
       return underReplicatedInOpenFiles;
     }
     /** Set start time */
     public synchronized void setStartTime(long time) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
+        return;
+      }
       startTime = time;
     }
     /** @return start time */
     public synchronized long getStartTime() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
       return startTime;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 2d6547f..1a47835 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -388,8 +388,8 @@ public class DatanodeManager {
   public void sortLocatedBlocks(final String targetHost,
       final List<LocatedBlock> locatedBlocks) {
     Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
-        new DFSUtil.DecomStaleComparator(staleInterval) :
-        DFSUtil.DECOM_COMPARATOR;
+        new DFSUtil.ServiceAndStaleComparator(staleInterval) :
+        new DFSUtil.ServiceComparator();
     // sort located block
     for (LocatedBlock lb : locatedBlocks) {
       if (lb.isStriped()) {
@@ -632,9 +632,20 @@ public class DatanodeManager {
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
+    removeDatanode(nodeInfo, true);
+  }
+
+  /**
+   * Remove a datanode descriptor.
+   * @param nodeInfo datanode descriptor.
+   */
+  private void removeDatanode(DatanodeDescriptor nodeInfo,
+      boolean removeBlocksFromBlocksMap) {
     assert namesystem.hasWriteLock();
     heartbeatManager.removeDatanode(nodeInfo);
-    blockManager.removeBlocksAssociatedTo(nodeInfo);
+    if (removeBlocksFromBlocksMap) {
+      blockManager.removeBlocksAssociatedTo(nodeInfo);
+    }
     networktopology.remove(nodeInfo);
     decrementVersionCount(nodeInfo.getSoftwareVersion());
     blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
@@ -655,7 +666,7 @@ public class DatanodeManager {
     try {
       final DatanodeDescriptor descriptor = getDatanode(node);
       if (descriptor != null) {
-        removeDatanode(descriptor);
+        removeDatanode(descriptor, true);
       } else {
         NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
                                      + node + " does not exist");
@@ -666,7 +677,8 @@ public class DatanodeManager {
   }
 
   /** Remove a dead datanode. */
-  void removeDeadDatanode(final DatanodeID nodeID) {
+  void removeDeadDatanode(final DatanodeID nodeID,
+      boolean removeBlocksFromBlockMap) {
     DatanodeDescriptor d;
     try {
       d = getDatanode(nodeID);
@@ -675,8 +687,9 @@ public class DatanodeManager {
     }
     if (d != null && isDatanodeDead(d)) {
       NameNode.stateChangeLog.info(
-          "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
-      removeDatanode(d);
+          "BLOCK* removeDeadDatanode: lost heartbeat from " + d
+              + ", removeBlocksFromBlockMap " + removeBlocksFromBlockMap);
+      removeDatanode(d, removeBlocksFromBlockMap);
     }
   }
 
@@ -1112,10 +1125,16 @@ public class DatanodeManager {
   }
   
   /**
-   * 1. Added to hosts  --> no further work needed here.
-   * 2. Removed from hosts --> mark AdminState as decommissioned. 
-   * 3. Added to exclude --> start decommission.
-   * 4. Removed from exclude --> stop decommission.
+   * Reload datanode membership and the desired admin operations from
+   * host files. If a node isn't allowed, hostConfigManager.isIncluded returns
+   * false and the node can't be used.
+   * If a node is allowed and the desired admin operation is defined,
+   * it will transition to the desired admin state.
+   * If a node is allowed and upgrade domain is defined,
+   * the upgrade domain will be set on the node.
+   * To use maintenance mode or upgrade domain, set
+   * DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY to
+   * CombinedHostFileManager.class.
    */
   private void refreshDatanodes() {
     final Map<String, DatanodeDescriptor> copy;
@@ -1125,17 +1144,17 @@ public class DatanodeManager {
     for (DatanodeDescriptor node : copy.values()) {
       // Check if not include.
       if (!hostConfigManager.isIncluded(node)) {
-        node.setDisallowed(true); // case 2.
+        node.setDisallowed(true);
       } else {
         long maintenanceExpireTimeInMS =
             hostConfigManager.getMaintenanceExpirationTimeInMS(node);
         if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
           decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
         } else if (hostConfigManager.isExcluded(node)) {
-          decomManager.startDecommission(node); // case 3.
+          decomManager.startDecommission(node);
         } else {
           decomManager.stopMaintenance(node);
-          decomManager.stopDecommission(node); // case 4.
+          decomManager.stopDecommission(node);
         }
       }
       node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 87b36da..b1cfd78 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -201,7 +201,7 @@ public class DecommissionManager {
           LOG.info("Starting decommission of {} {} with {} blocks",
               node, storage, storage.numBlocks());
         }
-        node.decommissioningStatus.setStartTime(monotonicNow());
+        node.getLeavingServiceStatus().setStartTime(monotonicNow());
         pendingNodes.add(node);
       }
     } else {
@@ -222,7 +222,7 @@ public class DecommissionManager {
       // extra redundancy blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
       if (node.isAlive()) {
-        blockManager.processExtraRedundancyBlocksOnReCommission(node);
+        blockManager.processExtraRedundancyBlocksOnInService(node);
       }
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
@@ -246,6 +246,16 @@ public class DecommissionManager {
     if (!node.isMaintenance()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.startMaintenance(node);
+      // hbManager.startMaintenance will set dead node to IN_MAINTENANCE.
+      if (node.isEnteringMaintenance()) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          LOG.info("Starting maintenance of {} {} with {} blocks",
+              node, storage, storage.numBlocks());
+        }
+        node.getLeavingServiceStatus().setStartTime(monotonicNow());
+      }
+      // Track the node regardless whether it is ENTERING_MAINTENANCE or
+      // IN_MAINTENANCE to support maintenance expiration.
       pendingNodes.add(node);
     } else {
       LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
@@ -264,8 +274,34 @@ public class DecommissionManager {
       // Update DN stats maintained by HeartbeatManager
       hbManager.stopMaintenance(node);
 
-      // TODO HDFS-9390 remove replicas from block maps
-      // or handle over replicated blocks.
+      // extra redundancy blocks will be detected and processed when
+      // the dead node comes back and send in its full block report.
+      if (!node.isAlive()) {
+        // The node became dead when it was in maintenance, at which point
+        // the replicas weren't removed from block maps.
+        // When the node leaves maintenance, the replicas should be removed
+        // from the block maps to trigger the necessary replication to
+        // maintain the safety property of "# of live replicas + maintenance
+        // replicas" >= the expected redundancy.
+        blockManager.removeBlocksAssociatedTo(node);
+      } else {
+        // Even though putting nodes in maintenance node doesn't cause live
+        // replicas to match expected replication factor, it is still possible
+        // to have over replicated when the node leaves maintenance node.
+        // First scenario:
+        // a. Node became dead when it is at AdminStates.NORMAL, thus
+        //    block is replicated so that 3 replicas exist on other nodes.
+        // b. Admins put the dead node into maintenance mode and then
+        //    have the node rejoin the cluster.
+        // c. Take the node out of maintenance mode.
+        // Second scenario:
+        // a. With replication factor 3, set one replica to maintenance node,
+        //    thus block has 1 maintenance replica and 2 live replicas.
+        // b. Change the replication factor to 2. The block will still have
+        //    1 maintenance replica and 2 live replicas.
+        // c. Take the node out of maintenance mode.
+        blockManager.processExtraRedundancyBlocksOnInService(node);
+      }
 
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
@@ -281,6 +317,11 @@ public class DecommissionManager {
     LOG.info("Decommissioning complete for node {}", dn);
   }
 
+  private void setInMaintenance(DatanodeDescriptor dn) {
+    dn.setInMaintenance();
+    LOG.info("Node {} has entered maintenance mode.", dn);
+  }
+
   /**
    * Checks whether a block is sufficiently replicated/stored for
    * decommissioning. For replicated blocks or striped blocks, full-strength
@@ -288,20 +329,21 @@ public class DecommissionManager {
    * @return true if sufficient, else false.
    */
   private boolean isSufficient(BlockInfo block, BlockCollection bc,
-      NumberReplicas numberReplicas) {
-    final int numExpected = blockManager.getExpectedRedundancyNum(block);
-    final int numLive = numberReplicas.liveReplicas();
-    if (numLive >= numExpected
-        && blockManager.isPlacementPolicySatisfied(block)) {
+      NumberReplicas numberReplicas, boolean isDecommission) {
+    if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
       // Block has enough replica, skip
       LOG.trace("Block {} does not need replication.", block);
       return true;
     }
 
+    final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
+        numberReplicas);
+    final int numLive = numberReplicas.liveReplicas();
+
     // Block is under-replicated
-    LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, 
+    LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
         numLive);
-    if (numExpected > numLive) {
+    if (isDecommission && numExpected > numLive) {
       if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
         // Can decom a UC block as long as there will still be minReplicas
         if (blockManager.hasMinStorage(block, numLive)) {
@@ -346,11 +388,16 @@ public class DecommissionManager {
         + ", corrupt replicas: " + num.corruptReplicas()
         + ", decommissioned replicas: " + num.decommissioned()
         + ", decommissioning replicas: " + num.decommissioning()
+        + ", maintenance replicas: " + num.maintenanceReplicas()
+        + ", live entering maintenance replicas: "
+        + num.liveEnteringMaintenanceReplicas()
         + ", excess replicas: " + num.excessReplicas()
         + ", Is Open File: " + bc.isUnderConstruction()
         + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
         + srcNode + ", Is current datanode decommissioning: "
-        + srcNode.isDecommissionInProgress());
+        + srcNode.isDecommissionInProgress() +
+        ", Is current datanode entering maintenance: "
+        + srcNode.isEnteringMaintenance());
   }
 
   @VisibleForTesting
@@ -424,7 +471,7 @@ public class DecommissionManager {
       numBlocksChecked = 0;
       numBlocksCheckedPerLock = 0;
       numNodesChecked = 0;
-      // Check decom progress
+      // Check decommission or maintenance progress.
       namesystem.writeLock();
       try {
         processPendingNodes();
@@ -464,15 +511,14 @@ public class DecommissionManager {
         final DatanodeDescriptor dn = entry.getKey();
         AbstractList<BlockInfo> blocks = entry.getValue();
         boolean fullScan = false;
-        if (dn.isMaintenance()) {
-          // TODO HDFS-9390 make sure blocks are minimally replicated
-          // before transitioning the node to IN_MAINTENANCE state.
-
+        if (dn.isMaintenance() && dn.maintenanceExpired()) {
           // If maintenance expires, stop tracking it.
-          if (dn.maintenanceExpired()) {
-            stopMaintenance(dn);
-            toRemove.add(dn);
-          }
+          stopMaintenance(dn);
+          toRemove.add(dn);
+          continue;
+        }
+        if (dn.isInMaintenance()) {
+          // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
           continue;
         }
         if (blocks == null) {
@@ -487,7 +533,7 @@ public class DecommissionManager {
         } else {
           // This is a known datanode, check if its # of insufficiently 
           // replicated blocks has dropped to zero and if it can be decommed
-          LOG.debug("Processing decommission-in-progress node {}", dn);
+          LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
           pruneReliableBlocks(dn, blocks);
         }
         if (blocks.size() == 0) {
@@ -506,22 +552,31 @@ public class DecommissionManager {
           // If the full scan is clean AND the node liveness is okay, 
           // we can finally mark as decommissioned.
           final boolean isHealthy =
-              blockManager.isNodeHealthyForDecommission(dn);
+              blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
           if (blocks.size() == 0 && isHealthy) {
-            setDecommissioned(dn);
-            toRemove.add(dn);
+            if (dn.isDecommissionInProgress()) {
+              setDecommissioned(dn);
+              toRemove.add(dn);
+            } else if (dn.isEnteringMaintenance()) {
+              // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
+              // to track maintenance expiration.
+              setInMaintenance(dn);
+            } else {
+              Preconditions.checkState(false,
+                  "A node is in an invalid state!");
+            }
             LOG.debug("Node {} is sufficiently replicated and healthy, "
-                + "marked as decommissioned.", dn);
+                + "marked as {}.", dn.getAdminState());
           } else {
             LOG.debug("Node {} {} healthy."
                 + " It needs to replicate {} more blocks."
-                + " Decommissioning is still in progress.",
-                dn, isHealthy? "is": "isn't", blocks.size());
+                + " {} is still in progress.", dn,
+                isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
           }
         } else {
           LOG.debug("Node {} still has {} blocks to replicate "
-                  + "before it is a candidate to finish decommissioning.",
-              dn, blocks.size());
+              + "before it is a candidate to finish {}.",
+              dn, blocks.size(), dn.getAdminState());
         }
         iterkey = dn;
       }
@@ -539,7 +594,7 @@ public class DecommissionManager {
      */
     private void pruneReliableBlocks(final DatanodeDescriptor datanode,
         AbstractList<BlockInfo> blocks) {
-      processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
+      processBlocksInternal(datanode, blocks.iterator(), null, true);
     }
 
     /**
@@ -554,7 +609,7 @@ public class DecommissionManager {
     private AbstractList<BlockInfo> handleInsufficientlyStored(
         final DatanodeDescriptor datanode) {
       AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
-      processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
+      processBlocksInternal(datanode, datanode.getBlockIterator(),
           insufficient, false);
       return insufficient;
     }
@@ -573,14 +628,14 @@ public class DecommissionManager {
      * @param pruneReliableBlocks         whether to remove blocks reliable
      *                                    enough from the iterator
      */
-    private void processBlocksForDecomInternal(
+    private void processBlocksInternal(
         final DatanodeDescriptor datanode,
         final Iterator<BlockInfo> it,
         final List<BlockInfo> insufficientList,
         boolean pruneReliableBlocks) {
       boolean firstReplicationLog = true;
       int lowRedundancyBlocks = 0;
-      int decommissionOnlyReplicas = 0;
+      int outOfServiceOnlyReplicas = 0;
       int lowRedundancyInOpenFiles = 0;
       while (it.hasNext()) {
         if (insufficientList == null
@@ -626,21 +681,25 @@ public class DecommissionManager {
 
         // Schedule low redundancy blocks for reconstruction if not already
         // pending
-        if (blockManager.isNeededReconstruction(block, liveReplicas)) {
+        boolean isDecommission = datanode.isDecommissionInProgress();
+        boolean neededReconstruction = isDecommission ?
+            blockManager.isNeededReconstruction(block, num) :
+            blockManager.isNeededReconstructionForMaintenance(block, num);
+        if (neededReconstruction) {
           if (!blockManager.neededReconstruction.contains(block) &&
               blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
               blockManager.isPopulatingReplQueues()) {
             // Process these blocks only when active NN is out of safe mode.
             blockManager.neededReconstruction.add(block,
                 liveReplicas, num.readOnlyReplicas(),
-                num.decommissionedAndDecommissioning(),
+                num.outOfServiceReplicas(),
                 blockManager.getExpectedRedundancyNum(block));
           }
         }
 
         // Even if the block is without sufficient redundancy,
         // it doesn't block decommission if has sufficient redundancy
-        if (isSufficient(block, bc, num)) {
+        if (isSufficient(block, bc, num, isDecommission)) {
           if (pruneReliableBlocks) {
             it.remove();
           }
@@ -662,14 +721,13 @@ public class DecommissionManager {
         if (bc.isUnderConstruction()) {
           lowRedundancyInOpenFiles++;
         }
-        if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 
0)) {
-          decommissionOnlyReplicas++;
+        if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
+          outOfServiceOnlyReplicas++;
         }
       }
 
-      datanode.decommissioningStatus.set(lowRedundancyBlocks,
-          decommissionOnlyReplicas,
-          lowRedundancyInOpenFiles);
+      datanode.getLeavingServiceStatus().set(lowRedundancyBlocks,
+          outOfServiceOnlyReplicas, lowRedundancyInOpenFiles);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 082e949..0ae6f0f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -130,12 +130,14 @@ class ErasureCodingWork extends BlockReconstructionWork {
       // we only need to replicate one internal block to a new rack
       int sourceIndex = chooseSource4SimpleReplication();
       createReplicationWork(sourceIndex, targets[0]);
-    } else if (numberReplicas.decommissioning() > 0 && hasAllInternalBlocks()) 
{
-      List<Integer> decommissioningSources = findDecommissioningSources();
+    } else if ((numberReplicas.decommissioning() > 0 ||
+        numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
+        hasAllInternalBlocks()) {
+      List<Integer> leavingServiceSources = findLeavingServiceSources();
       // decommissioningSources.size() should be >= targets.length
-      final int num = Math.min(decommissioningSources.size(), targets.length);
+      final int num = Math.min(leavingServiceSources.size(), targets.length);
       for (int i = 0; i < num; i++) {
-        createReplicationWork(decommissioningSources.get(i), targets[i]);
+        createReplicationWork(leavingServiceSources.get(i), targets[i]);
       }
     } else {
       targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
@@ -162,10 +164,12 @@ class ErasureCodingWork extends BlockReconstructionWork {
     }
   }
 
-  private List<Integer> findDecommissioningSources() {
+  private List<Integer> findLeavingServiceSources() {
     List<Integer> srcIndices = new ArrayList<>();
     for (int i = 0; i < getSrcNodes().length; i++) {
-      if (getSrcNodes()[i].isDecommissionInProgress()) {
+      if (getSrcNodes()[i].isDecommissionInProgress() ||
+          (getSrcNodes()[i].isEnteringMaintenance() &&
+          getSrcNodes()[i].isAlive())) {
         srcIndices.add(i);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d728ee2..a72ad64 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -25,10 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
-import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.util.Daemon;
@@ -269,13 +266,19 @@ class HeartbeatManager implements DatanodeStatistics {
     if (!node.isAlive()) {
       LOG.info("Dead node {} is put in maintenance state immediately.", node);
       node.setInMaintenance();
-    } else if (node.isDecommissioned()) {
-      LOG.info("Decommissioned node " + node + " is put in maintenance state"
-          + " immediately.");
-      node.setInMaintenance();
     } else {
       stats.subtract(node);
-      node.startMaintenance();
+      if (node.isDecommissioned()) {
+        LOG.info("Decommissioned node " + node + " is put in maintenance state"
+            + " immediately.");
+        node.setInMaintenance();
+      } else if (blockManager.getMinReplicationToBeInMaintenance() == 0) {
+        LOG.info("MinReplicationToBeInMaintenance is set to zero. " + node +
+            " is put in maintenance state" + " immediately.");
+        node.setInMaintenance();
+      } else {
+        node.startMaintenance();
+      }
       stats.add(node);
     }
   }
@@ -352,7 +355,7 @@ class HeartbeatManager implements DatanodeStatistics {
     boolean allAlive = false;
     while (!allAlive) {
       // locate the first dead node.
-      DatanodeID dead = null;
+      DatanodeDescriptor dead = null;
 
       // locate the first failed storage that isn't on a dead node.
       DatanodeStorageInfo failedStorage = null;
@@ -401,7 +404,7 @@ class HeartbeatManager implements DatanodeStatistics {
         // acquire the fsnamesystem lock, and then remove the dead node.
         namesystem.writeLock();
         try {
-          dm.removeDeadDatanode(dead);
+          dm.removeDeadDatanode(dead, !dead.isMaintenance());
         } finally {
           namesystem.writeUnlock();
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
index de8cf4e..3a26f4a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -155,7 +155,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
   private int getPriority(BlockInfo block,
                           int curReplicas,
                           int readOnlyReplicas,
-                          int decommissionedReplicas,
+                          int outOfServiceReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
@@ -164,20 +164,20 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
     }
     if (block.isStriped()) {
       BlockInfoStriped sblk = (BlockInfoStriped) block;
-      return getPriorityStriped(curReplicas, decommissionedReplicas,
+      return getPriorityStriped(curReplicas, outOfServiceReplicas,
           sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
     } else {
       return getPriorityContiguous(curReplicas, readOnlyReplicas,
-          decommissionedReplicas, expectedReplicas);
+          outOfServiceReplicas, expectedReplicas);
     }
   }
 
   private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
-      int decommissionedReplicas, int expectedReplicas) {
+      int outOfServiceReplicas, int expectedReplicas) {
     if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
-      // some decommissioned replicas, then assign them highest priority
-      if (decommissionedReplicas > 0) {
+      // some out of service replicas, then assign them highest priority
+      if (outOfServiceReplicas > 0) {
         return QUEUE_HIGHEST_PRIORITY;
       }
       if (readOnlyReplicas > 0) {
@@ -201,11 +201,11 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
     }
   }
 
-  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+  private int getPriorityStriped(int curReplicas, int outOfServiceReplicas,
       short dataBlkNum, short parityBlkNum) {
     if (curReplicas < dataBlkNum) {
       // There are some replicas on decommissioned nodes so it's not corrupted
-      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+      if (curReplicas + outOfServiceReplicas >= dataBlkNum) {
         return QUEUE_HIGHEST_PRIORITY;
       }
       return QUEUE_WITH_CORRUPT_BLOCKS;
@@ -227,18 +227,15 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
    *
    * @param block a low redundancy block
    * @param curReplicas current number of replicas of the block
-   * @param decomissionedReplicas the number of decommissioned replicas
+   * @param outOfServiceReplicas the number of out-of-service replicas
    * @param expectedReplicas expected number of replicas of the block
    * @return true if the block was added to a queue.
    */
   synchronized boolean add(BlockInfo block,
-                           int curReplicas,
-                           int readOnlyReplicas,
-                           int decomissionedReplicas,
-                           int expectedReplicas) {
-    assert curReplicas >= 0 : "Negative replicas!";
+      int curReplicas, int readOnlyReplicas,
+      int outOfServiceReplicas, int expectedReplicas) {
     final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
-        decomissionedReplicas, expectedReplicas);
+        outOfServiceReplicas, expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
           expectedReplicas == 1) {
@@ -257,12 +254,10 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
 
   /** Remove a block from a low redundancy queue. */
   synchronized boolean remove(BlockInfo block,
-                              int oldReplicas,
-                              int oldReadOnlyReplicas,
-                              int decommissionedReplicas,
-                              int oldExpectedReplicas) {
+      int oldReplicas, int oldReadOnlyReplicas,
+      int outOfServiceReplicas, int oldExpectedReplicas) {
     final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
-        decommissionedReplicas, oldExpectedReplicas);
+        outOfServiceReplicas, oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
     if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
         oldExpectedReplicas == 1 &&
@@ -325,22 +320,22 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
    * method call.
    * @param block a low redundancy block
    * @param curReplicas current number of replicas of the block
-   * @param decommissionedReplicas  the number of decommissioned replicas
+   * @param outOfServiceReplicas  the number of out-of-service replicas
    * @param curExpectedReplicas expected number of replicas of the block
    * @param curReplicasDelta the change in the replicate count from before
    * @param expectedReplicasDelta the change in the expected replica count
    *        from before
    */
   synchronized void update(BlockInfo block, int curReplicas,
-                           int readOnlyReplicas, int decommissionedReplicas,
-                           int curExpectedReplicas,
-                           int curReplicasDelta, int expectedReplicasDelta) {
+      int readOnlyReplicas, int outOfServiceReplicas,
+      int curExpectedReplicas,
+      int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
     int curPri = getPriority(block, curReplicas, readOnlyReplicas,
-        decommissionedReplicas, curExpectedReplicas);
+        outOfServiceReplicas, curExpectedReplicas);
     int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
-        decommissionedReplicas, oldExpectedReplicas);
+        outOfServiceReplicas, oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("LowRedundancyBlocks.update " +
         block +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
index 0198bcc..be984f9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
@@ -24,9 +24,11 @@ import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.Store
 import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.EXCESS;
 import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.LIVE;
+import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.MAINTENANCE_FOR_READ;
+import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
+import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY;
 import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.REDUNDANT;
 import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.STALESTORAGE;
-import static 
org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY;
 
 /**
  * A immutable object that stores the number of live replicas and
@@ -41,6 +43,14 @@ public class NumberReplicas extends 
EnumCounters<NumberReplicas.StoredReplicaSta
     READONLY,
     DECOMMISSIONING,
     DECOMMISSIONED,
+    // We need live ENTERING_MAINTENANCE nodes to continue
+    // to serve read request while it is being transitioned to live
+    // IN_MAINTENANCE if these are the only replicas left.
+    // MAINTENANCE_NOT_FOR_READ == maintenanceReplicas -
+    // Live ENTERING_MAINTENANCE.
+    MAINTENANCE_NOT_FOR_READ,
+    // Live ENTERING_MAINTENANCE nodes to serve read requests.
+    MAINTENANCE_FOR_READ,
     CORRUPT,
     // excess replicas already tracked by blockmanager's excess map
     EXCESS,
@@ -106,4 +116,20 @@ public class NumberReplicas extends 
EnumCounters<NumberReplicas.StoredReplicaSta
   public int redundantInternalBlocks() {
     return (int) get(REDUNDANT);
   }
-} 
+
+  public int maintenanceNotForReadReplicas() {
+    return (int) get(MAINTENANCE_NOT_FOR_READ);
+  }
+
+  public int maintenanceReplicas() {
+    return (int) (get(MAINTENANCE_NOT_FOR_READ) + get(MAINTENANCE_FOR_READ));
+  }
+
+  public int outOfServiceReplicas() {
+    return maintenanceReplicas() + decommissionedAndDecommissioning();
+  }
+
+  public int liveEnteringMaintenanceReplicas() {
+    return (int)get(MAINTENANCE_FOR_READ);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
index 45dcc8d..005e6d5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
@@ -81,7 +81,7 @@ public class StorageTypeStats {
       final DatanodeDescriptor node) {
     capacityUsed += info.getDfsUsed();
     blockPoolUsed += info.getBlockPoolUsed();
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       capacityTotal += info.getCapacity();
       capacityRemaining += info.getRemaining();
     } else {
@@ -90,7 +90,7 @@ public class StorageTypeStats {
   }
 
   void addNode(final DatanodeDescriptor node) {
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       nodesInService++;
     }
   }
@@ -99,7 +99,7 @@ public class StorageTypeStats {
       final DatanodeDescriptor node) {
     capacityUsed -= info.getDfsUsed();
     blockPoolUsed -= info.getBlockPoolUsed();
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       capacityTotal -= info.getCapacity();
       capacityRemaining -= info.getRemaining();
     } else {
@@ -108,7 +108,7 @@ public class StorageTypeStats {
   }
 
   void subtractNode(final DatanodeDescriptor node) {
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       nodesInService--;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 563682f..eb870f8 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -5462,11 +5462,12 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
           .<String, Object> builder()
           .put("xferaddr", node.getXferAddr())
           .put("underReplicatedBlocks",
-              node.decommissioningStatus.getUnderReplicatedBlocks())
+          node.getLeavingServiceStatus().getUnderReplicatedBlocks())
+           // TODO use another property name for outOfServiceOnlyReplicas.
           .put("decommissionOnlyReplicas",
-              node.decommissioningStatus.getDecommissionOnlyReplicas())
+          node.getLeavingServiceStatus().getOutOfServiceOnlyReplicas())
           .put("underReplicateInOpenFiles",
-              node.decommissioningStatus.getUnderReplicatedInOpenFiles())
+          node.getLeavingServiceStatus().getUnderReplicatedInOpenFiles())
           .build();
       info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
     }
@@ -5528,7 +5529,7 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
     for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) {
       DatanodeDescriptor node = it.next();
-      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      if (!node.isInService()) {
         it.remove();
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 84b51f6..483663e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -548,6 +548,13 @@
 </property>
 
 <property>
+  <name>dfs.namenode.maintenance.replication.min</name>
+  <value>1</value>
+  <description>Minimal live block replication in existence of maintenance mode.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.safemode.replication.min</name>
   <value></value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
index 0698628..534c5e0 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
@@ -102,6 +102,7 @@ public class AdminStatesBaseTest {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
 
     hostsFileWriter.initialize(conf, "temp/admin");
+
   }
 
   @After
@@ -110,17 +111,22 @@ public class AdminStatesBaseTest {
     shutdownCluster();
   }
 
-  protected void writeFile(FileSystem fileSys, Path name, int repl)
+  static public FSDataOutputStream writeIncompleteFile(FileSystem fileSys,
+      Path name, short repl, short numOfBlocks) throws IOException {
+    return writeFile(fileSys, name, repl, numOfBlocks, false);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
       throws IOException {
     writeFile(fileSys, name, repl, 2);
   }
 
-  protected void writeFile(FileSystem fileSys, Path name, int repl,
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
       int numOfBlocks) throws IOException {
     writeFile(fileSys, name, repl, numOfBlocks, true);
   }
 
-  protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
       int repl, int numOfBlocks, boolean completeFile)
     throws IOException {
     // create and write a file that contains two blocks of data
@@ -136,6 +142,7 @@ public class AdminStatesBaseTest {
       stm.close();
       return null;
     } else {
+      stm.flush();
       // Do not close stream, return it
       // so that it is not garbage collected
       return stm;
@@ -353,7 +360,7 @@ public class AdminStatesBaseTest {
 
   protected void shutdownCluster() {
     if (cluster != null) {
-      cluster.shutdown();
+      cluster.shutdown(true);
     }
   }
 
@@ -362,12 +369,13 @@ public class AdminStatesBaseTest {
         refreshNodes(conf);
   }
 
-  protected DatanodeDescriptor getDatanodeDesriptor(
+  static private DatanodeDescriptor getDatanodeDesriptor(
       final FSNamesystem ns, final String datanodeUuid) {
     return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
   }
 
-  protected void cleanupFile(FileSystem fileSys, Path name) throws IOException 
{
+  static public void cleanupFile(FileSystem fileSys, Path name)
+      throws IOException {
     assertTrue(fileSys.exists(name));
     fileSys.delete(name, true);
     assertTrue(!fileSys.exists(name));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index ddb8237..6ca1e79 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -484,7 +484,7 @@ public class TestDecommission extends AdminStatesBaseTest {
       shutdownCluster();
     }
   }
-  
+
   /**
    * Tests cluster storage statistics during decommissioning for non
    * federated cluster


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to