HDFS-11182. Update DataNode to use DatasetVolumeChecker.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ec80de3c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ec80de3c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ec80de3c

Branch: refs/heads/branch-2
Commit: ec80de3ccc67e258a51b52866c5c0456d706b8eb
Parents: 413dcca
Author: Arpit Agarwal <a...@apache.org>
Authored: Mon Jan 16 14:40:37 2017 -0800
Committer: Arpit Agarwal <a...@apache.org>
Committed: Mon Jan 16 22:14:00 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/server/datanode/DataNode.java   | 412 +++++++++----------
 .../hdfs/server/datanode/StorageLocation.java   |   5 -
 .../datanode/checker/DatasetVolumeChecker.java  |  90 ++--
 .../server/datanode/fsdataset/FsDatasetSpi.java |  51 ++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 106 ++---
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   7 -
 .../datanode/fsdataset/impl/FsVolumeList.java   |  43 +-
 .../blockmanagement/TestBlockStatsMXBean.java   |   4 +
 .../server/datanode/SimulatedFSDataset.java     | 175 ++++----
 .../datanode/TestDataNodeHotSwapVolumes.java    |   3 +
 .../datanode/TestDataNodeVolumeFailure.java     |   3 +
 .../TestDataNodeVolumeFailureReporting.java     |   3 +
 .../TestDataNodeVolumeFailureToleration.java    |   3 +
 .../hdfs/server/datanode/TestDiskError.java     |  26 +-
 .../checker/TestDatasetVolumeChecker.java       |   9 +-
 .../TestDatasetVolumeCheckerFailures.java       |  43 +-
 .../extdataset/ExternalDatasetImpl.java         |   7 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |  86 +---
 .../fsdataset/impl/TestFsVolumeList.java        |  32 --
 19 files changed, 521 insertions(+), 587 deletions(-)
----------------------------------------------------------------------
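
For readers skimming the diff below: the change removes the DataNode's dedicated disk-checking thread (checkDiskErrorThread) and delegates disk checks to DatasetVolumeChecker, which runs per-volume checks in parallel and reports results through a callback. The following sketch illustrates the callback-driven pattern the patch wires into DataNode#checkDiskErrorAsync(); the wrapper class and import paths are illustrative assumptions, not part of the commit.

// Illustrative sketch only -- not committed code. It exercises the
// DatasetVolumeChecker API the same way the diff below does.
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Timer;

class VolumeCheckSketch {                 // hypothetical wrapper class
  void checkAsync(Configuration conf,
                  FsDatasetSpi<? extends FsVolumeSpi> data) throws Exception {
    DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, new Timer());
    // Schedule a check of every volume; the callback fires once, after all
    // per-volume checks have completed or the overall timeout has elapsed.
    checker.checkAllVolumesAsync(data, new DatasetVolumeChecker.Callback() {
      @Override
      public void call(Set<FsVolumeSpi> healthyVolumes,
                       Set<FsVolumeSpi> failedVolumes) {
        if (!failedVolumes.isEmpty()) {
          // The DataNode removes these volumes and notifies the NameNodes
          // (see handleVolumeFailures in the DataNode.java hunks below).
          System.err.println("Failed volumes: " + failedVolumes);
        }
      }
    });
    // On DataNode shutdown the checker is stopped the same way:
    checker.shutdownAndWait(1, TimeUnit.SECONDS);
  }
}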


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec80de3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 0031bff..bddde1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -88,7 +88,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -161,8 +160,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -249,7 +249,7 @@ import org.slf4j.LoggerFactory;
  * to a DataNode directly; a NameNode simply returns values from
  * functions invoked by a DataNode.
  *
- * DataNodes maintain an open server socket so that client code 
+ * DataNodes maintain an open server socket so that client code
  * or other DataNodes can read/write data.  The host/port for
  * this server is reported to the NameNode, which then sends that
  * information to clients or other DataNodes that might be interested.
@@ -260,7 +260,7 @@ public class DataNode extends ReconfigurableBase
     implements InterDatanodeProtocol, ClientDatanodeProtocol,
         TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {
   public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);
-  
+
   static{
     HdfsConfiguration.init();
   }
@@ -275,10 +275,10 @@ public class DataNode extends ReconfigurableBase
         ", srvID: %s" +  // DatanodeRegistration
         ", blockid: %s" + // block id
         ", duration(ns): %s";  // duration time
-        
+
   static final Log ClientTraceLog =
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
-  
+
   private static final String USAGE =
       "Usage: hdfs datanode [-regular | -rollback | -rollingupgrade rollback" +
       " ]\n" +
@@ -309,7 +309,7 @@ public class DataNode extends ReconfigurableBase
   public static InetSocketAddress createSocketAddr(String target) {
     return NetUtils.createSocketAddr(target);
   }
-  
+
   volatile boolean shouldRun = true;
   volatile boolean shutdownForUpgrade = false;
   private boolean shutdownInProgress = false;
@@ -334,24 +334,24 @@ public class DataNode extends ReconfigurableBase
 
   DataNodeMetrics metrics;
   private InetSocketAddress streamingAddr;
-  
+
   // See the note below in incrDatanodeNetworkErrors re: concurrency.
   private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
 
   private String hostName;
   private DatanodeID id;
-  
+
   final private String fileDescriptorPassingDisabledReason;
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   private boolean hasAnyBlockPoolRegistered = false;
-  
+
   private  BlockScanner blockScanner;
   private DirectoryScanner directoryScanner = null;
-  
+
   /** Activated plug-ins. */
   private List<ServicePlugin> plugins;
-  
+
   // For InterDataNodeProtocol
   public RPC.Server ipcServer;
 
@@ -371,11 +371,7 @@ public class DataNode extends ReconfigurableBase
   SaslDataTransferServer saslServer;
   private final boolean getHdfsBlockLocationsEnabled;
   private ObjectName dataNodeInfoBeanName;
-  private Thread checkDiskErrorThread = null;
-  protected final int checkDiskErrorInterval;
-  private boolean checkDiskErrorFlag = false;
-  private Object checkDiskErrorMutex = new Object();
-  private long lastDiskErrorCheck;
+  private volatile long lastDiskErrorCheck;
   private String supergroup;
   private boolean isPermissionEnabled;
   private String dnUserName = null;
@@ -389,6 +385,8 @@ public class DataNode extends ReconfigurableBase
   @Nullable
   private final StorageLocationChecker storageLocationChecker;
 
+  private final DatasetVolumeChecker volumeChecker;
+
   private static Tracer createTracer(Configuration conf) {
     return new Tracer.Builder("DataNode").
         conf(TraceUtils.wrapHadoopConf(DATANODE_HTRACE_PREFIX , conf)).
@@ -404,7 +402,7 @@ public class DataNode extends ReconfigurableBase
    */
   @VisibleForTesting
   @InterfaceAudience.LimitedPrivate("HDFS")
-  DataNode(final Configuration conf) {
+  DataNode(final Configuration conf) throws DiskErrorException {
     super(conf);
     this.blockScanner = new BlockScanner(this, conf);
     this.tracer = createTracer(conf);
@@ -418,10 +416,9 @@ public class DataNode extends ReconfigurableBase
     this.connectToDnViaHostname = false;
     this.getHdfsBlockLocationsEnabled = false;
     this.pipelineSupportECN = false;
-    this.checkDiskErrorInterval =
-        ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
     initOOBTimeout();
     storageLocationChecker = null;
+    volumeChecker = new DatasetVolumeChecker(conf, new Timer());
   }
 
   /**
@@ -448,7 +445,7 @@ public class DataNode extends ReconfigurableBase
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.getHdfsBlockLocationsEnabled = conf.getBoolean(
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
     this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
@@ -464,8 +461,7 @@ public class DataNode extends ReconfigurableBase
         ",hdfs-" +
         conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
 
-    this.checkDiskErrorInterval =
-        ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
+    this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
 
     // Determine whether we should try to pass file descriptors to clients.
     if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
@@ -861,7 +857,7 @@ public class DataNode extends ReconfigurableBase
   private synchronized void setClusterId(final String nsCid, final String bpid
       ) throws IOException {
     if(clusterId != null && !clusterId.equals(nsCid)) {
-      throw new IOException ("Cluster IDs not matched: dn cid=" + clusterId 
+      throw new IOException ("Cluster IDs not matched: dn cid=" + clusterId
           + " but ns cid="+ nsCid + "; bpid=" + bpid);
     }
     // else
@@ -941,11 +937,11 @@ public class DataNode extends ReconfigurableBase
   private void initIpcServer() throws IOException {
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
         getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
-    
-    // Add all the RPC protocols that the Datanode implements    
+
+    // Add all the RPC protocols that the Datanode implements
     RPC.setProtocolEngine(getConf(), ClientDatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
-    ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = 
+    ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
           new ClientDatanodeProtocolServerSideTranslatorPB(this);
     BlockingService service = ClientDatanodeProtocolService
         .newReflectiveBlockingService(clientDatanodeProtocolXlator);
@@ -966,7 +962,7 @@ public class DataNode extends ReconfigurableBase
     DFSUtil.addPBProtocol(getConf(), ReconfigurationProtocolPB.class, service,
         ipcServer);
 
-    InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
+    InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
         new InterDatanodeProtocolServerSideTranslatorPB(this);
     service = InterDatanodeProtocolService
         .newReflectiveBlockingService(interDatanodeProtocolXlator);
@@ -1032,12 +1028,12 @@ public class DataNode extends ReconfigurableBase
       return;
     }
     String reason = null;
-    if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 
+    if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
                     DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
       reason = "verification is turned off by configuration";
     } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
       reason = "verifcation is not supported by SimulatedFSDataset";
-    } 
+    }
     if (reason == null) {
       directoryScanner = new DirectoryScanner(this, data, conf);
       directoryScanner.start();
@@ -1046,13 +1042,13 @@ public class DataNode extends ReconfigurableBase
                    reason);
     }
   }
-  
+
   private synchronized void shutdownDirectoryScanner() {
     if (directoryScanner != null) {
       directoryScanner.shutdown();
     }
   }
-  
+
   private void initDataXceiver() throws IOException {
     // find free port or use privileged port provided
     TcpPeerServer tcpPeerServer;
@@ -1127,7 +1123,7 @@ public class DataNode extends ReconfigurableBase
     }
     return domainPeerServer;
   }
-  
+
   // calls specific to BP
   public void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
       String storageUuid, boolean isOnTransientStorage) {
@@ -1140,7 +1136,7 @@ public class DataNode extends ReconfigurableBase
           + block.getBlockPoolId());
     }
   }
-  
+
   // calls specific to BP
   protected void notifyNamenodeReceivingBlock(
       ExtendedBlock block, String storageUuid) {
@@ -1152,7 +1148,7 @@ public class DataNode extends ReconfigurableBase
           + block.getBlockPoolId());
     }
   }
-  
+
   /** Notify the corresponding namenode to delete the block. */
   public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
@@ -1163,7 +1159,7 @@ public class DataNode extends ReconfigurableBase
           + block.getBlockPoolId());
     }
   }
-  
+
   /**
    * Report a bad block which is hosted on the local DN.
    */
@@ -1202,7 +1198,7 @@ public class DataNode extends ReconfigurableBase
     BPOfferService bpos = getBPOSForBlock(block);
     bpos.reportRemoteBadBlock(srcDataNode, block);
   }
-  
+
   /**
    * Try to send an error report to the NNs associated with the given
    * block pool.
@@ -1258,10 +1254,10 @@ public class DataNode extends ReconfigurableBase
 
   /**
    * This method starts the data node with the specified conf.
-   * 
+   *
    * If conf's CONFIG_PROPERTY_SIMULATED property is set
    * then a simulated storage based data node is created.
-   * 
+   *
    * @param dataDirs - only for a non-simulated storage data node
    * @throws IOException
    */
@@ -1312,7 +1308,7 @@ public class DataNode extends ReconfigurableBase
     }
 
     storage = new DataStorage();
-    
+
     // global DN settings
     registerMXBean();
     initDataXceiver();
@@ -1320,7 +1316,7 @@ public class DataNode extends ReconfigurableBase
     pauseMonitor = new JvmPauseMonitor();
     pauseMonitor.init(getConf());
     pauseMonitor.start();
-  
+
     // BlockPoolTokenSecretManager is required to create ipc server.
     this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
 
@@ -1404,7 +1400,7 @@ public class DataNode extends ReconfigurableBase
       "protection and SSL for HTTP.  Using privileged resources in " +
       "combination with SASL RPC data transfer protection is not supported.");
   }
-  
+
   public static String generateUuid() {
     return UUID.randomUUID().toString();
   }
@@ -1439,10 +1435,10 @@ public class DataNode extends ReconfigurableBase
     }
 
     DatanodeID dnId = new DatanodeID(
-        streamingAddr.getAddress().getHostAddress(), hostName, 
+        streamingAddr.getAddress().getHostAddress(), hostName,
         storage.getDatanodeUuid(), getXferPort(), getInfoPort(),
             infoSecurePort, getIpcPort());
-    return new DatanodeRegistration(dnId, storageInfo, 
+    return new DatanodeRegistration(dnId, storageInfo,
         new ExportedBlockKeys(), VersionInfo.getVersion());
   }
 
@@ -1461,10 +1457,10 @@ public class DataNode extends ReconfigurableBase
           + bpRegistration.getDatanodeUuid()
           + ". Expecting " + storage.getDatanodeUuid());
     }
-    
+
     registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
   }
-  
+
   /**
    * After the block pool has contacted the NN, registers that block pool
    * with the secret manager, updating it with the secrets provided by the NN.
@@ -1484,7 +1480,7 @@ public class DataNode extends ReconfigurableBase
       }
     }
     if (!isBlockTokenEnabled) return;
-    
+
     if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
       long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
       long blockTokenLifetime = keys.getTokenLifetime();
@@ -1493,7 +1489,7 @@ public class DataNode extends ReconfigurableBase
           + blockKeyUpdateInterval / (60 * 1000)
           + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
           + " min(s)");
-      final BlockTokenSecretManager secretMgr = 
+      final BlockTokenSecretManager secretMgr =
           new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId,
               dnConf.encryptionAlgorithm);
       blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
@@ -1528,10 +1524,10 @@ public class DataNode extends ReconfigurableBase
    * One of the Block Pools has successfully connected to its NN.
    * This initializes the local storage for that block pool,
    * checks consistency of the NN's cluster ID, etc.
-   * 
+   *
    * If this is the first block pool to register, this also initializes
    * the datanode-scoped storage.
-   * 
+   *
    * @param bpos Block pool offer service
    * @throws IOException if the NN is inconsistent with the local storage.
    */
@@ -1541,12 +1537,12 @@ public class DataNode extends ReconfigurableBase
       throw new IOException("NamespaceInfo not found: Block pool " + bpos
           + " should have retrieved namespace info before initBlockPool.");
     }
-    
+
     setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
 
     // Register the new block pool with the BP manager.
     blockPoolManager.addBlockPool(bpos);
-    
+
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
     initStorage(nsInfo);
@@ -1567,11 +1563,11 @@ public class DataNode extends ReconfigurableBase
   BPOfferService getBPOfferService(String bpid){
     return blockPoolManager.get(bpid);
   }
-  
+
   int getBpOsCount() {
     return blockPoolManager.getAllNamenodeThreads().size();
   }
-  
+
   /**
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
@@ -1579,7 +1575,7 @@ public class DataNode extends ReconfigurableBase
   private void initStorage(final NamespaceInfo nsInfo) throws IOException {
     final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
         = FsDatasetSpi.Factory.getFactory(getConf());
-    
+
     if (!factory.isSimulated()) {
       final StartupOption startOpt = getStartupOption(getConf());
       if (startOpt == null) {
@@ -1613,21 +1609,21 @@ public class DataNode extends ReconfigurableBase
     return NetUtils.createSocketAddr(conf.getTrimmed(DFS_DATANODE_HTTP_ADDRESS_KEY,
         DFS_DATANODE_HTTP_ADDRESS_DEFAULT));
   }
-  
+
   private void registerMXBean() {
     dataNodeInfoBeanName = MBeans.register("DataNode", "DataNodeInfo", this);
   }
-  
+
   @VisibleForTesting
   public DataXceiverServer getXferServer() {
-    return xserver;  
+    return xserver;
   }
-  
+
   @VisibleForTesting
   public int getXferPort() {
     return streamingAddr.getPort();
   }
-  
+
   /**
    * @return name useful for logging
    */
@@ -1640,7 +1636,7 @@ public class DataNode extends ReconfigurableBase
    * NB: The datanode can perform data transfer on the streaming
    * address however clients are given the IPC IP address for data
    * transfer, and that may be a different address.
-   * 
+   *
    * @return socket address for data transfer
    */
   public InetSocketAddress getXferAddress() {
@@ -1653,14 +1649,14 @@ public class DataNode extends ReconfigurableBase
   public int getIpcPort() {
     return ipcServer.getListenerAddress().getPort();
   }
-  
+
   /**
    * get BP registration by blockPool id
    * @return BP registration object
    * @throws IOException on error
    */
   @VisibleForTesting
-  public DatanodeRegistration getDNRegistrationForBP(String bpid) 
+  public DatanodeRegistration getDNRegistrationForBP(String bpid)
   throws IOException {
     DataNodeFaultInjector.get().noRegistration();
     BPOfferService bpos = blockPoolManager.get(bpid);
@@ -1669,13 +1665,13 @@ public class DataNode extends ReconfigurableBase
     }
     return bpos.bpRegistration;
   }
-  
+
   /**
    * Creates either NIO or regular depending on socketWriteTimeout.
    */
   protected Socket newSocket() throws IOException {
-    return (dnConf.socketWriteTimeout > 0) ? 
-           SocketChannel.open().socket() : new Socket();
+    return (dnConf.socketWriteTimeout > 0) ?
+           SocketChannel.open().socket() : new Socket();
   }
 
   /**
@@ -1721,24 +1717,24 @@ public class DataNode extends ReconfigurableBase
       throw new IOException(ie.getMessage());
     }
   }
-    
+
   public DataNodeMetrics getMetrics() {
     return metrics;
   }
-  
+
   /** Ensure the authentication method is kerberos */
   private void checkKerberosAuthMethod(String msg) throws IOException {
     // User invoking the call must be same as the datanode user
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != 
+    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() !=
         AuthenticationMethod.KERBEROS) {
       throw new AccessControlException("Error in " + msg
           + "Only kerberos based authentication is allowed.");
     }
   }
-  
+
   private void checkBlockLocalPathAccess() throws IOException {
     checkKerberosAuthMethod("getBlockLocalPathInfo()");
     String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -1797,7 +1793,7 @@ public class DataNode extends ReconfigurableBase
   }
 
   FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> token, int maxVersion) 
+      final Token<BlockTokenIdentifier> token, int maxVersion)
           throws ShortCircuitFdsUnsupportedException,
             ShortCircuitFdsVersionException, IOException {
     if (fileDescriptorPassingDisabledReason != null) {
@@ -1807,13 +1803,13 @@ public class DataNode extends ReconfigurableBase
     int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
     if (maxVersion < blkVersion) {
       throw new ShortCircuitFdsVersionException("Your client is too old " +
-        "to read this block!  Its format version is " + 
+        "to read this block!  Its format version is " +
         blkVersion + ", but the highest format version you can read is " +
         maxVersion);
     }
     metrics.incrBlocksGetLocalPathInfo();
     FileInputStream fis[] = new FileInputStream[2];
-    
+
     try {
       fis[0] = (FileInputStream)data.getBlockInputStream(blk, 0);
       fis[1] = DatanodeUtil.getMetaDataInputStream(blk, data);
@@ -1828,7 +1824,7 @@ public class DataNode extends ReconfigurableBase
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(
       String bpId, long[] blockIds,
-      List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
+      List<Token<BlockTokenIdentifier>> tokens) throws IOException,
       UnsupportedOperationException {
     if (!getHdfsBlockLocationsEnabled) {
       throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
@@ -1847,7 +1843,7 @@ public class DataNode extends ReconfigurableBase
 
     return data.getHdfsBlocksMetadata(bpId, blockIds);
   }
-  
+
   private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
       AccessMode accessMode) throws IOException {
     if (isBlockTokenEnabled) {
@@ -1880,11 +1876,11 @@ public class DataNode extends ReconfigurableBase
         }
       }
     }
-    
+
     List<BPOfferService> bposArray = (this.blockPoolManager == null)
         ? new ArrayList<BPOfferService>()
         : this.blockPoolManager.getAllNamenodeThreads();
-    // If shutdown is not for restart, set shouldRun to false early. 
+    // If shutdown is not for restart, set shouldRun to false early.
     if (!shutdownForUpgrade) {
       shouldRun = false;
     }
@@ -1903,11 +1899,6 @@ public class DataNode extends ReconfigurableBase
       }
     }
 
-    // Interrupt the checkDiskErrorThread and terminate it.
-    if(this.checkDiskErrorThread != null) {
-      this.checkDiskErrorThread.interrupt();
-    }
-    
     // Record the time of initial notification
     long timeNotified = Time.monotonicNow();
 
@@ -1928,6 +1919,8 @@ public class DataNode extends ReconfigurableBase
       }
     }
 
+    volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
+
     if (storageLocationChecker != null) {
       storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
     }
@@ -1939,7 +1932,7 @@ public class DataNode extends ReconfigurableBase
     // shouldRun is set to false here to prevent certain threads from exiting
     // before the restart prep is done.
     this.shouldRun = false;
-    
+
     // wait reconfiguration thread, if any, to exit
     shutdownReconfigurationTask();
 
@@ -1984,7 +1977,7 @@ public class DataNode extends ReconfigurableBase
       } catch (InterruptedException ie) {
       }
     }
-   
+
    // IPC server needs to be shutdown late in the process, otherwise
    // shutdown command response won't get sent.
    if (ipcServer != null) {
@@ -1998,7 +1991,7 @@ public class DataNode extends ReconfigurableBase
         LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie);
       }
     }
-    
+
     if (storage != null) {
       try {
         this.storage.unlockAll();
@@ -2026,46 +2019,53 @@ public class DataNode extends ReconfigurableBase
     }
     tracer.close();
   }
-  
-  
+
+
   /**
    * Check if there is a disk failure asynchronously and if so, handle the error
    */
   public void checkDiskErrorAsync() {
-    synchronized(checkDiskErrorMutex) {
-      checkDiskErrorFlag = true;
-      if(checkDiskErrorThread == null) {
-        startCheckDiskErrorThread();
-        checkDiskErrorThread.start();
-        LOG.info("Starting CheckDiskError Thread");
-      }
-    }
+    volumeChecker.checkAllVolumesAsync(
+        data, new DatasetVolumeChecker.Callback() {
+          @Override
+          public void call(Set<FsVolumeSpi> healthyVolumes,
+                           Set<FsVolumeSpi> failedVolumes) {
+            if (failedVolumes.size() > 0) {
+              LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
+                  failedVolumes.size(), failedVolumes);
+            } else {
+              LOG.debug("checkDiskErrorAsync: no volume failures detected");
+            }
+            lastDiskErrorCheck = Time.monotonicNow();
+            DataNode.this.handleVolumeFailures(failedVolumes);
+          }
+        });
   }
-  
+
   private void handleDiskError(String errMsgr) {
     final boolean hasEnoughResources = data.hasEnoughResource();
     LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
-    
-    // If we have enough active valid volumes then we do not want to 
+
+    // If we have enough active valid volumes then we do not want to
     // shutdown the DN completely.
-    int dpError = hasEnoughResources ? DatanodeProtocol.DISK_ERROR  
-                                     : DatanodeProtocol.FATAL_DISK_ERROR;  
+    int dpError = hasEnoughResources ? DatanodeProtocol.DISK_ERROR
+                                     : DatanodeProtocol.FATAL_DISK_ERROR;
     metrics.incrVolumeFailures();
 
     //inform NameNodes
     for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
       bpos.trySendErrorReport(dpError, errMsgr);
     }
-    
+
     if(hasEnoughResources) {
       scheduleAllBlockReport(0);
       return; // do not shutdown
     }
-    
+
     LOG.warn("DataNode is shutting down: " + errMsgr);
     shouldRun = false;
   }
-    
+
   /** Number of concurrent xceivers per node. */
   @Override // DataNodeMXBean
   public int getXceiverCount() {
@@ -2136,9 +2136,9 @@ public class DataNode extends ReconfigurableBase
       lengthTooShort = true;
     } catch (IOException e) {
       // The IOException indicates not being able to access block file,
-      // treat it the same here as blockFileNotExist, to trigger 
+      // treat it the same here as blockFileNotExist, to trigger
       // reporting it as a bad block
-      blockFileNotExist = true;      
+      blockFileNotExist = true;
     }
 
     if (replicaNotExist || replicaStateNotFinalized) {
@@ -2154,14 +2154,14 @@ public class DataNode extends ReconfigurableBase
       return;
     }
     if (lengthTooShort) {
-      // Check if NN recorded length matches on-disk length 
+      // Check if NN recorded length matches on-disk length
       // Shorter on-disk len indicates corruption so report NN the corrupt block
       reportBadBlock(bpos, block, "Can't replicate block " + block
-          + " because on-disk length " + data.getLength(block) 
+          + " because on-disk length " + data.getLength(block)
           + " is shorter than NameNode recorded length " + 
block.getNumBytes());
       return;
     }
-    
+
     int numTargets = xferTargets.length;
     if (numTargets > 0) {
       StringBuilder xfersBuilder = new StringBuilder();
@@ -2169,8 +2169,8 @@ public class DataNode extends ReconfigurableBase
         xfersBuilder.append(xferTargets[i]);
         xfersBuilder.append(" ");
       }
-      LOG.info(bpReg + " Starting thread to transfer " + 
-               block + " to " + xfersBuilder);                       
+      LOG.info(bpReg + " Starting thread to transfer " +
+               block + " to " + xfersBuilder);
 
       new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
@@ -2191,27 +2191,27 @@ public class DataNode extends ReconfigurableBase
 
   /* ********************************************************************
   Protocol when a client reads data from Datanode (Cur Ver: 9):
-  
+
   Client's Request :
   =================
-   
+
      Processed in DataXceiver:
      +----------------------------------------------+
      | Common Header   | 1 byte OP == OP_READ_BLOCK |
      +----------------------------------------------+
-     
+
     Processed in readBlock() :
     +-------------------------------------------------------------------------+
     | 8 byte Block ID | 8 byte genstamp | 8 byte start offset | 8 byte length |
     +-------------------------------------------------------------------------+
      |   vInt length   |  <DFSClient id> |
      +-----------------------------------+
-     
+
      Client sends optional response only at the end of receiving data.
-       
+
   DataNode Response :
   ===================
-   
+
     In readBlock() :
     If there is an error while initializing BlockSender :
        +---------------------------+
@@ -2221,21 +2221,21 @@ public class DataNode extends ReconfigurableBase
        +---------------------------+
        | 2 byte OP_STATUS_SUCCESS  |
        +---------------------------+
-       
+
     Actual data, sent by BlockSender.sendBlock() :
-    
+
       ChecksumHeader :
       +--------------------------------------------------+
       | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
       +--------------------------------------------------+
-      Followed by actual data in the form of PACKETS: 
+      Followed by actual data in the form of PACKETS:
       +------------------------------------+
       | Sequence of data PACKETs ....      |
       +------------------------------------+
-    
+
     A "PACKET" is defined further below.
-    
-    The client reads data until it receives a packet with 
+
+    The client reads data until it receives a packet with
     "LastPacketInBlock" set to true or with a zero length. It then replies
     to DataNode with one of the status codes:
     - CHECKSUM_OK:    All the chunk checksums have been verified
@@ -2245,16 +2245,16 @@ public class DataNode extends ReconfigurableBase
       +---------------+
       | 2 byte Status |
       +---------------+
-    
+
     The DataNode expects all well behaved clients to send the 2 byte
     status code. And if the the client doesn't, the DN will close the
     connection. So the status code is optional in the sense that it
     does not affect the correctness of the data. (And the client can
     always reconnect.)
-    
+
     PACKET : Contains a packet header, checksum and data. Amount of data
     ======== carried is set by BUFFER_SIZE.
-    
+
       +-----------------------------------------------------+
       | 4 byte packet length (excluding packet header)      |
       +-----------------------------------------------------+
@@ -2268,15 +2268,15 @@ public class DataNode extends ReconfigurableBase
       +-----------------------------------------------------+
       | actual data ......                                  |
       +-----------------------------------------------------+
-      
+
       x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
           CHECKSUM_SIZE
-          
+
       CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
-      
+
       The above packet format is used while writing data to DFS also.
       Not all the fields might be used while reading.
-    
+
    ************************************************************************ */
 
   /**
@@ -2293,7 +2293,7 @@ public class DataNode extends ReconfigurableBase
     final CachingStrategy cachingStrategy;
 
     /**
-     * Connect to the first item in the target list.  Pass along the 
+     * Connect to the first item in the target list.  Pass along the
      * entire target list, the block, and the data.
      */
     DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
@@ -2330,7 +2330,7 @@ public class DataNode extends ReconfigurableBase
       DataInputStream in = null;
       BlockSender blockSender = null;
       final boolean isClient = clientname.length() > 0;
-      
+
       try {
         final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
         InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
@@ -2346,11 +2346,11 @@ public class DataNode extends ReconfigurableBase
         //
         Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
         if (isBlockTokenEnabled) {
-          accessToken = blockPoolTokenSecretManager.generateToken(b, 
+          accessToken = blockPoolTokenSecretManager.generateToken(b,
               EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
         }
 
-        long writeTimeout = dnConf.socketWriteTimeout + 
+        long writeTimeout = dnConf.socketWriteTimeout +
                             HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
@@ -2360,11 +2360,11 @@ public class DataNode extends ReconfigurableBase
           unbufIn, keyFactory, accessToken, bpReg);
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
-        
+
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             DFSUtilClient.getSmallBufferSize(getConf())));
         in = new DataInputStream(unbufIn);
-        blockSender = new BlockSender(b, 0, b.getNumBytes(), 
+        blockSender = new BlockSender(b, 0, b.getNumBytes(),
             false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
             .build();
@@ -2486,28 +2486,28 @@ public class DataNode extends ReconfigurableBase
   }
 
   /** Instantiate a single datanode object. This must be run by invoking
-   *  {@link DataNode#runDatanodeDaemon()} subsequently. 
+   *  {@link DataNode#runDatanodeDaemon()} subsequently.
    */
   public static DataNode instantiateDataNode(String args[],
                                       Configuration conf) throws IOException {
     return instantiateDataNode(args, conf, null);
   }
-  
-  /** Instantiate a single datanode object, along with its secure resources. 
-   * This must be run by invoking{@link DataNode#runDatanodeDaemon()} 
-   * subsequently. 
+
+  /** Instantiate a single datanode object, along with its secure resources.
+   * This must be run by invoking{@link DataNode#runDatanodeDaemon()}
+   * subsequently.
    */
   public static DataNode instantiateDataNode(String args [], Configuration conf,
       SecureResources resources) throws IOException {
     if (conf == null)
       conf = new HdfsConfiguration();
-    
+
     if (args != null) {
       // parse generic hadoop options
       GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
       args = hParser.getRemainingArgs();
     }
-    
+
     if (!parseArguments(args, conf)) {
       printUsage(System.err);
       return null;
@@ -2555,7 +2555,7 @@ public class DataNode extends ReconfigurableBase
                                  Configuration conf) throws IOException {
     return createDataNode(args, conf, null);
   }
-  
+
   /** Instantiate & Start a single datanode daemon and wait for it to finish.
    *  If this thread is specifically interrupted, it will stop waiting.
    */
@@ -2667,7 +2667,7 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
-   * This methods  arranges for the data node to send 
+   * This methods  arranges for the data node to send
    * the block report at the next heartbeat.
    */
   public void scheduleAllBlockReport(long delay) {
@@ -2679,7 +2679,7 @@ public class DataNode extends ReconfigurableBase
   /**
    * Examples are adding and deleting blocks directly.
    * The most common usage will be when the data node's storage is simulated.
-   * 
+   *
    * @return the fsdataset that stores the blocks
    */
   @VisibleForTesting
@@ -2725,7 +2725,7 @@ public class DataNode extends ReconfigurableBase
       terminate(errorCode);
     }
   }
-  
+
   public static void main(String args[]) {
     if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
       System.exit(0);
@@ -2742,7 +2742,7 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
-   * Update replica with the new generation stamp and length.  
+   * Update replica with the new generation stamp and length.
    */
   @Override // InterDatanodeProtocol
   public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
@@ -2802,7 +2802,7 @@ public class DataNode extends ReconfigurableBase
    * @param b the block to transfer.
    *          The corresponding replica must be an RBW or a Finalized.
    *          Its GS and numBytes will be set to
-   *          the stored GS and the visible length. 
+   *          the stored GS and the visible length.
    * @param targets targets to transfer the block to
    * @param client client name
    */
@@ -2857,12 +2857,12 @@ public class DataNode extends ReconfigurableBase
     return NetUtils.createSocketAddr(
         conf.getTrimmed(DFS_DATANODE_ADDRESS_KEY, DFS_DATANODE_ADDRESS_DEFAULT));
   }
-  
+
   @Override // DataNodeMXBean
   public String getVersion() {
     return VersionInfo.getVersion();
   }
-  
+
   @Override // DataNodeMXBean
   public String getRpcPort(){
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
@@ -2881,7 +2881,7 @@ public class DataNode extends ReconfigurableBase
   public String getHttpPort(){
     return this.getConf().get("dfs.datanode.info.port");
   }
-  
+
   /**
    * @return the datanode's http port
    */
@@ -2897,7 +2897,7 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
-   * Returned information is a JSON representation of a map with 
+   * Returned information is a JSON representation of a map with
    * name node host name as the key and block pool Id as the value.
    * Note that, if there are multiple NNs in an NA nameservice,
    * a given block pool may be represented twice.
@@ -2936,8 +2936,8 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
-   * Returned information is a JSON representation of a map with 
-   * volume name as the key and value is a map of volume attribute 
+   * Returned information is a JSON representation of a map with
+   * volume name as the key and value is a map of volume attribute
    * keys to its values
    */
   @Override // DataNodeMXBean
@@ -2945,13 +2945,12 @@ public class DataNode extends ReconfigurableBase
     Preconditions.checkNotNull(data, "Storage not yet initialized");
     return JSON.toString(data.getVolumeInfoMap());
   }
-  
+
   @Override // DataNodeMXBean
   public synchronized String getClusterId() {
     return clusterId;
   }
 
-
   @Override // DataNodeMXBean
   public String getFileIoProviderStatistics() {
     return fileIoProvider.getStatistics();
@@ -2967,7 +2966,7 @@ public class DataNode extends ReconfigurableBase
     setConf(new Configuration());
     refreshNamenodes(getConf());
   }
-  
+
   @Override // ClientDatanodeProtocol
   public void deleteBlockPool(String blockPoolId, boolean force)
       throws IOException {
@@ -2981,7 +2980,7 @@ public class DataNode extends ReconfigurableBase
           "The block pool is still running. First do a refreshNamenodes to " +
           "shutdown the block pool service");
     }
-   
+
     data.deleteBlockPool(blockPoolId, force);
   }
 
@@ -3077,7 +3076,7 @@ public class DataNode extends ReconfigurableBase
     }
     return false;
   }
-  
+
   /**
    * @param bpid block pool Id
    * @return true - if BPOfferService thread is alive
@@ -3094,7 +3093,7 @@ public class DataNode extends ReconfigurableBase
   /**
    * A datanode is considered to be fully started if all the BP threads are
    * alive and all the block pools are initialized.
-   * 
+   *
    * @return true - if the data node is fully started
    */
   public boolean isDatanodeFullyStarted() {
@@ -3105,12 +3104,12 @@ public class DataNode extends ReconfigurableBase
     }
     return true;
   }
-  
+
   @VisibleForTesting
   public DatanodeID getDatanodeId() {
     return id;
   }
-  
+
   @VisibleForTesting
   public void clearAllBlockSecretKeys() {
     blockPoolTokenSecretManager.clearAllKeysForTesting();
@@ -3122,7 +3121,7 @@ public class DataNode extends ReconfigurableBase
                        (DataXceiverServer) this.dataXceiverServer.getRunnable();
     return dxcs.balanceThrottler.getBandwidth();
   }
-  
+
   public DNConf getDnConf() {
     return dnConf;
   }
@@ -3143,70 +3142,55 @@ public class DataNode extends ReconfigurableBase
   public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
-  
+
   /**
-   * Check the disk error
+   * Check the disk error synchronously.
    */
-  private void checkDiskError() {
-    Set<File> unhealthyDataDirs = data.checkDataDir();
-    if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
+  @VisibleForTesting
+  public void checkDiskError() throws IOException {
+    Set<FsVolumeSpi> unhealthyVolumes;
+    try {
+      unhealthyVolumes = volumeChecker.checkAllVolumes(data);
+      lastDiskErrorCheck = Time.monotonicNow();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while running disk check", e);
+      throw new IOException("Interrupted while running disk check", e);
+    }
+
+    if (unhealthyVolumes.size() > 0) {
+      LOG.warn("checkDiskError got {} failed volumes - {}",
+          unhealthyVolumes.size(), unhealthyVolumes);
+      handleVolumeFailures(unhealthyVolumes);
+    } else {
+      LOG.debug("checkDiskError encountered no failures");
+    }
+  }
+
+  private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+    data.handleVolumeFailures(unhealthyVolumes);
+    final Set<File> unhealthyDirs = new HashSet<>(unhealthyVolumes.size());
+
+    if (!unhealthyVolumes.isEmpty()) {
+      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
+      for (FsVolumeSpi vol : unhealthyVolumes) {
+        unhealthyDirs.add(new File(vol.getBasePath()).getAbsoluteFile());
+        sb.append(vol).append(";");
+      }
+
       try {
         // Remove all unhealthy volumes from DataNode.
-        removeVolumes(unhealthyDataDirs, false);
+        removeVolumes(unhealthyDirs, false);
       } catch (IOException e) {
         LOG.warn("Error occurred when removing unhealthy storage dirs: "
             + e.getMessage(), e);
       }
-      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
-      for (File dataDir : unhealthyDataDirs) {
-        sb.append(dataDir.getAbsolutePath() + ";");
-      }
+      LOG.info(sb.toString());
       handleDiskError(sb.toString());
     }
   }
 
-  /**
-   * Starts a new thread which will check for disk error check request 
-   * every 5 sec
-   */
-  private void startCheckDiskErrorThread() {
-    checkDiskErrorThread = new Thread(new Runnable() {
-          @Override
-          public void run() {
-            while(shouldRun) {
-              boolean tempFlag ;
-              synchronized(checkDiskErrorMutex) {
-                tempFlag = checkDiskErrorFlag;
-                checkDiskErrorFlag = false;
-              }
-              if(tempFlag) {
-                try {
-                  checkDiskError();
-                } catch (Exception e) {
-                  LOG.warn("Unexpected exception occurred while checking disk error  " + e);
-                  checkDiskErrorThread = null;
-                  return;
-                }
-                synchronized(checkDiskErrorMutex) {
-                  lastDiskErrorCheck = Time.monotonicNow();
-                }
-              }
-              try {
-                Thread.sleep(checkDiskErrorInterval);
-              } catch (InterruptedException e) {
-                LOG.debug("InterruptedException in check disk error thread", e);
-                checkDiskErrorThread = null;
-                return;
-              }
-            }
-          }
-    });
-  }
-  
   public long getLastDiskErrorCheck() {
-    synchronized(checkDiskErrorMutex) {
-      return lastDiskErrorCheck;
-    }
+    return lastDiskErrorCheck;
   }
 
   @Override
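
As a reading aid (not part of the patch), here is a self-contained sketch of what the new synchronous path in DataNode#checkDiskError() boils down to, using only the DatasetVolumeChecker calls shown in the hunks above; the class and field names in the sketch are assumptions made for illustration.

// Simplified sketch -- assumed names, not the committed DataNode code.
import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

class SyncDiskCheckSketch {
  private final DatasetVolumeChecker volumeChecker;
  private final FsDatasetSpi<? extends FsVolumeSpi> data;

  SyncDiskCheckSketch(DatasetVolumeChecker checker,
                      FsDatasetSpi<? extends FsVolumeSpi> dataset) {
    this.volumeChecker = checker;
    this.data = dataset;
  }

  /** Returns the volumes that failed the check; the caller handles them. */
  Set<FsVolumeSpi> checkDiskError() throws IOException {
    try {
      // Blocks until every volume has been checked or the checker's
      // overall timeout expires, whichever comes first.
      return volumeChecker.checkAllVolumes(data);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while running disk check", e);
    }
  }
}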

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec80de3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
index 7a91a82..3c7ba29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
@@ -30,11 +30,6 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.util.DiskChecker;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec80de3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index e49ec7d..8f346dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -164,9 +163,21 @@ public class DatasetVolumeChecker {
   public Set<FsVolumeSpi> checkAllVolumes(
       final FsDatasetSpi<? extends FsVolumeSpi> dataset)
       throws InterruptedException {
-
-    if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+    if (gap < minDiskCheckGapMs) {
       numSkippedChecks.incrementAndGet();
+      LOG.trace(
+          "Skipped checking all volumes, time since last check {} is less " +
+          "than the minimum gap between checks ({} ms).",
+          gap, minDiskCheckGapMs);
+      return Collections.emptySet();
+    }
+
+    final FsDatasetSpi.FsVolumeReferences references =
+        dataset.getFsVolumeReferences();
+
+    if (references.size() == 0) {
+      LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
       return Collections.emptySet();
     }
 
@@ -175,9 +186,8 @@ public class DatasetVolumeChecker {
     final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
     final Set<FsVolumeSpi> allVolumes = new HashSet<>();
 
-    final FsDatasetSpi.FsVolumeReferences references =
-        dataset.getFsVolumeReferences();
-    final CountDownLatch resultsLatch = new CountDownLatch(references.size());
+    final AtomicLong numVolumes = new AtomicLong(references.size());
+    final CountDownLatch latch = new CountDownLatch(1);
 
     for (int i = 0; i < references.size(); ++i) {
       final FsVolumeReference reference = references.getReference(i);
@@ -186,12 +196,18 @@ public class DatasetVolumeChecker {
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
       LOG.info("Scheduled health check for volume {}", reference.getVolume());
       Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, resultsLatch, null));
+          reference, healthyVolumes, failedVolumes, numVolumes, new Callback() {
+        @Override
+        public void call(Set<FsVolumeSpi> ignored1,
+                         Set<FsVolumeSpi> ignored2) {
+          latch.countDown();
+        }
+      }));
     }
 
     // Wait until our timeout elapses, after which we give up on
     // the remaining volumes.
-    if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
+    if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
       LOG.warn("checkAllVolumes timed out after {} ms" +
           maxAllowedTimeForCheckMs);
     }
@@ -225,18 +241,28 @@ public class DatasetVolumeChecker {
   public boolean checkAllVolumesAsync(
       final FsDatasetSpi<? extends FsVolumeSpi> dataset,
       Callback callback) {
-
-    if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+    if (gap < minDiskCheckGapMs) {
       numSkippedChecks.incrementAndGet();
+      LOG.trace(
+          "Skipped checking all volumes, time since last check {} is less " +
+              "than the minimum gap between checks ({} ms).",
+          gap, minDiskCheckGapMs);
+      return false;
+    }
+
+    final FsDatasetSpi.FsVolumeReferences references =
+        dataset.getFsVolumeReferences();
+
+    if (references.size() == 0) {
+      LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
       return false;
     }
 
     lastAllVolumesCheck = timer.monotonicNow();
     final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
     final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
-    final FsDatasetSpi.FsVolumeReferences references =
-        dataset.getFsVolumeReferences();
-    final CountDownLatch latch = new CountDownLatch(references.size());
+    final AtomicLong numVolumes = new AtomicLong(references.size());
 
     LOG.info("Checking {} volumes", references.size());
     for (int i = 0; i < references.size(); ++i) {
@@ -245,7 +271,7 @@ public class DatasetVolumeChecker {
       ListenableFuture<VolumeCheckResult> future =
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
       Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, latch, callback));
+          reference, healthyVolumes, failedVolumes, numVolumes, callback));
     }
     numAsyncDatasetChecks.incrementAndGet();
     return true;
@@ -273,8 +299,10 @@ public class DatasetVolumeChecker {
    *
    * @param volume the volume that is to be checked.
    * @param callback callback to be invoked when the volume check completes.
+   * @return true if the check was scheduled and the callback will be invoked.
+   *         false otherwise.
    */
-  public void checkVolume(
+  public boolean checkVolume(
       final FsVolumeSpi volume,
       Callback callback) {
     FsVolumeReference volumeReference;
@@ -283,14 +311,15 @@ public class DatasetVolumeChecker {
     } catch (ClosedChannelException e) {
       // The volume has already been closed.
       callback.call(new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>());
-      return;
+      return false;
     }
     ListenableFuture<VolumeCheckResult> future =
         delegateChecker.schedule(volume, IGNORED_CONTEXT);
     numVolumeChecks.incrementAndGet();
     Futures.addCallback(future, new ResultHandler(
-        volumeReference, new HashSet<FsVolumeSpi>(),
-        new HashSet<FsVolumeSpi>(), new CountDownLatch(1), callback));
+        volumeReference, new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>(),
+        new AtomicLong(1), callback));
+    return true;
   }
 
   /**
@@ -301,24 +330,33 @@ public class DatasetVolumeChecker {
     private final FsVolumeReference reference;
     private final Set<FsVolumeSpi> failedVolumes;
     private final Set<FsVolumeSpi> healthyVolumes;
-    private final CountDownLatch latch;
-    private final AtomicLong numVolumes;
+    private final AtomicLong volumeCounter;
 
     @Nullable
     private final Callback callback;
 
+    /**
+     *
+     * @param reference FsVolumeReference to be released when the check is
+     *                  complete.
+     * @param healthyVolumes set of healthy volumes. If the disk check is
+     *                       successful, add the volume here.
+     * @param failedVolumes set of failed volumes. If the disk check fails,
+     *                      add the volume here.
+     * @param volumeCounter counter used to trigger callback invocation.
+     * @param callback invoked when the counter reaches zero.
+     */
     ResultHandler(FsVolumeReference reference,
                   Set<FsVolumeSpi> healthyVolumes,
                   Set<FsVolumeSpi> failedVolumes,
-                  CountDownLatch latch,
+                  AtomicLong volumeCounter,
                   @Nullable Callback callback) {
       Preconditions.checkState(reference != null);
       this.reference = reference;
       this.healthyVolumes = healthyVolumes;
       this.failedVolumes = failedVolumes;
-      this.latch = latch;
+      this.volumeCounter = volumeCounter;
       this.callback = callback;
-      numVolumes = new AtomicLong(latch.getCount());
     }
 
     @Override
@@ -372,10 +410,8 @@ public class DatasetVolumeChecker {
 
     private void invokeCallback() {
       try {
-        latch.countDown();
-
-        if (numVolumes.decrementAndGet() == 0 &&
-            callback != null) {
+        final long remaining = volumeCounter.decrementAndGet();
+        if (callback != null && remaining == 0) {
           callback.call(healthyVolumes, failedVolumes);
         }
       } catch(Exception e) {
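
One detail in the hunk above that is easy to miss: the per-volume CountDownLatch is replaced by a shared AtomicLong, so the aggregate callback is invoked exactly once, by whichever ResultHandler finishes last. A standalone sketch of that completion pattern (plain Java, illustrative names only; not the committed code):

// Standalone illustration of the counter-based completion pattern.
import java.util.concurrent.atomic.AtomicLong;

class CompletionCounterSketch {
  interface AggregateCallback {       // stands in for DatasetVolumeChecker.Callback
    void allDone();
  }

  /** One handler per scheduled check; the last to finish fires the callback. */
  static Runnable resultHandler(AtomicLong remaining, AggregateCallback cb) {
    return () -> {
      // ... record this volume as healthy or failed here ...
      if (remaining.decrementAndGet() == 0 && cb != null) {
        cb.allDone();                 // reached zero: all checks are accounted for
      }
    };
  }

  public static void main(String[] args) {
    AtomicLong remaining = new AtomicLong(3);   // e.g. three volumes to check
    AggregateCallback cb = () -> System.out.println("all volume checks done");
    for (int i = 0; i < 3; i++) {
      resultHandler(remaining, cb).run();       // would run asynchronously in practice
    }
  }
}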

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec80de3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 8356331..4ce603f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -65,7 +65,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 /**
  * This is a service provider interface for the underlying storage that
  * stores replicas for a data node.
- * The default implementation stores replicas on local drives. 
+ * The default implementation stores replicas on local drives.
  */
 @InterfaceAudience.Private
 public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
@@ -273,7 +273,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   long getLength(ExtendedBlock b) throws IOException;
 
   /**
-   * Get reference to the replica meta info in the replicasMap. 
+   * Get reference to the replica meta info in the replicasMap.
    * To be called from methods that are synchronized on {@link FSDataset}
    * @return replica from the replicas map
    */
@@ -314,7 +314,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /**
    * Creates a temporary replica and returns the meta information of the replica
    * .
-   * 
+   *
    * @param b block
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
@@ -324,7 +324,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Creates a RBW replica and returns the meta info of the replica
-   * 
+   *
    * @param b block
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
@@ -334,7 +334,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Recovers a RBW replica and returns the meta info of the replica.
-   * 
+   *
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param minBytesRcvd the minimum number of bytes that the replica could have
@@ -355,7 +355,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Append to a finalized replica and returns the meta info of the replica.
-   * 
+   *
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param expectedBlockLen the number of bytes the replica is expected to have
@@ -368,7 +368,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /**
    * Recover a failed append to a finalized replica and returns the meta
    * info of the replica.
-   * 
+   *
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param expectedBlockLen the number of bytes the replica is expected to have
@@ -377,11 +377,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   ReplicaHandler recoverAppend(
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;
-  
+
   /**
    * Recover a failed pipeline close.
    * It bumps the replica's generation stamp and finalize it if RBW replica
-   * 
+   *
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param expectedBlockLen the number of bytes the replica is expected to have
@@ -390,7 +390,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
       ) throws IOException;
-  
+
   /**
    * Finalizes the block previously opened for writing using writeToBlock.
    * The block size is what is in the parameter b and it must match the amount
@@ -437,19 +437,19 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   *
   * @throws ReplicaNotFoundException          If the replica is not found
   *
-   * @throws UnexpectedReplicaStateException   If the replica is not in the 
+   * @throws UnexpectedReplicaStateException   If the replica is not in the
    *                                             expected state.
-   * @throws FileNotFoundException             If the block file is not found or there 
+   * @throws FileNotFoundException             If the block file is not found or there
    *                                              was an error locating it.
    * @throws EOFException                      If the replica length is too short.
-   * 
-   * @throws IOException                       May be thrown from the methods called. 
+   *
+   * @throws IOException                       May be thrown from the methods called.
    */
   void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
       throws ReplicaNotFoundException, UnexpectedReplicaStateException,
       FileNotFoundException, EOFException, IOException;
-      
-  
+
+
   /**
    * Is the block valid?
    * @return - true if the specified block is valid
@@ -493,10 +493,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   boolean isCached(String bpid, long blockId);
 
     /**
-     * Check if all the data directories are healthy
-     * @return A set of unhealthy data directories.
+     * Handle volume failures by removing the failed volumes.
      */
-  Set<File> checkDataDir();
+  void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes);
 
   /**
    * Shutdown the FSDataset
@@ -516,7 +515,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Checks how many valid storage volumes there are in the DataNode.
-   * @return true if more than the minimum number of valid volumes are left 
+   * @return true if more than the minimum number of valid volumes are left
    * in the FSDataSet.
    */
   boolean hasEnoughResource();
@@ -528,7 +527,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Initialize a replica recovery.
-   * @return actual state of the replica on this data-node or 
+   * @return actual state of the replica on this data-node or
    * null if data-node does not have the replica.
    */
   ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock
@@ -555,13 +554,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   void shutdownBlockPool(String bpid) ;
 
   /**
-   * Deletes the block pool directories. If force is false, directories are 
-   * deleted only if no block files exist for the block pool. If force 
+   * Deletes the block pool directories. If force is false, directories are
+   * deleted only if no block files exist for the block pool. If force
    * is true entire directory for the blockpool is deleted along with its
    * contents.
    * @param bpid BlockPool Id to be deleted.
    * @param force If force is false, directories are deleted only if no
-   *        block files exist for the block pool, otherwise entire 
+   *        block files exist for the block pool, otherwise entire
    *        directory for the blockpool is deleted along with its contents.
    * @throws IOException
    */
@@ -574,9 +573,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       ) throws IOException;
 
   /**
-   * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in 
+   * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in
    * <code>blocks</code>.
-   * 
+   *
    * @param bpid pool to query
    * @param blockIds List of block ids for which to return metadata
    * @return metadata Metadata for the list of blocks
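
To make the interface change above concrete: checkDataDir(), where the dataset
probed its own directories and returned the set of failed ones, is replaced by
handleVolumeFailures(), where the caller (the DataNode, driving
DatasetVolumeChecker) decides which volumes have failed and pushes that set into
the dataset. A rough sketch of the new direction of the call, using simplified
hypothetical types rather than the real FsDatasetSpi/FsVolumeSpi signatures:

    import java.util.Set;

    // Hypothetical, simplified types; only the shape of the call is the point.
    interface Volume { String path(); }

    interface Dataset {
      // before: Set<File> checkDataDir()  -- the dataset probed the disks itself
      // after:  the caller hands in the volumes it has already found to be bad
      void handleVolumeFailures(Set<Volume> failedVolumes);
    }

    class FailureFlowSketch {
      // Invoked by the checker once all volume checks have completed.
      static void onCheckComplete(Dataset dataset, Set<Volume> healthy,
                                  Set<Volume> failed) {
        if (!failed.isEmpty()) {
          dataset.handleVolumeFailures(failed);  // remove only the reported volumes
        }
      }
    }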

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec80de3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index c821989..e37934e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -231,7 +231,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return null;
   }
-  
+
   @Override // FsDatasetSpi
   public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
@@ -291,7 +291,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @VisibleForTesting
   final AutoCloseableLock datasetLock;
   private final Condition datasetLockCondition;
-  
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -324,8 +324,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     if (volsFailed > volFailuresTolerated) {
       throw new DiskErrorException("Too many failed volumes - "
-          + "current valid volumes: " + storage.getNumStorageDirs() 
-          + ", volumes configured: " + volsConfigured 
+          + "current valid volumes: " + storage.getNumStorageDirs()
+          + ", volumes configured: " + volsConfigured
           + ", volumes failed: " + volsFailed
           + ", volume failures tolerated: " + volFailuresTolerated);
     }
@@ -614,9 +614,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return volumes.getBlockPoolUsed(bpid);
     }
   }
-  
+
   /**
-   * Return true - if there are still valid volumes on the DataNode. 
+   * Return true - if there are still valid volumes on the DataNode.
    */
   @Override // FsDatasetSpi
   public boolean hasEnoughResource() {
@@ -762,7 +762,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private File getBlockFile(ExtendedBlock b) throws IOException {
     return getBlockFile(b.getBlockPoolId(), b.getBlockId());
   }
-  
+
   /**
    * Get File name for a given block.
    */
@@ -824,7 +824,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * block pool Id, block Id and generation stamp must match.
    * @param b extended block
    * @return the meta replica information
-   * @throws ReplicaNotFoundException if no entry is in the map or 
+   * @throws ReplicaNotFoundException if no entry is in the map or
    *                        there is a generation stamp mismatch
    */
   ReplicaInfo getReplicaInfo(ExtendedBlock b)
@@ -849,7 +849,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param bpid block pool Id
    * @param blkid block Id
    * @return the meta replica information; null if block was not found
-   * @throws ReplicaNotFoundException if no entry is in the map or 
+   * @throws ReplicaNotFoundException if no entry is in the map or
    *                        there is a generation stamp mismatch
    */
   private ReplicaInfo getReplicaInfo(String bpid, long blkid)
@@ -1190,15 +1190,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /** Append to a finalized replica
-   * Change a finalized replica to be a RBW replica and 
+   * Change a finalized replica to be a RBW replica and
    * bump its generation stamp to be the newGS
-   * 
+   *
    * @param bpid block pool Id
    * @param replicaInfo a finalized replica
    * @param newGS new generation stamp
    * @param estimateBlockLen estimate generation stamp
    * @return a RBW replica
-   * @throws IOException if moving the replica from finalized directory 
+   * @throws IOException if moving the replica from finalized directory
    *         to rbw directory fails
    */
   private ReplicaBeingWritten append(String bpid,
@@ -1287,10 +1287,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 
+  private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException, MustStopExistingWriter {
    ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
-    
+
     // check state
     if (replicaInfo.getState() != ReplicaState.FINALIZED &&
         replicaInfo.getState() != ReplicaState.RBW) {
@@ -1304,10 +1304,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
        replicaGenerationStamp > newGS) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
-          + ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
+          + ". Expected GS range is [" + b.getGenerationStamp() + ", " +
           newGS + "].");
     }
-    
+
     // stop the previous writer before check a replica's length
     long replicaLen = replicaInfo.getNumBytes();
     if (replicaInfo.getState() == ReplicaState.RBW) {
@@ -1316,22 +1316,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
        throw new MustStopExistingWriter(rbw);
      }
      // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
-      if (replicaLen != rbw.getBytesOnDisk() 
+      if (replicaLen != rbw.getBytesOnDisk()
           || replicaLen != rbw.getBytesAcked()) {
-        throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + 
-            "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + 
+        throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo +
+            "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" +
             rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
             ") are not the same.");
       }
     }
-    
+
     // check block length
     if (replicaLen != expectedBlockLen) {
-      throw new IOException("Corrupted replica " + replicaInfo + 
-          " with a length of " + replicaLen + 
+      throw new IOException("Corrupted replica " + replicaInfo +
+          " with a length of " + replicaLen +
           " expected length is " + expectedBlockLen);
     }
-    
+
     return replicaInfo;
   }
 
@@ -1390,7 +1390,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
+
   /**
    * Bump a replica's generation stamp to a new one.
    * Its on-disk meta file name is renamed to be the new one too.
@@ -1559,7 +1559,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return new ReplicaHandler(rbw, ref);
     }
   }
-  
+
   @Override // FsDatasetSpi
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
@@ -1673,7 +1673,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Hang too long, just bail out. This is not supposed to happen.
       long writerStopMs = Time.monotonicNow() - startTimeMs;
       if (writerStopMs > writerStopTimeoutMs) {
-        LOG.warn("Unable to stop existing writer for block " + b + " after " 
+        LOG.warn("Unable to stop existing writer for block " + b + " after "
            + writerStopMs + " milliseconds.");
        throw new IOException("Unable to stop existing writer for block " + b
            + " after " + writerStopMs + " milliseconds.");
@@ -1690,7 +1690,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * last checksum will be overwritten.
    */
   @Override // FsDatasetSpi
-  public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, 
+  public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
       int checksumSize) throws IOException {
     FileOutputStream file = (FileOutputStream)streams.getChecksumOut();
     FileChannel channel = file.getChannel();
@@ -1807,7 +1807,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.warn("No file exists for block: " + b);
       return true;
     }
-    
+
     if (!blockFile.delete()) {
       LOG.warn("Not able to delete the block file: " + blockFile);
       return false;
@@ -1910,21 +1910,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param state       If this is null, it is ignored.  If it is non-null, we
    *                        will check that the replica has this state.
    *
-   * @throws ReplicaNotFoundException          If the replica is not found 
+   * @throws ReplicaNotFoundException          If the replica is not found
    *
-   * @throws UnexpectedReplicaStateException   If the replica is not in the 
+   * @throws UnexpectedReplicaStateException   If the replica is not in the
    *                                             expected state.
    * @throws FileNotFoundException             If the block file is not found or there
    *                                              was an error locating it.
    * @throws EOFException                      If the replica length is too short.
-   * 
-   * @throws IOException                       May be thrown from the methods called. 
+   *
+   * @throws IOException                       May be thrown from the methods called.
    */
   @Override // FsDatasetSpi
   public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
       throws ReplicaNotFoundException, UnexpectedReplicaStateException,
       FileNotFoundException, EOFException, IOException {
-    final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+    final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
         b.getLocalBlock());
     if (replicaInfo == null) {
       throw new ReplicaNotFoundException(b);
@@ -1950,7 +1950,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public boolean isValidBlock(ExtendedBlock b) {
     return isValid(b, ReplicaState.FINALIZED);
   }
-  
+
   /**
    * Check whether the given block is a valid RBW.
    */
@@ -2182,7 +2182,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       genstamp = info.getGenerationStamp();
       volumeExecutor = volume.getCacheExecutor();
     }
-    cacheManager.cacheBlock(blockId, bpid, 
+    cacheManager.cacheBlock(blockId, bpid,
         blockFileName, length, genstamp, volumeExecutor);
   }
 
@@ -2237,12 +2237,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   * if some volumes failed - the caller must remove all the blocks that belong
   * to these failed volumes.
   * @return the failed volumes. Returns null if no volume failed.
+   * @param failedVolumes set of volumes that are known to have failed.
    */
   @Override // FsDatasetSpi
-  public Set<File> checkDataDir() {
-   return volumes.checkDirs();
+  public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
+    volumes.handleVolumeFailures(failedVolumes);
   }
-    
+
 
   @Override // FsDatasetSpi
   public String toString() {
@@ -2250,14 +2251,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   private ObjectName mbeanName;
-  
+
   /**
    * Register the FSDataset MBean using the name
    *        "hadoop:service=DataNode,name=FSDatasetState-<datanodeUuid>"
    */
   void registerMBean(final String datanodeUuid) {
     // We wrap to bypass standard mbean naming convention.
-    // This wrapping can be removed in java 6 as it is more flexible in 
+    // This wrapping can be removed in java 6 as it is more flexible in
     // package naming for mbeans and their impl.
     try {
       StandardMBean bean = new StandardMBean(this,FSDatasetMBean.class);
@@ -2280,7 +2281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (mbeanName != null) {
       MBeans.unregister(mbeanName);
     }
-    
+
     if (asyncDiskService != null) {
       asyncDiskService.shutdown();
     }
@@ -2288,7 +2289,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (asyncLazyPersistService != null) {
       asyncLazyPersistService.shutdown();
     }
-    
+
     if(volumes != null) {
       volumes.shutdown();
     }
@@ -2497,7 +2498,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return volumeMap.get(bpid, blockId);
   }
 
-  @Override 
+  @Override
   public String getReplicaString(String bpid, long blockId) {
     try(AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
@@ -2753,7 +2754,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return replica.getVisibleLength();
     }
   }
-  
+
   @Override
   public void addBlockPool(String bpid, Configuration conf)
       throws IOException {
@@ -2775,7 +2776,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumes.removeBlockPool(bpid, blocksPerVolume);
     }
   }
-  
+
   /**
    * Class for representing the Datanode volume information
    */
@@ -2796,7 +2797,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       this.reservedSpaceForReplicas = v.getReservedForReplicas();
       this.numBlocks = v.getNumBlocks();
     }
-  }  
+  }
 
   private Collection<VolumeInfo> getVolumeInfo() {
     Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
@@ -2813,7 +2814,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         used = 0;
         free = 0;
       }
-      
+
       info.add(new VolumeInfo(volume, used, free));
     }
     return info;
@@ -2863,7 +2864,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
+
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
@@ -3237,7 +3238,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       shouldRun = false;
     }
   }
-  
+
   @Override
   public void setPinning(ExtendedBlock block) throws IOException {
     if (!blockPinningEnabled) {
@@ -3254,7 +3255,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
     localFS.setPermission(p, permission);
   }
-  
+
   @Override
   public boolean getPinning(ExtendedBlock block) throws IOException {
     if (!blockPinningEnabled) {
@@ -3265,7 +3266,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
     return fss.getPermission().getStickyBit();
   }
-  
+
   @Override
   public boolean isDeletingBlock(String bpid, long blockId) {
     synchronized(deletingBlock) {
@@ -3289,7 +3290,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
+
   private void addDeletingBlock(String bpid, Long blockId) {
     synchronized(deletingBlock) {
       Set<Long> s = deletingBlock.get(bpid);
@@ -3361,4 +3362,3 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec80de3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index bc822c6..b1ce931 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -901,13 +901,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return cacheExecutor;
   }
 
-  void checkDirs() throws DiskErrorException {
-    // TODO:FEDERATION valid synchronization
-    for(BlockPoolSlice s : bpSlices.values()) {
-      s.checkDirs();
-    }
-  }
-
   @Override
   public VolumeCheckResult check(VolumeCheckContext ignored)
       throws DiskErrorException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec80de3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 9ecb901..8f84bf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
 class FsVolumeList {
@@ -96,11 +94,11 @@ class FsVolumeList {
     }
   }
 
-  /** 
+  /**
    * Get next volume.
    *
    * @param blockSize free space needed on the volume
-   * @param storageType the desired {@link StorageType} 
+   * @param storageType the desired {@link StorageType}
    * @return next volume to store the block in.
    */
   FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
@@ -167,7 +165,7 @@ class FsVolumeList {
     }
     return capacity;
   }
-    
+
   long getRemaining() throws IOException {
     long remaining = 0L;
     for (FsVolumeSpi vol : volumes) {
@@ -179,7 +177,7 @@ class FsVolumeList {
     }
     return remaining;
   }
-  
+
   void getAllVolumesMap(final String bpid,
                         final ReplicaMap volumeMap,
                         final RamDiskReplicaTracker ramDiskReplicaMap)
@@ -229,28 +227,17 @@ class FsVolumeList {
 
   /**
    * Calls {@link FsVolumeImpl#checkDirs()} on each volume.
-   * 
+   *
    * Use {@link checkDirsLock} to allow only one instance of checkDirs() call.
    *
    * @return list of all the failed volumes.
+   * @param failedVolumes set of volumes that have failed and should be removed.
    */
-  Set<File> checkDirs() {
+  void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
     try (AutoCloseableLock lock = checkDirsLock.acquire()) {
-      Set<File> failedVols = null;
-      
-      // Make a copy of volumes for performing modification 
-      final List<FsVolumeImpl> volumeList = getVolumes();
-
-      for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
-        final FsVolumeImpl fsv = i.next();
+      for(FsVolumeSpi vol : failedVolumes) {
+        FsVolumeImpl fsv = (FsVolumeImpl) vol;
         try (FsVolumeReference ref = fsv.obtainReference()) {
-          fsv.checkDirs();
-        } catch (DiskErrorException e) {
-          FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
-          if (failedVols == null) {
-            failedVols = new HashSet<>(1);
-          }
-          failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
           addVolumeFailureInfo(fsv);
           removeVolume(fsv);
         } catch (ClosedChannelException e) {
@@ -260,14 +247,8 @@ class FsVolumeList {
           FsDatasetImpl.LOG.error("Unexpected IOException", e);
         }
       }
-      
-      if (failedVols != null && failedVols.size() > 0) {
-        FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
-            + " failure volumes.");
-      }
 
       waitVolumeRemoved(5000, checkDirsLockCondition);
-      return failedVols;
     }
   }
 
@@ -405,7 +386,7 @@ class FsVolumeList {
 
   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
     long totalStartTime = Time.monotonicNow();
-    
+
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
@@ -442,12 +423,12 @@ class FsVolumeList {
     if (!exceptions.isEmpty()) {
       throw exceptions.get(0);
     }
-    
+
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
     FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
         bpid + ": " + totalTimeTaken + "ms");
   }
-  
+
   void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
       blocksPerVolume) {
     for (FsVolumeImpl v : volumes) {
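
The removal loop in handleVolumeFailures() above has two details worth noting:
a reference is held on each failed volume for the duration of its removal, and
ClosedChannelException is treated as "already closed elsewhere" rather than as
an error. A rough standalone sketch of that pattern, with hypothetical
Volume/VolumeRef types standing in for FsVolumeImpl/FsVolumeReference:

    import java.io.Closeable;
    import java.io.IOException;
    import java.nio.channels.ClosedChannelException;

    // Illustrative only (hypothetical types): hold a reference while removing
    // the volume so it cannot be released underneath us, and treat a closed
    // channel as "someone already removed it", not as a failure.
    class VolumeRemovalSketch {
      interface VolumeRef extends Closeable { }
      interface Volume { VolumeRef obtainReference() throws ClosedChannelException; }

      static void removeFailedVolume(Volume vol) {
        try (VolumeRef ref = vol.obtainReference()) {
          // record the failure and detach the volume while the reference is held
        } catch (ClosedChannelException e) {
          // volume was already closed by another thread; nothing left to do
        } catch (IOException e) {
          // unexpected; log and continue with the remaining volumes
        }
      }
    }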

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec80de3c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
index 9e3112e..570776e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
@@ -30,9 +30,11 @@ import java.net.URL;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -52,6 +54,8 @@ public class TestBlockStatsMXBean {
   @Before
   public void setup() throws IOException {
     HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = null;
     StorageType[][] types = new StorageType[6][];
     for (int i=0; i<3; i++) {
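
The configuration tweak above recurs in several of the tests touched by this
change: the minimum gap between disk checks is set to zero so that checks
triggered in quick succession by injected failures are not throttled away
during the test. Taken on its own, the setup looks roughly like this (the calls
are the same ones used in the diff; the helper class is hypothetical):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Hypothetical helper: build a configuration with disk-check throttling
    // disabled, as done in the @Before methods of the affected tests.
    class DiskCheckTestConf {
      static HdfsConfiguration withNoDiskCheckGap() {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
            0, TimeUnit.MILLISECONDS);
        return conf;
      }
    }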

