Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 b9fdbd710 -> 2304501bc


HDFS-10301. Remove FBR tracking state to fix false zombie storage detection for 
interleaving block reports. Contributed by Vinitha Gankidi.

(cherry picked from commit 391ce535a739dc92cb90017d759217265a4fd969)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2304501b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2304501b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2304501b

Branch: refs/heads/branch-2.8
Commit: 2304501bcfcd3747a816e9408854424af6ee46ff
Parents: b9fdbd7
Author: Vinitha Reddy Gankidi <vigank...@linkedin.com>
Authored: Fri Oct 14 10:37:44 2016 -0700
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Fri Oct 14 18:52:00 2016 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 76 ++++++--------------
 .../blockmanagement/DatanodeDescriptor.java     | 50 -------------
 .../blockmanagement/DatanodeStorageInfo.java    | 11 ---
 .../hdfs/server/namenode/NameNodeRpcServer.java |  4 +-
 .../blockmanagement/TestBlockManager.java       | 12 ++--
 .../TestNameNodePrunesMissingStorages.java      | 70 +++++++++++++++---
 .../server/datanode/BlockReportTestBase.java    | 50 +++++++++++++
 7 files changed, 145 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2304501b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 39ac94d..8649e48 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -61,7 +61,6 @@ import 
org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -1233,6 +1232,8 @@ public class BlockManager implements BlockStatsMXBean {
       invalidateBlocks.remove(node, block);
     }
     namesystem.checkSafeMode();
+    LOG.info("Removed blocks associated with storage {} from DataNode {}",
+        storageInfo, node);
   }
 
   /**
@@ -1942,8 +1943,8 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
-      final BlockListAsLongs newReport, BlockReportContext context,
-      boolean lastStorageInRpc) throws IOException {
+      final BlockListAsLongs newReport,
+      BlockReportContext context) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -1997,32 +1998,6 @@ public class BlockManager implements BlockStatsMXBean {
       }
       
       storageInfo.receivedBlockReport();
-      if (context != null) {
-        storageInfo.setLastBlockReportId(context.getReportId());
-        if (lastStorageInRpc) {
-          int rpcsSeen = node.updateBlockReportContext(context);
-          if (rpcsSeen >= context.getTotalRpcs()) {
-            long leaseId = blockReportLeaseManager.removeLease(node);
-            BlockManagerFaultInjector.getInstance().
-                removeBlockReportLease(node, leaseId);
-            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
-            if (zombies.isEmpty()) {
-              LOG.debug("processReport 0x{}: no zombie storages found.",
-                  Long.toHexString(context.getReportId()));
-            } else {
-              for (DatanodeStorageInfo zombie : zombies) {
-                removeZombieReplicas(context, zombie);
-              }
-            }
-            node.clearBlockReportContext();
-          } else {
-            LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
-                    "report.", Long.toHexString(context.getReportId()),
-                (context.getTotalRpcs() - rpcsSeen)
-            );
-          }
-        }
-      }
     } finally {
       endTime = Time.monotonicNow();
       namesystem.writeUnlock();
@@ -2047,30 +2022,25 @@ public class BlockManager implements BlockStatsMXBean {
     return !node.hasStaleStorages();
   }
 
-  private void removeZombieReplicas(BlockReportContext context,
-      DatanodeStorageInfo zombie) {
-    LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
-             "longer exists on the DataNode.",
-              Long.toHexString(context.getReportId()), zombie.getStorageID());
-    assert(namesystem.hasWriteLock());
-    Iterator<BlockInfo> iter = zombie.getBlockIterator();
-    int prevBlocks = zombie.numBlocks();
-    while (iter.hasNext()) {
-      BlockInfo block = iter.next();
-      // We assume that a block can be on only one storage in a DataNode.
-      // That's why we pass in the DatanodeDescriptor rather than the
-      // DatanodeStorageInfo.
-      // TODO: remove this assumption in case we want to put a block on
-      // more than one storage on a datanode (and because it's a difficult
-      // assumption to really enforce)
-      removeStoredBlock(block, zombie.getDatanodeDescriptor());
-      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
-    }
-    assert(zombie.numBlocks() == 0);
-    LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
-            "which no longer exists on the DataNode.",
-            Long.toHexString(context.getReportId()), prevBlocks,
-            zombie.getStorageID());
+  public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
+      final BlockReportContext context) throws IOException {
+    namesystem.writeLock();
+    DatanodeDescriptor node;
+    try {
+      node = datanodeManager.getDatanode(nodeID);
+      if (context != null) {
+        if (context.getTotalRpcs() == context.getCurRpc() + 1) {
+          long leaseId = this.getBlockReportLeaseManager().removeLease(node);
+          BlockManagerFaultInjector.getInstance().
+              removeBlockReportLease(node, leaseId);
+        }
+        LOG.debug("Processing RPC with index {} out of total {} RPCs in "
+                + "processReport 0x{}", context.getCurRpc(),
+            context.getTotalRpcs(), Long.toHexString(context.getReportId()));
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2304501b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 9e25f53..f2910a9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,7 +30,6 @@ import java.util.Queue;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -41,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -53,8 +50,6 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -67,8 +62,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
       LoggerFactory.getLogger(DatanodeDescriptor.class);
   public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
-  private static final List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
-      ImmutableList.of();
 
   /** Block and targets pair */
   @InterfaceAudience.Private
@@ -153,10 +146,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public final DecommissioningStatus decommissioningStatus =
       new DecommissioningStatus();
 
-  private long curBlockReportId = 0;
-
-  private BitSet curBlockReportRpcsSeen = null;
-
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
 
@@ -253,20 +242,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
   }
 
-  public int updateBlockReportContext(BlockReportContext context) {
-    if (curBlockReportId != context.getReportId()) {
-      curBlockReportId = context.getReportId();
-      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
-    }
-    curBlockReportRpcsSeen.set(context.getCurRpc());
-    return curBlockReportRpcsSeen.cardinality();
-  }
-
-  public void clearBlockReportContext() {
-    curBlockReportId = 0;
-    curBlockReportRpcsSeen = null;
-  }
-
   public CachedBlocksList getPendingCached() {
     return pendingCached;
   }
@@ -330,31 +305,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  List<DatanodeStorageInfo> removeZombieStorages() {
-    List<DatanodeStorageInfo> zombies = null;
-    synchronized (storageMap) {
-      Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
-          storageMap.entrySet().iterator();
-      while (iter.hasNext()) {
-        Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
-        DatanodeStorageInfo storageInfo = entry.getValue();
-        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
-          LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
-              storageInfo.getStorageID(),
-              Long.toHexString(storageInfo.getLastBlockReportId()),
-              Long.toHexString(curBlockReportId));
-          iter.remove();
-          if (zombies == null) {
-            zombies = new LinkedList<>();
-          }
-          zombies.add(storageInfo);
-        }
-        storageInfo.setLastBlockReportId(0);
-      }
-    }
-    return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
-  }
-
   public void resetBlocks() {
     setCapacity(0);
     setRemaining(0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2304501b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 319d2f3..862b1bf 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -123,9 +123,6 @@ public class DatanodeStorageInfo {
   private volatile BlockInfo blockList = null;
   private int numBlocks = 0;
 
-  // The ID of the last full block report which updated this storage.
-  private long lastBlockReportId = 0;
-
   /** The number of block reports received */
   private int blockReportCount = 0;
 
@@ -190,14 +187,6 @@ public class DatanodeStorageInfo {
     this.blockPoolUsed = blockPoolUsed;
   }
 
-  long getLastBlockReportId() {
-    return lastBlockReportId;
-  }
-
-  void setLastBlockReportId(long lastBlockReportId) {
-    this.lastBlockReportId = lastBlockReportId;
-  }
-
   State getState() {
     return this.state;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2304501b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 09248e5..6fa5072 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1424,11 +1424,13 @@ public class NameNodeRpcServer implements 
NamenodeProtocols {
         @Override
         public Boolean call() throws IOException {
           return bm.processReport(nodeReg, reports[index].getStorage(),
-              blocks, context, (index == reports.length - 1));
+              blocks, context);
         }
       });
       metrics.incrStorageBlockReportOps();
     }
+    bm.removeBRLeaseIfNeeded(nodeReg, context);
+
     BlockManagerFaultInjector.getInstance().
         incomingBlockReportRpc(nodeReg, context);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2304501b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index c0f910c..f4de968 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -713,12 +714,12 @@ public class TestBlockManager {
     reset(node);
     
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -729,7 +730,7 @@ public class TestBlockManager {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     BlockListAsLongs.EMPTY, null, false);
+                     BlockListAsLongs.EMPTY, null);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -758,7 +759,7 @@ public class TestBlockManager {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
   }
 
@@ -831,7 +832,8 @@ public class TestBlockManager {
     // Make sure it's the first full report
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        builder.build(), null, false);
+        builder.build(),
+        new BlockReportContext(1, 0, System.nanoTime(), 0));
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2304501b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index cea6865..5214af3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -19,23 +19,22 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.Collection;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -46,7 +45,6 @@ import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -55,13 +53,11 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.UUID;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
@@ -159,6 +155,8 @@ public class TestNameNodePrunesMissingStorages {
   public void testRemovingStorageDoesNotProduceZombies() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
     final int NUM_STORAGES_PER_DN = 2;
     final MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(conf).numDataNodes(3)
@@ -257,7 +255,7 @@ public class TestNameNodePrunesMissingStorages {
           assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
           return true;
         }
-      }, 10, 30000);
+      }, 1000, 30000);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -365,4 +363,60 @@ public class TestNameNodePrunesMissingStorages {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=300000)
+  public void testNameNodePrunesUnreportedStorages() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    // Create a cluster with one datanode with two storages
+    MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1)
+        .storagesPerDatanode(2)
+        .build();
+    // Create two files to ensure each storage has a block
+    DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file1"),
+        102400, 102400, 102400, (short)1,
+        0x1BAD5EE);
+    DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file2"),
+        102400, 102400, 102400, (short)1,
+        0x1BAD5EED);
+    // Get the datanode storages and data directories
+    DataNode dn = cluster.getDataNodes().get(0);
+    BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
+    DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
+        getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
+    DatanodeStorageInfo[] dnStoragesInfosBeforeRestart =
+        dnDescriptor.getStorageInfos();
+    Collection<String> oldDirs =  new ArrayList<String>(dn.getConf().
+        getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+    // Keep the first data directory and remove the second.
+    String newDirs = oldDirs.iterator().next();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+    // Restart the datanode with the new conf
+    cluster.stopDataNode(0);
+    cluster.startDataNodes(conf, 1, false, null, null);
+    dn = cluster.getDataNodes().get(0);
+    cluster.waitActive();
+    // Assert that the dnDescriptor has both the storages after restart
+    assertArrayEquals(dnStoragesInfosBeforeRestart,
+        dnDescriptor.getStorageInfos());
+    // Assert that the removed storage is marked as FAILED
+    // when DN heartbeats to the NN
+    int numFailedStoragesWithBlocks = 0;
+    DatanodeStorageInfo failedStorageInfo = null;
+    for (DatanodeStorageInfo dnStorageInfo: dnDescriptor.getStorageInfos()) {
+      if (dnStorageInfo.areBlocksOnFailedStorage()) {
+        numFailedStoragesWithBlocks++;
+        failedStorageInfo = dnStorageInfo;
+      }
+    }
+    assertEquals(1, numFailedStoragesWithBlocks);
+    // Heartbeat manager removes the blocks associated with this failed storage
+    bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+    assertTrue(!failedStorageInfo.areBlocksOnFailedStorage());
+    // pruneStorageMap removes the unreported storage
+    cluster.triggerHeartbeats();
+    // Assert that the unreported storage is pruned
+    assertEquals(DataNode.getStorageLocations(dn.getConf()).size(),
+        dnDescriptor.getStorageInfos().length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2304501b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 53b9263..6810a0b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -29,7 +29,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -50,7 +55,10 @@ import 
org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import 
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -649,6 +657,48 @@ public abstract class BlockReportTestBase {
     DFSTestUtil.readFile(fs, filePath);
   }
 
+  // See HDFS-10301
+  @Test(timeout = 300000)
+  public void testInterleavedBlockReports()
+      throws IOException, ExecutionException, InterruptedException {
+    int numConcurrentBlockReports = 3;
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    final String poolId = cluster.getNamesystem().getBlockPoolId();
+    LOG.info("Block pool id: " + poolId);
+    final DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    final StorageBlockReport[] reports =
+        getBlockReports(dn, poolId, true, true);
+
+    // Get the list of storage ids associated with the datanode
+    // before the test
+    BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
+    final DatanodeDescriptor dnDescriptor =
+        bm.getDatanodeManager().getDatanode(dn.getDatanodeId());
+    DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
+
+    // Send the block report concurrently using
+    // numThreads=numConcurrentBlockReports
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(numConcurrentBlockReports);
+    List<Future<Void>> futureList = new ArrayList<>(numConcurrentBlockReports);
+    for (int i = 0; i < numConcurrentBlockReports; i++) {
+      futureList.add(executorService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          sendBlockReports(dnR, poolId, reports);
+          return null;
+        }
+      }));
+    }
+    for (Future<Void> future : futureList) {
+      future.get();
+    }
+    executorService.shutdown();
+
+    // Verify that the storages match before and after the test
+    Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
+  }
+
   private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
     final boolean tooLongWait = false;
     final int TIMEOUT = 40000;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to