Author: atm
Date: Wed Apr 30 17:46:15 2014
New Revision: 1591413

URL: http://svn.apache.org/r1591413
Log:
HDFS-6289. HA failover can fail if there are pending DN messages for DNs
which no longer exist. Contributed by Aaron T. Myers.
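In short: the standby NameNode queues reported-block messages keyed by block, each referencing the DatanodeDescriptor that sent it. If that DN is later removed (for example, wiped and restarted so it re-registers under a new datanode UUID, as the new test below does), the stale messages stay queued; when the standby transitions to active and replays them, markBlockAsCorrupt can no longer resolve the old DN and throws the IOException shown in the BlockManager diff, aborting the failover. The fix is to purge a removed DN's pending messages at removal time. A minimal sketch of that purge logic, with simplified placeholder types (the committed version below operates on Block, ReportedBlockInfo, and DatanodeDescriptor; the class and field names here are illustrative only):

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;

    class PendingMessagesSketch {
      // block ID -> queued messages, each tagged with the reporting DN's UUID
      private final Map<Long, Queue<String>> queueByBlockId = new HashMap<>();
      private int count = 0;

      // Drop every queued message that came from the given (now-removed) DN,
      // decrementing the running message count for each one dropped.
      void removeAllMessagesForDatanode(String removedDnUuid) {
        for (Map.Entry<Long, Queue<String>> entry : queueByBlockId.entrySet()) {
          Queue<String> kept = new ArrayDeque<>();
          Queue<String> old = entry.getValue();
          while (!old.isEmpty()) {
            String sourceUuid = old.remove();
            if (!sourceUuid.equals(removedDnUuid)) {
              kept.add(sourceUuid); // message from a surviving DN: keep it
            } else {
              count--;              // message from the removed DN: drop it
            }
          }
          entry.setValue(kept);
        }
      }
    }

Rebuilding each queue rather than deleting in place, as the committed code also does, sidesteps concurrent-modification concerns while draining the old queue.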
Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1591413&r1=1591412&r2=1591413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Apr 30 17:46:15 2014
@@ -421,6 +421,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6288. DFSInputStream Pread doesn't update ReadStatistics.
     (Juan Yu via wang)
 
+    HDFS-6289. HA failover can fail if there are pending DN messages for DNs
+    which no longer exist. (atm)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1591413&r1=1591412&r2=1591413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Apr 30 17:46:15 2014
@@ -1007,6 +1007,8 @@ public class BlockManager {
     while(it.hasNext()) {
       removeStoredBlock(it.next(), node);
     }
+    // Remove all pending DN messages referencing this DN.
+    pendingDNMessages.removeAllMessagesForDatanode(node);
 
     node.resetBlocks();
     invalidateBlocks.remove(node.getDatanodeUuid());
@@ -1081,7 +1083,8 @@ public class BlockManager {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot mark " + b
-          + " as corrupt because datanode " + dn + " does not exist");
+          + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+          + ") does not exist");
     }
 
     BlockCollection bc = b.corrupted.getBlockCollection();
@@ -1981,6 +1984,9 @@ public class BlockManager {
       // If the block is an out-of-date generation stamp or state,
       // but we're the standby, we shouldn't treat it as corrupt,
       // but instead just queue it for later processing.
+      // TODO: Pretty confident this should be s/storedBlock/block below,
+      // since we should be postponing the info of the reported block, not
+      // the stored block. See HDFS-6289 for more context.
       queueReportedBlock(dn, storageID, storedBlock, reportedState,
           QUEUE_REASON_CORRUPT_STATE);
     } else {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1591413&r1=1591412&r2=1591413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Wed Apr 30 17:46:15 2014
@@ -76,6 +76,27 @@ class PendingDataNodeMessages {
     }
   }
 
+  /**
+   * Remove all pending DN messages which reference the given DN.
+   * @param dn the datanode whose messages we should remove.
+   */
+  void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
+    for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry :
+        queueByBlockId.entrySet()) {
+      Queue<ReportedBlockInfo> newQueue = Lists.newLinkedList();
+      Queue<ReportedBlockInfo> oldQueue = entry.getValue();
+      while (!oldQueue.isEmpty()) {
+        ReportedBlockInfo rbi = oldQueue.remove();
+        if (!rbi.getNode().equals(dn)) {
+          newQueue.add(rbi);
+        } else {
+          count--;
+        }
+      }
+      queueByBlockId.put(entry.getKey(), newQueue);
+    }
+  }
+
   void enqueueReportedBlock(DatanodeDescriptor dn, String storageID,
       Block block, ReplicaState reportedState) {
     block = new Block(block);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java?rev=1591413&r1=1591412&r2=1591413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java Wed Apr 30 17:46:15 2014
@@ -50,7 +50,7 @@ public class FsDatasetUtil {
   }
 
   /** Find the corresponding meta data file from a given block file */
-  static File findMetaFile(final File blockFile) throws IOException {
+  public static File findMetaFile(final File blockFile) throws IOException {
     final String prefix = blockFile.getName() + "_";
     final File parent = blockFile.getParentFile();
     final File[] matches = parent.listFiles(new FilenameFilter() {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1591413&r1=1591412&r2=1591413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Apr 30 17:46:15 2014
@@ -83,10 +83,12 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -1705,6 +1707,14 @@ public class MiniDFSCluster {
     LOG.warn("Corrupting the block " + blockFile);
     return true;
   }
+
+  public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
+      long newGenStamp) throws IOException {
+    File blockFile = getBlockFile(dnIndex, blk);
+    File metaFile = FsDatasetUtil.findMetaFile(blockFile);
+    return metaFile.renameTo(new File(DatanodeUtil.getMetaName(
+        blockFile.getAbsolutePath(), newGenStamp)));
+  }
 
   /*
    * Shutdown a particular datanode

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java?rev=1591413&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java Wed Apr 30 17:46:15 2014
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.util.ThreadUtil;
+import org.junit.Test;
+
+public class TestPendingCorruptDnMessages {
+
+  private static final Path filePath = new Path("/foo.txt");
+
+  @Test
+  public void testChangedStorageId() throws IOException, URISyntaxException,
+      InterruptedException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .build();
+
+    try {
+      cluster.transitionToActive(0);
+
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      OutputStream out = fs.create(filePath);
+      out.write("foo bar baz".getBytes());
+      out.close();
+
+      HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
+          cluster.getNameNode(1));
+
+      // Change the gen stamp of the block on datanode to go back in time (gen
+      // stamps start at 1000)
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      assertTrue(MiniDFSCluster.changeGenStampOfBlock(0, block, 900));
+
+      // Stop the DN so the replica with the changed gen stamp will be reported
+      // when this DN starts up.
+      DataNodeProperties dnProps = cluster.stopDataNode(0);
+
+      // Restart the namenode so that when the DN comes up it will see an initial
+      // block report.
+      cluster.restartNameNode(1, false);
+      assertTrue(cluster.restartDataNode(dnProps, true));
+
+      // Wait until the standby NN queues up the corrupt block in the pending DN
+      // message queue.
+      while (cluster.getNamesystem(1).getBlockManager()
+          .getPendingDataNodeMessageCount() < 1) {
+        ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
+      }
+
+      assertEquals(1, cluster.getNamesystem(1).getBlockManager()
+          .getPendingDataNodeMessageCount());
+      String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
+
+      // Reformat/restart the DN.
+      assertTrue(wipeAndRestartDn(cluster, 0));
+
+      // Give the DN time to start up and register, which will cause the
+      // DatanodeManager to dissociate the old storage ID from the DN xfer addr.
+      String newStorageId = "";
+      do {
+        ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
+        newStorageId = getRegisteredDatanodeUid(cluster, 1);
+        System.out.println("====> oldStorageId: " + oldStorageId +
+            " newStorageId: " + newStorageId);
+      } while (newStorageId.equals(oldStorageId));
+
+      assertEquals(0, cluster.getNamesystem(1).getBlockManager()
+          .getPendingDataNodeMessageCount());
+
+      // Now try to fail over.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static String getRegisteredDatanodeUid(
+      MiniDFSCluster cluster, int nnIndex) {
+    List<DatanodeDescriptor> registeredDatanodes = cluster.getNamesystem(nnIndex)
+        .getBlockManager().getDatanodeManager()
+        .getDatanodeListForReport(DatanodeReportType.ALL);
+    assertEquals(1, registeredDatanodes.size());
+    return registeredDatanodes.get(0).getDatanodeUuid();
+  }
+
+  private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)
+      throws IOException {
+    // stop the DN, reformat it, then start it again with the same xfer port.
+    DataNodeProperties dnProps = cluster.stopDataNode(dnIndex);
+    cluster.formatDataNodeDirs();
+    return cluster.restartDataNode(dnProps, true);
+  }
+
+}
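The test verifies the fix end to end: it forces a replica with a stale generation stamp to be queued on the standby, wipes the DN so it re-registers under a new UUID, waits for the pending-message count to drop to zero (proving the purge ran), and only then fails over, which would previously have thrown. To run just this test from the hadoop-hdfs module, a standard Maven Surefire invocation should work (typical usage, not part of the commit):

    mvn test -Dtest=TestPendingCorruptDnMessages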