HDFS-6727. Refresh data volumes on DataNode based on configuration changes (Lei Xu via Colin Patrick McCabe)
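
For context, this change lets a running DataNode pick up a new
dfs.datanode.data.dir value through the reconfiguration hook added below,
adding and removing data volumes without a restart. A minimal sketch of how
the hook is driven, mirroring TestDataNodeHotSwapVolumes in this patch (the
DataNode handle "dn" and the example paths are assumptions, not part of the
patch; the call may throw ReconfigurationException):

    // Push the complete, comma-separated list of data directories that
    // should be active after the swap to a live DataNode.
    String newDataDirs = "/data/disk1,/data/disk2,/data/disk3";
    dn.reconfigurePropertyImpl(
        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDataDirs);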


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe38d2e9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe38d2e9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe38d2e9

Branch: refs/heads/HDFS-6581
Commit: fe38d2e9b5ac7e13f97cd2d3d2a984ab6bbaaf77
Parents: 5d01a68
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Thu Sep 18 16:46:01 2014 -0700
Committer: Colin Patrick Mccabe <cmcc...@cloudera.com>
Committed: Thu Sep 18 16:52:46 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/common/HdfsServerConstants.java |   7 +-
 .../hadoop/hdfs/server/common/Storage.java      |  10 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   | 161 ++++++-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   4 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 141 +++++--
 .../server/datanode/SimulatedFSDataset.java     |   3 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    | 423 +++++++++++++++++++
 .../fsdataset/impl/TestFsDatasetImpl.java       |  12 +-
 9 files changed, 725 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fc22282..c99207b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -559,6 +559,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7003. Add NFS Gateway support for reading and writing to
     encryption zones. (clamb via wang)
 
+    HDFS-6727. Refresh data volumes on DataNode based on configuration changes
+    (Lei Xu via cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 106f489..767c1b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -94,7 +94,12 @@ public final class HdfsServerConstants {
     NONINTERACTIVE("-nonInteractive"),
     RENAMERESERVED("-renameReserved"),
     METADATAVERSION("-metadataVersion"),
-    UPGRADEONLY("-upgradeOnly");
+    UPGRADEONLY("-upgradeOnly"),
+    // The -hotswap constant should not be used as a startup option, it is
+    // only used for StorageDirectory.analyzeStorage() in hot swap drive scenario.
+    // TODO refactor StorageDirectory.analyzeStorage() so that we can do away with
+    // this in StartupOption.
+    HOTSWAP("-hotswap");
 
     private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
         "(\\w+)\\((\\w+)\\)");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 1d68727..7933fed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -464,17 +464,20 @@ public abstract class Storage extends StorageInfo {
     public StorageState analyzeStorage(StartupOption startOpt, Storage storage)
         throws IOException {
       assert root != null : "root is null";
+      boolean hadMkdirs = false;
       String rootPath = root.getCanonicalPath();
       try { // check that storage exists
         if (!root.exists()) {
           // storage directory does not exist
-          if (startOpt != StartupOption.FORMAT) {
+          if (startOpt != StartupOption.FORMAT &&
+              startOpt != StartupOption.HOTSWAP) {
             LOG.warn("Storage directory " + rootPath + " does not exist");
             return StorageState.NON_EXISTENT;
           }
           LOG.info(rootPath + " does not exist. Creating ...");
           if (!root.mkdirs())
             throw new IOException("Cannot create directory " + rootPath);
+          hadMkdirs = true;
         }
         // or is inaccessible
         if (!root.isDirectory()) {
@@ -492,7 +495,10 @@ public abstract class Storage extends StorageInfo {
 
       this.lock(); // lock storage if it exists
 
-      if (startOpt == HdfsServerConstants.StartupOption.FORMAT)
+      // If startOpt is HOTSWAP, return NOT_FORMATTED for a directory that was
+      // just created above; existing directories still get the layout checks.
+      if (startOpt == HdfsServerConstants.StartupOption.FORMAT ||
+          (startOpt == StartupOption.HOTSWAP && hadMkdirs))
         return StorageState.NOT_FORMATTED;
 
       if (startOpt != HdfsServerConstants.StartupOption.IMPORT) {
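
A note on the HOTSWAP path above (a sketch, not code from this patch): when a
drive is hot-swapped in, the caller runs analyzeStorage() with
StartupOption.HOTSWAP. A directory that had to be created comes back as
NOT_FORMATTED so it can be formatted in place, while an existing directory
falls through to the usual layout-version checks. Assuming a StorageDirectory
"sd" and its owning Storage "storage":

    StorageState state = sd.analyzeStorage(StartupOption.HOTSWAP, storage);
    if (state == StorageState.NOT_FORMATTED) {
      // Newly created (empty) directory: format it before adding the volume.
    } else {
      // Existing directory: proceed with the normal consistency checks.
    }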

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 9b57262..57ff2fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -70,8 +70,10 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -80,11 +82,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.ReconfigurableBase;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -137,6 +141,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -220,7 +225,7 @@ import com.google.protobuf.BlockingService;
  *
  **********************************************************/
 @InterfaceAudience.Private
-public class DataNode extends Configured 
+public class DataNode extends ReconfigurableBase
     implements InterDatanodeProtocol, ClientDatanodeProtocol,
     DataNodeMXBean {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
@@ -305,6 +310,7 @@ public class DataNode extends Configured
   private JvmPauseMonitor pauseMonitor;
 
   private SecureResources secureResources = null;
+  // dataDirs must be accessed while holding the DataNode lock.
   private List<StorageLocation> dataDirs;
   private Configuration conf;
   private final String confVersion;
@@ -386,6 +392,149 @@ public class DataNode extends Configured
     }
   }
 
+  @Override
+  public void reconfigurePropertyImpl(String property, String newVal)
+      throws ReconfigurationException {
+    if (property.equals(DFS_DATANODE_DATA_DIR_KEY)) {
+      try {
+        LOG.info("Reconfiguring " + property + " to " + newVal);
+        this.refreshVolumes(newVal);
+      } catch (Exception e) {
+        throw new ReconfigurationException(property, newVal,
+            getConf().get(property), e);
+      }
+    } else {
+      throw new ReconfigurationException(
+          property, newVal, getConf().get(property));
+    }
+  }
+
+  /**
+   * Get a list of the keys of the re-configurable properties in configuration.
+   */
+  @Override
+  public Collection<String> getReconfigurableProperties() {
+    List<String> reconfigurable =
+        Collections.unmodifiableList(Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
+    return reconfigurable;
+  }
+
+  /**
+   * Contains the StorageLocations for changed data volumes.
+   */
+  @VisibleForTesting
+  static class ChangedVolumes {
+    List<StorageLocation> newLocations = Lists.newArrayList();
+    List<StorageLocation> deactivateLocations = Lists.newArrayList();
+  }
+
+  /**
+   * Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect
+   * changed volumes.
+   * @return changed volumes.
+   * @throws IOException if none of the directories are specified in the
+   * configuration.
+   */
+  @VisibleForTesting
+  ChangedVolumes parseChangedVolumes() throws IOException {
+    List<StorageLocation> locations = getStorageLocations(getConf());
+
+    if (locations.isEmpty()) {
+      throw new IOException("No directory is specified.");
+    }
+
+    ChangedVolumes results = new ChangedVolumes();
+    results.newLocations.addAll(locations);
+
+    for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
+         it.hasNext(); ) {
+      Storage.StorageDirectory dir = it.next();
+      boolean found = false;
+      for (Iterator<StorageLocation> sl = results.newLocations.iterator();
+           sl.hasNext(); ) {
+        if (sl.next().getFile().getCanonicalPath().equals(
+            dir.getRoot().getCanonicalPath())) {
+          sl.remove();
+          found = true;
+          break;
+        }
+      }
+
+      if (!found) {
+        results.deactivateLocations.add(
+            StorageLocation.parse(dir.getRoot().toString()));
+      }
+    }
+
+    return results;
+  }
+
+  /**
+   * Attempts to reload data volumes with new configuration.
+   * @param newVolumes a comma separated string that specifies the data volumes.
+   * @throws Exception
+   */
+  private synchronized void refreshVolumes(String newVolumes) throws Exception {
+    Configuration conf = getConf();
+    String oldVolumes = conf.get(DFS_DATANODE_DATA_DIR_KEY);
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
+    List<StorageLocation> locations = getStorageLocations(conf);
+
+    final int numOldDataDirs = dataDirs.size();
+    dataDirs = locations;
+    ChangedVolumes changedVolumes = parseChangedVolumes();
+
+    try {
+      if (numOldDataDirs + changedVolumes.newLocations.size() -
+          changedVolumes.deactivateLocations.size() <= 0) {
+        throw new IOException("Attempt to remove all volumes.");
+      }
+      if (!changedVolumes.newLocations.isEmpty()) {
+        LOG.info("Adding new volumes: " +
+            Joiner.on(",").join(changedVolumes.newLocations));
+
+        // Add volumes for each Namespace
+        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+          NamespaceInfo nsInfo = bpos.getNamespaceInfo();
+          LOG.info("Loading volumes for namesapce: " + 
nsInfo.getNamespaceID());
+          storage.addStorageLocations(
+              this, nsInfo, changedVolumes.newLocations, StartupOption.HOTSWAP);
+        }
+        List<String> bpids = Lists.newArrayList();
+        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+          bpids.add(bpos.getBlockPoolId());
+        }
+        List<StorageLocation> succeedVolumes =
+            data.addVolumes(changedVolumes.newLocations, bpids);
+
+        if (succeedVolumes.size() < changedVolumes.newLocations.size()) {
+          List<StorageLocation> failedVolumes = Lists.newArrayList();
+          // Clean all failed volumes.
+          for (StorageLocation location : changedVolumes.newLocations) {
+            if (!succeedVolumes.contains(location)) {
+              failedVolumes.add(location);
+            }
+          }
+          storage.removeVolumes(failedVolumes);
+          data.removeVolumes(failedVolumes);
+        }
+      }
+
+      if (!changedVolumes.deactivateLocations.isEmpty()) {
+        LOG.info("Deactivating volumes: " +
+            Joiner.on(",").join(changedVolumes.deactivateLocations));
+
+        data.removeVolumes(changedVolumes.deactivateLocations);
+        storage.removeVolumes(changedVolumes.deactivateLocations);
+      }
+    } catch (IOException e) {
+      LOG.warn("There is IOException when refreshing volumes! "
+          + "Recover configurations: " + DFS_DATANODE_DATA_DIR_KEY
+          + " = " + oldVolumes, e);
+      throw e;
+    }
+  }
+
   private synchronized void setClusterId(final String nsCid, final String bpid
       ) throws IOException {
     if(clusterId != null && !clusterId.equals(nsCid)) {
@@ -822,7 +971,9 @@ public class DataNode extends Configured
 
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
-    this.dataDirs = dataDirs;
+    synchronized (this) {
+      this.dataDirs = dataDirs;
+    }
     this.conf = conf;
     this.dnConf = new DNConf(conf);
     this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
@@ -1112,7 +1263,9 @@ public class DataNode extends Configured
       }
       final String bpid = nsInfo.getBlockPoolID();
       //read storage info, lock data dirs and transition fs state if necessary
-      storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+      synchronized (this) {
+        storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+      }
       final StorageInfo bpStorage = storage.getBPStorage(bpid);
       LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
           + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 553208e..4c03151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -94,8 +94,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   public List<V> getVolumes();
 
   /** Add an array of StorageLocation to FsDataset. */
-  public void addVolumes(Collection<StorageLocation> volumes)
-      throws IOException;
+  public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
+      final Collection<String> bpids);
 
   /** Removes a collection of volumes from FsDataset. */
   public void removeVolumes(Collection<StorageLocation> volumes);
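
The new addVolumes() signature reports which locations were actually loaded so
the caller can roll back the ones that failed. A sketch of the intended use,
mirroring DataNode.refreshVolumes() earlier in this patch ("data", "storage",
"newLocations" and "bpids" refer to the fields and locals used there):

    List<StorageLocation> succeeded = data.addVolumes(newLocations, bpids);
    List<StorageLocation> failed = Lists.newArrayList();
    for (StorageLocation loc : newLocations) {
      if (!succeeded.contains(loc)) {
        failed.add(loc);
      }
    }
    // Undo the partially added volumes on both the storage and dataset side.
    storage.removeVolumes(failed);
    data.removeVolumes(failed);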

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 05c9914..d28d616 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -28,19 +28,23 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -85,6 +89,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
@@ -245,7 +250,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + ", volume failures tolerated: " + volFailuresTolerated);
     }
 
-    storageMap = new HashMap<String, DatanodeStorage>();
+    storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
     volumeMap = new ReplicaMap(this);
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -275,45 +280,124 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // storageMap and asyncDiskService, consistent.
     FsVolumeImpl fsVolume = new FsVolumeImpl(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
-    fsVolume.getVolumeMap(volumeMap);
+    ReplicaMap tempVolumeMap = new ReplicaMap(this);
+    fsVolume.getVolumeMap(tempVolumeMap);
 
+    volumeMap.addAll(tempVolumeMap);
     volumes.addVolume(fsVolume);
     storageMap.put(sd.getStorageUuid(),
         new DatanodeStorage(sd.getStorageUuid(),
-                            DatanodeStorage.State.NORMAL,
-                            storageType));
+            DatanodeStorage.State.NORMAL,
+            storageType));
     asyncDiskService.addVolume(sd.getCurrentDir());
 
     LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
   }
 
+  private void addVolumeAndBlockPool(Collection<StorageLocation> dataLocations,
+      Storage.StorageDirectory sd, final Collection<String> bpids)
+      throws IOException {
+    final File dir = sd.getCurrentDir();
+    final StorageType storageType =
+        getStorageTypeFromLocations(dataLocations, sd.getRoot());
+
+    final FsVolumeImpl fsVolume = new FsVolumeImpl(
+        this, sd.getStorageUuid(), dir, this.conf, storageType);
+    final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
+
+    List<IOException> exceptions = Lists.newArrayList();
+    for (final String bpid : bpids) {
+      try {
+        fsVolume.addBlockPool(bpid, this.conf);
+        fsVolume.getVolumeMap(bpid, tempVolumeMap);
+      } catch (IOException e) {
+        LOG.warn("Caught exception when adding " + fsVolume +
+            ". Will throw later.", e);
+        exceptions.add(e);
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      // The state of FsDatasetImpl is not modified, thus no need to roll back.
+      throw MultipleIOException.createIOException(exceptions);
+    }
+
+    volumeMap.addAll(tempVolumeMap);
+    storageMap.put(sd.getStorageUuid(),
+        new DatanodeStorage(sd.getStorageUuid(),
+            DatanodeStorage.State.NORMAL,
+            storageType));
+    asyncDiskService.addVolume(sd.getCurrentDir());
+    volumes.addVolume(fsVolume);
+
+    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+  }
+
   /**
    * Add an array of StorageLocation to FsDataset.
    *
    * @pre dataStorage must have these volumes.
-   * @param volumes
-   * @throws IOException
+   * @param volumes a list of storage locations for the volumes to add.
+   * @param bpids block pool IDs.
+   * @return the list of successfully added volumes.
    */
   @Override
-  public synchronized void addVolumes(Collection<StorageLocation> volumes)
-      throws IOException {
+  public synchronized List<StorageLocation> addVolumes(
+      final List<StorageLocation> volumes, final Collection<String> bpids) {
     final Collection<StorageLocation> dataLocations =
         DataNode.getStorageLocations(this.conf);
-    Map<String, Storage.StorageDirectory> allStorageDirs =
+    final Map<String, Storage.StorageDirectory> allStorageDirs =
         new HashMap<String, Storage.StorageDirectory>();
-    for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
-      Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
-      allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
+    List<StorageLocation> succeedVolumes = Lists.newArrayList();
+    try {
+      for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+        Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+        allStorageDirs.put(sd.getRoot().getCanonicalPath(), sd);
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Caught exception when parsing storage URL.", ioe);
+      return succeedVolumes;
+    }
+
+    final boolean[] successFlags = new boolean[volumes.size()];
+    Arrays.fill(successFlags, false);
+    List<Thread> volumeAddingThreads = Lists.newArrayList();
+    for (int i = 0; i < volumes.size(); i++) {
+      final int idx = i;
+      Thread t = new Thread() {
+        public void run() {
+          StorageLocation vol = volumes.get(idx);
+          try {
+            String key = vol.getFile().getCanonicalPath();
+            if (!allStorageDirs.containsKey(key)) {
+              LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
+            } else {
+              addVolumeAndBlockPool(dataLocations, allStorageDirs.get(key),
+                  bpids);
+              successFlags[idx] = true;
+            }
+          } catch (IOException e) {
+            LOG.warn("Caught exception when adding volume " + vol, e);
+          }
+        }
+      };
+      volumeAddingThreads.add(t);
+      t.start();
     }
 
-    for (StorageLocation vol : volumes) {
-      String key = vol.getFile().getAbsolutePath();
-      if (!allStorageDirs.containsKey(key)) {
-        LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
-      } else {
-        addVolume(dataLocations, allStorageDirs.get(key));
+    for (Thread t : volumeAddingThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Caught InterruptedException when adding volume.", e);
       }
     }
+
+    for (int i = 0; i < volumes.size(); i++) {
+      if (successFlags[i]) {
+        succeedVolumes.add(volumes.get(i));
+      }
+    }
+    return succeedVolumes;
   }
 
   /**
@@ -335,9 +419,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         String volume = sd.getRoot().toString();
         LOG.info("Removing " + volume + " from FsDataset.");
 
-        this.volumes.removeVolume(volume);
-        storageMap.remove(sd.getStorageUuid());
+        // Disable the volume from the service.
         asyncDiskService.removeVolume(sd.getCurrentDir());
+        this.volumes.removeVolume(volume);
 
         // Removed all replica information for the blocks on the volume. Unlike
         // updating the volumeMap in addVolume(), this operation does not scan
@@ -348,7 +432,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               it.hasNext(); ) {
             ReplicaInfo block = it.next();
             if (block.getVolume().getBasePath().equals(volume)) {
-              invalidate(bpid, block.getBlockId());
+              invalidate(bpid, block);
               blocks.add(block);
               it.remove();
             }
@@ -357,6 +441,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           datanode.getBlockScanner().deleteBlocks(bpid,
               blocks.toArray(new Block[blocks.size()]));
         }
+
+        storageMap.remove(sd.getStorageUuid());
       }
     }
   }
@@ -1345,23 +1431,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   /**
    * Invalidate a block but does not delete the actual on-disk block file.
    *
-   * It should only be used for decommissioning disks.
+   * It should only be used when deactivating disks.
    *
    * @param bpid the block pool ID.
-   * @param blockId the ID of the block.
+   * @param block The block to be invalidated.
    */
-  public void invalidate(String bpid, long blockId) {
+  public void invalidate(String bpid, ReplicaInfo block) {
     // If a DFSClient has the replica in its cache of short-circuit file
     // descriptors (and the client is using ShortCircuitShm), invalidate it.
     // The short-circuit registry is null in the unit tests, because the
     // datanode is mock object.
     if (datanode.getShortCircuitRegistry() != null) {
       datanode.getShortCircuitRegistry().processBlockInvalidation(
-          new ExtendedBlockId(blockId, bpid));
+          new ExtendedBlockId(block.getBlockId(), bpid));
 
       // If the block is cached, start uncaching it.
-      cacheManager.uncacheBlock(bpid, blockId);
+      cacheManager.uncacheBlock(bpid, block.getBlockId());
     }
+
+    datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
+        block.getStorageUuid());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index d0fad6e..83d93f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1093,7 +1093,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public void addVolumes(Collection<StorageLocation> volumes) {
+  public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
+      final Collection<String> bpids) {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
new file mode 100644
index 0000000..d2b2995
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestDataNodeHotSwapVolumes {
+  private static final int BLOCK_SIZE = 512;
+  private MiniDFSCluster cluster;
+
+  @After
+  public void tearDown() {
+    shutdown();
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes)
+      throws IOException {
+    shutdown();
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+    /*
+     * Lower the DN heartbeat, DF rate, and recheck interval to one second
+     * so state about failures and datanode death propagates faster.
+     */
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
+
+    MiniDFSNNTopology nnTopology =
+        MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(nnTopology)
+        .numDataNodes(numDataNodes)
+        .build();
+    cluster.waitActive();
+  }
+
+  private void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFile(Path path, int numBlocks)
+      throws IOException, InterruptedException, TimeoutException {
+    final short replicateFactor = 1;
+    createFile(path, numBlocks, replicateFactor);
+  }
+
+  private void createFile(Path path, int numBlocks, short replicateFactor)
+      throws IOException, InterruptedException, TimeoutException {
+    createFile(0, path, numBlocks, replicateFactor);
+  }
+
+  private void createFile(int fsIdx, Path path, int numBlocks)
+      throws IOException, InterruptedException, TimeoutException {
+    final short replicateFactor = 1;
+    createFile(fsIdx, path, numBlocks, replicateFactor);
+  }
+
+  private void createFile(int fsIdx, Path path, int numBlocks,
+      short replicateFactor)
+      throws IOException, TimeoutException, InterruptedException {
+    final int seed = 0;
+    final DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
+    DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks,
+        replicateFactor, seed);
+    DFSTestUtil.waitReplication(fs, path, replicateFactor);
+  }
+
+  /**
+   * Verify whether a file has enough content.
+   */
+  private static void verifyFileLength(FileSystem fs, Path path, int numBlocks)
+      throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    assertEquals(numBlocks * BLOCK_SIZE, status.getLen());
+  }
+
+  /** Return the number of replicas for a given block in the file. */
+  private static int getNumReplicas(FileSystem fs, Path file,
+      int blockIdx) throws IOException {
+    BlockLocation locs[] = fs.getFileBlockLocations(file, 0, Long.MAX_VALUE);
+    return locs.length < blockIdx + 1 ? 0 : locs[blockIdx].getNames().length;
+  }
+
+  /**
+   * Wait for the block to have the expected number of replicas.
+   */
+  private static void waitReplication(FileSystem fs, Path file, int blockIdx,
+      int numReplicas)
+      throws IOException, TimeoutException, InterruptedException {
+    int attempts = 50;  // Wait 5 seconds.
+    while (attempts > 0) {
+      if (getNumReplicas(fs, file, blockIdx) == numReplicas) {
+        return;
+      }
+      Thread.sleep(100);
+      attempts--;
+    }
+    throw new TimeoutException("Timed out waiting for the " + blockIdx + "-th block"
+        + " of " + file + " to have " + numReplicas + " replicas.");
+  }
+
+  /** Parses data dirs from DataNode's configuration. */
+  private static Collection<String> getDataDirs(DataNode datanode) {
+    return datanode.getConf().getTrimmedStringCollection(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+  }
+
+  @Test
+  public void testParseChangedVolumes() throws IOException {
+    startDFSCluster(1, 1);
+    DataNode dn = cluster.getDataNodes().get(0);
+    Configuration conf = dn.getConf();
+
+    String oldPaths = conf.get(DFS_DATANODE_DATA_DIR_KEY);
+    List<StorageLocation> oldLocations = new ArrayList<StorageLocation>();
+    for (String path : oldPaths.split(",")) {
+      oldLocations.add(StorageLocation.parse(path));
+    }
+    assertFalse(oldLocations.isEmpty());
+
+    String newPaths = "/foo/path1,/foo/path2";
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, newPaths);
+
+    DataNode.ChangedVolumes changedVolumes = dn.parseChangedVolumes();
+    List<StorageLocation> newVolumes = changedVolumes.newLocations;
+    assertEquals(2, newVolumes.size());
+    assertEquals("/foo/path1", newVolumes.get(0).getFile().getAbsolutePath());
+    assertEquals("/foo/path2", newVolumes.get(1).getFile().getAbsolutePath());
+
+    List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
+    assertEquals(oldLocations.size(), removedVolumes.size());
+    for (int i = 0; i < removedVolumes.size(); i++) {
+      assertEquals(oldLocations.get(i).getFile(),
+          removedVolumes.get(i).getFile());
+    }
+  }
+
+  @Test
+  public void testParseChangedVolumesFailures() throws IOException {
+    startDFSCluster(1, 1);
+    DataNode dn = cluster.getDataNodes().get(0);
+    Configuration conf = dn.getConf();
+    try {
+      conf.set(DFS_DATANODE_DATA_DIR_KEY, "");
+      dn.parseChangedVolumes();
+      fail("Should throw IOException: empty inputs.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("No directory is specified.", e);
+    }
+  }
+
+  /** Add volumes to the first DataNode. */
+  private void addVolumes(int numNewVolumes) throws ReconfigurationException {
+    File dataDir = new File(cluster.getDataDirectory());
+    DataNode dn = cluster.getDataNodes().get(0);  // First DataNode.
+    Configuration conf = dn.getConf();
+    String oldDataDir = conf.get(DFS_DATANODE_DATA_DIR_KEY);
+
+    List<File> newVolumeDirs = new ArrayList<File>();
+    StringBuilder newDataDirBuf = new StringBuilder(oldDataDir);
+    int startIdx = oldDataDir.split(",").length + 1;
+    // Find the first available (non-taken) directory name for data volume.
+    while (true) {
+      File volumeDir = new File(dataDir, "data" + startIdx);
+      if (!volumeDir.exists()) {
+        break;
+      }
+      startIdx++;
+    }
+    for (int i = startIdx; i < startIdx + numNewVolumes; i++) {
+      File volumeDir = new File(dataDir, "data" + String.valueOf(i));
+      newVolumeDirs.add(volumeDir);
+      volumeDir.mkdirs();
+      newDataDirBuf.append(",");
+      newDataDirBuf.append(volumeDir.toURI());
+    }
+
+    String newDataDir = newDataDirBuf.toString();
+    dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir);
+    assertEquals(newDataDir, conf.get(DFS_DATANODE_DATA_DIR_KEY));
+
+    // Check that all newly created volumes are appropriately formatted.
+    for (File volumeDir : newVolumeDirs) {
+      File curDir = new File(volumeDir, "current");
+      assertTrue(curDir.exists());
+      assertTrue(curDir.isDirectory());
+    }
+  }
+
+  private List<List<Integer>> getNumBlocksReport(int namesystemIdx) {
+    List<List<Integer>> results = new ArrayList<List<Integer>>();
+    final String bpid = cluster.getNamesystem(namesystemIdx).getBlockPoolId();
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(bpid);
+    for (Map<DatanodeStorage, BlockListAsLongs> datanodeReport : blockReports) {
+      List<Integer> numBlocksPerDN = new ArrayList<Integer>();
+      for (BlockListAsLongs blocks : datanodeReport.values()) {
+        numBlocksPerDN.add(blocks.getNumberOfBlocks());
+      }
+      results.add(numBlocksPerDN);
+    }
+    return results;
+  }
+
+  /**
+   * Test adding one volume on a running MiniDFSCluster with only one NameNode.
+   */
+  @Test
+  public void testAddOneNewVolume()
+      throws IOException, ReconfigurationException,
+      InterruptedException, TimeoutException {
+    startDFSCluster(1, 1);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    final int numBlocks = 10;
+
+    addVolumes(1);
+
+    Path testFile = new Path("/test");
+    createFile(testFile, numBlocks);
+
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(bpid);
+    assertEquals(1, blockReports.size());  // 1 DataNode
+    assertEquals(3, blockReports.get(0).size());  // 3 volumes
+
+    // FSVolumeList uses Round-Robin block chooser by default. Thus the new
+    // blocks should be evenly located in all volumes.
+    int minNumBlocks = Integer.MAX_VALUE;
+    int maxNumBlocks = Integer.MIN_VALUE;
+    for (BlockListAsLongs blockList : blockReports.get(0).values()) {
+      minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
+      maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
+    }
+    assertTrue(Math.abs(maxNumBlocks - minNumBlocks) <= 1);
+    verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
+  }
+
+  @Test(timeout = 60000)
+  public void testAddVolumesDuringWrite()
+      throws IOException, InterruptedException, TimeoutException,
+      ReconfigurationException {
+    startDFSCluster(1, 1);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    Path testFile = new Path("/test");
+    createFile(testFile, 4);  // Each volume has 2 blocks.
+
+    addVolumes(2);
+
+    // Continue to write the same file, thus the new volumes will have blocks.
+    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
+    verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);
+    // After appending data, there should be [2, 2, 4, 4] blocks in each volume
+    // respectively.
+    List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
+
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(bpid);
+    assertEquals(1, blockReports.size());  // 1 DataNode
+    assertEquals(4, blockReports.get(0).size());  // 4 volumes
+    Map<DatanodeStorage, BlockListAsLongs> dnReport =
+        blockReports.get(0);
+    List<Integer> actualNumBlocks = new ArrayList<Integer>();
+    for (BlockListAsLongs blockList : dnReport.values()) {
+      actualNumBlocks.add(blockList.getNumberOfBlocks());
+    }
+    Collections.sort(actualNumBlocks);
+    assertEquals(expectedNumBlocks, actualNumBlocks);
+  }
+
+  @Test
+  public void testAddVolumesToFederationNN()
+      throws IOException, TimeoutException, InterruptedException,
+      ReconfigurationException {
+    // Starts a cluster with 2 NameNodes and 1 DataNode. The DataNode has 2
+    // volumes.
+    final int numNameNodes = 2;
+    final int numDataNodes = 1;
+    startDFSCluster(numNameNodes, numDataNodes);
+    Path testFile = new Path("/test");
+    // Create a file on the first namespace with 4 blocks.
+    createFile(0, testFile, 4);
+    // Create a file on the second namespace with 4 blocks.
+    createFile(1, testFile, 4);
+
+    // Add 2 volumes to the first DataNode.
+    final int numNewVolumes = 2;
+    addVolumes(numNewVolumes);
+
+    // Append to the file on the first namespace.
+    DFSTestUtil.appendFile(cluster.getFileSystem(0), testFile, BLOCK_SIZE * 8);
+
+    List<List<Integer>> actualNumBlocks = getNumBlocksReport(0);
+    assertEquals(cluster.getDataNodes().size(), actualNumBlocks.size());
+    List<Integer> blocksOnFirstDN = actualNumBlocks.get(0);
+    Collections.sort(blocksOnFirstDN);
+    assertEquals(Arrays.asList(2, 2, 4, 4), blocksOnFirstDN);
+
+    // Verify the second namespace also has the new volumes and they are empty.
+    actualNumBlocks = getNumBlocksReport(1);
+    assertEquals(4, actualNumBlocks.get(0).size());
+    assertEquals(numNewVolumes,
+        Collections.frequency(actualNumBlocks.get(0), 0));
+  }
+
+  @Test
+  public void testRemoveOneVolume()
+      throws ReconfigurationException, InterruptedException, TimeoutException,
+      IOException {
+    startDFSCluster(1, 1);
+    final short replFactor = 1;
+    Path testFile = new Path("/test");
+    createFile(testFile, 10, replFactor);
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    Collection<String> oldDirs = getDataDirs(dn);
+    String newDirs = oldDirs.iterator().next();  // Keep the first volume.
+    dn.reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+    dn.scheduleAllBlockReport(0);
+
+    try {
+      DFSTestUtil.readFile(cluster.getFileSystem(), testFile);
+      fail("Expect to throw BlockMissingException.");
+    } catch (BlockMissingException e) {
+      GenericTestUtils.assertExceptionContains("Could not obtain block", e);
+    }
+
+    Path newFile = new Path("/newFile");
+    createFile(newFile, 6);
+
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(bpid);
+    assertEquals((int)replFactor, blockReports.size());
+
+    BlockListAsLongs blocksForVolume1 =
+        blockReports.get(0).values().iterator().next();
+    // The first volume holds half of testFile's blocks and all of newFile's blocks.
+    assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
+  }
+
+  @Test
+  public void testReplicatingAfterRemoveVolume()
+      throws InterruptedException, TimeoutException, IOException,
+      ReconfigurationException {
+    startDFSCluster(1, 2);
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    final short replFactor = 2;
+    Path testFile = new Path("/test");
+    createFile(testFile, 4, replFactor);
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    Collection<String> oldDirs = getDataDirs(dn);
+    String newDirs = oldDirs.iterator().next();  // Keep the first volume.
+    dn.reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+
+    // Force DataNode to report missing blocks.
+    dn.scheduleAllBlockReport(0);
+    DataNodeTestUtils.triggerDeletionReport(dn);
+
+    // The 2nd block only has 1 replica due to the removed data volume.
+    waitReplication(fs, testFile, 1, 1);
+
+    // Wait for the NameNode to re-replicate the missing blocks.
+    DFSTestUtil.waitReplication(fs, testFile, replFactor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe38d2e9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 2c4c401..10b9f7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -40,7 +40,10 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -117,6 +120,8 @@ public class TestFsDatasetImpl {
     final int numExistingVolumes = dataset.getVolumes().size();
     final int totalVolumes = numNewVolumes + numExistingVolumes;
     List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
+    Set<String> expectedVolumes = new HashSet<String>();
     for (int i = 0; i < numNewVolumes; i++) {
       String path = BASE_DIR + "/newData" + i;
       newLocations.add(StorageLocation.parse(path));
+      expectedVolumes.add(path);
@@ -125,13 +130,16 @@
     }
     when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
 
-    dataset.addVolumes(newLocations);
+    dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS));
     assertEquals(totalVolumes, dataset.getVolumes().size());
     assertEquals(totalVolumes, dataset.storageMap.size());
+
+    Set<String> actualVolumes = new HashSet<String>();
     for (int i = 0; i < numNewVolumes; i++) {
-      assertEquals(newLocations.get(i).getFile().getPath(),
-          dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
+      actualVolumes.add(
+          dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
     }
+    assertEquals(expectedVolumes, actualVolumes);
   }
 
   @Test
