HDFS-11340. DataNode reconfigure for disks doesn't remove the failed volumes. (Manoj Govindassamy via lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6d356b6b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6d356b6b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6d356b6b

Branch: refs/heads/YARN-5734
Commit: 6d356b6b4d8ccb32397cacfb5d0357b21f6035fc
Parents: 9649c27
Author: Lei Xu <l...@apache.org>
Authored: Fri Mar 10 14:36:51 2017 -0800
Committer: Lei Xu <l...@apache.org>
Committed: Fri Mar 10 14:37:13 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/server/datanode/DataNode.java   |  73 +++++++++---
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  14 ++-
 .../datanode/fsdataset/impl/FsVolumeList.java   |  13 ++-
 .../TestDataNodeVolumeFailureReporting.java     | 116 +++++++++++++++++--
 4 files changed, 184 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d356b6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 6f24858..5a82850 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -652,48 +652,84 @@ public class DataNode extends ReconfigurableBase
   ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
     Configuration conf = new Configuration();
     conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
-    List<StorageLocation> locations = getStorageLocations(conf);
+    List<StorageLocation> newStorageLocations = getStorageLocations(conf);
 
-    if (locations.isEmpty()) {
+    if (newStorageLocations.isEmpty()) {
       throw new IOException("No directory is specified.");
     }
 
-    // Use the existing StorageLocation to detect storage type changes.
-    Map<String, StorageLocation> existingLocations = new HashMap<>();
+    // Use the existing storage locations from the current conf
+    // to detect new storage additions or removals.
+    Map<String, StorageLocation> existingStorageLocations = new HashMap<>();
     for (StorageLocation loc : getStorageLocations(getConf())) {
-      existingLocations.put(loc.getNormalizedUri().toString(), loc);
+      existingStorageLocations.put(loc.getNormalizedUri().toString(), loc);
     }
 
     ChangedVolumes results = new ChangedVolumes();
-    results.newLocations.addAll(locations);
+    results.newLocations.addAll(newStorageLocations);
 
     for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
          it.hasNext(); ) {
       Storage.StorageDirectory dir = it.next();
       boolean found = false;
-      for (Iterator<StorageLocation> sl = results.newLocations.iterator();
-           sl.hasNext(); ) {
-        StorageLocation location = sl.next();
-        if (location.matchesStorageDirectory(dir)) {
-          sl.remove();
-          StorageLocation old = existingLocations.get(
-              location.getNormalizedUri().toString());
-          if (old != null &&
-              old.getStorageType() != location.getStorageType()) {
+      for (Iterator<StorageLocation> newLocationItr =
+           results.newLocations.iterator(); newLocationItr.hasNext();) {
+        StorageLocation newLocation = newLocationItr.next();
+        if (newLocation.matchesStorageDirectory(dir)) {
+          StorageLocation oldLocation = existingStorageLocations.get(
+              newLocation.getNormalizedUri().toString());
+          if (oldLocation != null &&
+              oldLocation.getStorageType() != newLocation.getStorageType()) {
             throw new IOException("Changing storage type is not allowed.");
           }
-          results.unchangedLocations.add(location);
+          // Update the unchanged locations as this location
+          // from the new conf is really not a new one.
+          newLocationItr.remove();
+          results.unchangedLocations.add(newLocation);
           found = true;
           break;
         }
       }
 
+      // The new conf doesn't have this storage location, which is present
+      // in the current storage locations. Add it to deactivateLocations.
       if (!found) {
+        LOG.info("Deactivation request received for active volume: "
+            + dir.getRoot().toString());
         results.deactivateLocations.add(
             StorageLocation.parse(dir.getRoot().toString()));
       }
     }
 
+    // Use the failed storage locations from the current conf
+    // to detect removals in the new conf.
+    if (getFSDataset().getNumFailedVolumes() > 0) {
+      for (String failedStorageLocation : getFSDataset()
+          .getVolumeFailureSummary().getFailedStorageLocations()) {
+        boolean found = false;
+        for (Iterator<StorageLocation> newLocationItr =
+             results.newLocations.iterator(); newLocationItr.hasNext();) {
+          StorageLocation newLocation = newLocationItr.next();
+          if (newLocation.getNormalizedUri().toString().equals(
+              failedStorageLocation)) {
+            // The failed storage is being re-added. DataNode#refreshVolumes()
+            // will take care of re-assessing it.
+            found = true;
+            break;
+          }
+        }
+
+        // New conf doesn't have this failed storage location.
+        // Add to the deactivate locations list.
+        if (!found) {
+          LOG.info("Deactivation request received for failed volume: "
+              + failedStorageLocation);
+          results.deactivateLocations.add(StorageLocation.parse(
+              failedStorageLocation));
+        }
+      }
+    }
+
     return results;
   }
 
@@ -716,8 +752,9 @@ public class DataNode extends ReconfigurableBase
     }
 
     try {
-      if (numOldDataDirs + changedVolumes.newLocations.size() -
-          changedVolumes.deactivateLocations.size() <= 0) {
+      if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
+          + changedVolumes.newLocations.size()
+          - changedVolumes.deactivateLocations.size() <= 0) {
         throw new IOException("Attempt to remove all volumes.");
       }
       if (!changedVolumes.newLocations.isEmpty()) {

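For context, this code path runs when dfs.datanode.data.dir is changed and the DataNode is asked to reconfigure, for example with "hdfs dfsadmin -reconfig datanode <host:ipc_port> start". The following is a minimal, self-contained sketch of the comparison the new block performs; plain strings stand in for StorageLocation objects, and the class and helper names are made up for illustration, so this is a model of the idea rather than the actual DataNode code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of the added parseChangedVolumes() step. Strings stand in
// for StorageLocation objects; findDeactivations is a hypothetical helper.
public class FailedVolumeRemovalSketch {
  static List<String> findDeactivations(List<String> failedLocations,
                                        List<String> newConfLocations) {
    List<String> deactivate = new ArrayList<>();
    for (String failed : failedLocations) {
      // A failed volume that is absent from the new conf must be deactivated;
      // otherwise it lingers in the failed-volume summary after the reconfigure.
      if (!newConfLocations.contains(failed)) {
        deactivate.add(failed);
      }
    }
    return deactivate;
  }

  public static void main(String[] args) {
    List<String> failed = Arrays.asList("file:/data/1");
    List<String> newConf = Arrays.asList("file:/data/2");
    System.out.println(findDeactivations(failed, newConf)); // prints [file:/data/1]
  }
}

In the real patch, refreshVolumes() then acts on deactivateLocations, and the FsDatasetImpl change below clears the matching failure records.
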
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d356b6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index aff19ce..169e0e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -502,8 +502,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public void removeVolumes(
-      Collection<StorageLocation> storageLocationsToRemove,
+      final Collection<StorageLocation> storageLocsToRemove,
       boolean clearFailure) {
+    Collection<StorageLocation> storageLocationsToRemove =
+        new ArrayList<>(storageLocsToRemove);
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
     try (AutoCloseableLock lock = datasetLock.acquire()) {
@@ -541,6 +543,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           }
 
           storageToRemove.add(sd.getStorageUuid());
+          storageLocationsToRemove.remove(sdLocation);
+        }
+      }
+
+      // A reconfigure can ask to remove a storage location that was already
+      // removed when the failure was detected by DataNode#checkDiskErrorAsync.
+      // Now, let's remove it from the failed volume list.
+      if (clearFailure) {
+        for (StorageLocation storageLocToRemove : storageLocationsToRemove) {
+          volumes.removeVolumeFailureInfo(storageLocToRemove);
         }
       }
       setupAsyncLazyPersistThreads();

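A note on the defensive copy added above: removeVolumes() now works on its own copy of the requested locations, removes each entry as it is matched to an active volume, and finally clears the failure record for whatever remains, that is, locations whose volumes were already dropped when DataNode#checkDiskErrorAsync first detected the failure. A small stand-alone sketch of that copy-and-prune pattern, using made-up paths and a hypothetical class name:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Illustrative only: copy the caller's collection before mutating it, remove
// entries as they are handled, and treat the leftovers as locations whose
// volumes are already gone and whose failure records should be cleared.
public class DefensiveCopySketch {
  public static void main(String[] args) {
    Collection<String> requested = Arrays.asList("file:/data/1", "file:/data/2");
    List<String> remaining = new ArrayList<>(requested); // never mutate the caller's view
    remaining.remove("file:/data/2");                    // handled as an active volume
    System.out.println(remaining);                       // [file:/data/1] -> clear its failure info
  }
}
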
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d356b6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 64921d7..e7f0228 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -371,8 +371,15 @@ class FsVolumeList {
   }
 
   void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
-    volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
-        volumeFailureInfo);
+    // There could be redundant requests to add the same failed
+    // volume because of repeated DataNode reconfigures with the same
+    // list of volumes. Ignore the update for a failed volume that is
+    // already recorded, to preserve its old failed capacity details.
+    if (!volumeFailureInfos.containsKey(volumeFailureInfo
+        .getFailedStorageLocation())) {
+      volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
+          volumeFailureInfo);
+    }
   }
 
   private void addVolumeFailureInfo(FsVolumeImpl vol) {
@@ -382,7 +389,7 @@ class FsVolumeList {
         vol.getCapacity()));
   }
 
-  private void removeVolumeFailureInfo(StorageLocation location) {
+  void removeVolumeFailureInfo(StorageLocation location) {
     volumeFailureInfos.remove(location);
   }
 

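The containsKey guard added to addVolumeFailureInfo() keeps the first recorded failure, along with its estimated lost capacity, when a repeated reconfigure reports the same failed volume again. Functionally this is the put-if-absent pattern; the sketch below shows it with a plain map and made-up values rather than the actual VolumeFailureInfo type:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: the first failure record for a location wins, and later
// redundant reports of the same failed volume are ignored.
public class PutIfAbsentSketch {
  public static void main(String[] args) {
    Map<String, Long> failedCapacityByLocation = new HashMap<>();
    failedCapacityByLocation.putIfAbsent("file:/data/1", 512L); // first report is kept
    failedCapacityByLocation.putIfAbsent("file:/data/1", 0L);   // redundant report ignored
    System.out.println(failedCapacityByLocation.get("file:/data/1")); // prints 512
  }
}
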
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d356b6b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index fbbc7f9..a3850ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -25,16 +25,22 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.lang.management.ManagementFactory;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -51,6 +57,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -406,8 +413,8 @@ public class TestDataNodeVolumeFailureReporting {
     DataNodeTestUtils.triggerHeartbeat(dns.get(0));
     DataNodeTestUtils.triggerHeartbeat(dns.get(1));
 
-    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
 
     // Ensure we wait a sufficient amount of time.
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -415,9 +422,9 @@ public class TestDataNodeVolumeFailureReporting {
     // The NN reports two volume failures again.
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(false, 2);
-    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // Reconfigure a third time with the failed volumes.  Afterwards, we expect
     // the same volume failures to be reported.  (No double-counting.)
@@ -427,8 +434,8 @@ public class TestDataNodeVolumeFailureReporting {
     DataNodeTestUtils.triggerHeartbeat(dns.get(0));
     DataNodeTestUtils.triggerHeartbeat(dns.get(1));
 
-    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
 
     // Ensure we wait a sufficient amount of time.
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -436,9 +443,9 @@ public class TestDataNodeVolumeFailureReporting {
     // The NN reports two volume failures again.
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(false, 2);
-    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // Replace failed volume with healthy volume and run reconfigure DataNode.
     // The failed volume information should be cleared.
@@ -515,6 +522,95 @@ public class TestDataNodeVolumeFailureReporting {
   }
 
   /**
+   * Verify DataNode NumFailedVolumes and FailedStorageLocations
+   * after hot swap out of failed volume.
+   */
+  @Test
+  public void testHotSwapOutFailedVolumeAndReporting()
+          throws Exception {
+    final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
+    final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
+    final DataNode dn0 = cluster.getDataNodes().get(0);
+    final String oldDataDirs = dn0.getConf().get(
+            DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName mxbeanName = new ObjectName(
+        "Hadoop:service=DataNode,name=FSDatasetState-" + dn0.getDatanodeUuid());
+    int numFailedVolumes = (int) mbs.getAttribute(mxbeanName,
+        "NumFailedVolumes");
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, false, new String[] {});
+
+    // Fail dn0Vol1 first.
+    // Verify NumFailedVolumes and FailedStorageLocations report the failure.
+    DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
+    DataNodeTestUtils.waitForDiskError(dn0,
+        DataNodeTestUtils.getVolume(dn0, dn0Vol1));
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+            numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol1.getAbsolutePath()});
+
+    // Reconfigure disks without fixing the failed disk.
+    // Verify NumFailedVolumes and FailedStorageLocations haven't changed.
+    try {
+      dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+          oldDataDirs);
+      fail("Reconfigure with failed disk should throw exception.");
+    } catch (ReconfigurationException e) {
+      Assert.assertTrue("Reconfigure exception doesn't have expected path!",
+          e.getCause().getMessage().contains(dn0Vol1.getAbsolutePath()));
+    }
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol1.getAbsolutePath()});
+
+    // Hot swap out the failed volume.
+    // Verify NumFailedVolumes and FailedStorageLocations are reset.
+    String dataDirs = dn0Vol2.getPath();
+    dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+            dataDirs);
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(0, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+            numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, true, new String[] {});
+
+    // Fix the failed volume dn0Vol1 and add it back to the conf.
+    // Verify NumFailedVolumes and FailedStorageLocations are empty.
+    DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1);
+    dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+            oldDataDirs);
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(0, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, true, new String[] {});
+
+    // Fail dn0Vol2.
+    // Verify NumFailedVolumes and FailedStorageLocations are updated.
+    DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
+    DataNodeTestUtils.waitForDiskError(dn0,
+        DataNodeTestUtils.getVolume(dn0, dn0Vol2));
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol2.getAbsolutePath()});
+
+    // Verify the DataNode keeps running while tolerating one disk failure.
+    assertTrue(dn0.shouldRun());
+  }
+
+  /**
   * Checks the NameNode for correct values of aggregate counters tracking failed
    * volumes across all DataNodes.
    *

