Author: cmccabe
Date: Mon Mar 17 18:46:33 2014
New Revision: 1578508

URL: http://svn.apache.org/r1578508
Log:
HDFS-6107. When a block cannot be cached due to limited space on the DataNode, it becomes uncacheable (cmccabe)
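The core of the FsDatasetCache.java change below is to perform the reservation-failure check inside the try block, so that the existing finally clause still cleans up the block's state (allowing the block to be cached later), and to track whether bytes were actually reserved so the failure path only releases a reservation that was made. A rough, self-contained sketch of that pattern follows; the class, field, and method names are illustrative stand-ins, not the real FsDatasetCache API.

// Rough sketch of the reserve/try/finally pattern adopted by this change.
// Placeholder names only -- not the actual FsDatasetCache implementation.
public class ReserveReleaseSketch {
  private final long maxBytes;
  private long usedBytes;

  public ReserveReleaseSketch(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  // Like usedBytesCount.reserve(): returns the new total, or -1 if caching
  // this block would exceed the configured limit.
  private synchronized long reserve(long length) {
    if (usedBytes + length > maxBytes) {
      return -1;
    }
    usedBytes += length;
    return usedBytes;
  }

  private synchronized long release(long length) {
    usedBytes -= length;
    return usedBytes;
  }

  /** Try to cache one block; returns true if the block was cached. */
  public boolean cacheBlock(String blockId, long length) {
    boolean reservedBytes = false;
    boolean success = false;
    try {
      if (reserve(length) < 0) {
        // Not enough locked memory.  Returning from inside the try block
        // still runs the finally clause, so the block's bookkeeping is
        // cleaned up and a later cache command can retry it.
        return false;
      }
      reservedBytes = true;
      // ... mmap the replica and verify its checksum here ...
      success = true;
      return true;
    } finally {
      if (!success) {
        if (reservedBytes) {
          // Only undo a reservation that actually happened.
          release(length);
        }
        // ... also remove blockId from the "caching in progress" map and
        // bump the failed-to-cache counter here ...
      }
    }
  }
}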
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1578508&r1=1578507&r2=1578508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Mar 17 18:46:33 2014
@@ -626,6 +626,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-6094. The same block can be counted twice towards safe mode
     threshold. (Arpit Agarwal)
 
+    HDFS-6107. When a block can't be cached due to limited space on the
+    DataNode, that block becomes uncacheable (cmccabe)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1578508&r1=1578507&r2=1578508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Mon Mar 17 18:46:33 2014
@@ -597,14 +597,12 @@ class BPOfferService {
           blockIdCmd.getBlockPoolId() + " of [" +
           blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
       dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
-      dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_UNCACHE:
       LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
           blockIdCmd.getBlockPoolId() + " of [" +
           blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
       dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
-      dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1578508&r1=1578507&r2=1578508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Mar 17 18:46:33 2014
@@ -1040,7 +1040,7 @@ public class DataNode extends Configured
     }
   }
 
-  DataNodeMetrics getMetrics() {
+  public DataNodeMetrics getMetrics() {
     return metrics;
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1578508&r1=1578507&r2=1578508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Mon Mar 17 18:46:33 2014
@@ -337,15 +337,16 @@ public class FsDatasetCache {
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
       long newUsedBytes = usedBytesCount.reserve(length);
-      if (newUsedBytes < 0) {
-        LOG.warn("Failed to cache " + key + ": could not reserve " + length +
-            " more bytes in the cache: " +
-            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-            " of " + maxBytes + " exceeded.");
-        numBlocksFailedToCache.incrementAndGet();
-        return;
-      }
+      boolean reservedBytes = false;
       try {
+        if (newUsedBytes < 0) {
+          LOG.warn("Failed to cache " + key + ": could not reserve " + length +
+              " more bytes in the cache: " +
+              DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
+              " of " + maxBytes + " exceeded.");
+          return;
+        }
+        reservedBytes = true;
         try {
           blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0);
           metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
@@ -391,10 +392,13 @@ public class FsDatasetCache {
         }
         dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
         numBlocksCached.addAndGet(1);
+        dataset.datanode.getMetrics().incrBlocksCached(1);
         success = true;
       } finally {
         if (!success) {
-          newUsedBytes = usedBytesCount.release(length);
+          if (reservedBytes) {
+            newUsedBytes = usedBytesCount.release(length);
+          }
           if (LOG.isDebugEnabled()) {
             LOG.debug("Caching of " + key + " was aborted.  We are now " +
                 "caching only " + newUsedBytes + " + bytes in total.");
@@ -439,6 +443,7 @@ public class FsDatasetCache {
       long newUsedBytes = usedBytesCount.release(
           value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
+      dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Uncaching of " + key + " completed. " +
" + "usedBytes = " + newUsedBytes); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1578508&r1=1578507&r2=1578508&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Mon Mar 17 18:46:33 2014 @@ -1123,6 +1123,11 @@ public class DFSTestUtil { } return false; } + LOG.info("verifyExpectedCacheUsage: got " + + curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " + + curBlocks + "/" + expectedBlocks + " blocks cached. " + + "memlock limit = " + + NativeIO.POSIX.getCacheManipulator().getMemlockLimit()); return true; } }, 100, 60000); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1578508&r1=1578507&r2=1578508&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Mon Mar 17 18:46:33 2014 @@ -40,12 +40,15 @@ import org.apache.hadoop.fs.FSDataOutput import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.BlockReaderTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.LogVerificationAppender; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -80,6 +83,7 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import com.google.common.base.Supplier; +import com.google.common.primitives.Ints; public class TestFsDatasetCache { private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class); @@ -349,10 +353,13 @@ public class TestFsDatasetCache { fsd.getNumBlocksFailedToCache() > 0); // Uncache the n-1 files + int curCachedBlocks = 16; for (int i=0; i<numFiles-1; i++) { setHeartbeatResponse(uncacheBlocks(fileLocs[i])); - total -= rounder.round(fileSizes[i]); - DFSTestUtil.verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i), fsd); + long uncachedBytes = rounder.round(fileSizes[i]); + total -= uncachedBytes; + curCachedBlocks -= uncachedBytes / BLOCK_SIZE; + DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd); } LOG.info("finishing testFilesExceedMaxLockedMemory"); } @@ -491,4 +498,78 @@ public class TestFsDatasetCache { MetricsAsserts.assertCounter("BlocksCached", 1l, dnMetrics); 
MetricsAsserts.assertCounter("BlocksUncached", 1l, dnMetrics); } + + @Test(timeout=60000) + public void testReCacheAfterUncache() throws Exception { + final int TOTAL_BLOCKS_PER_CACHE = + Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE); + BlockReaderTestUtil.enableHdfsCachingTracing(); + Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE); + + // Create a small file + final Path SMALL_FILE = new Path("/smallFile"); + DFSTestUtil.createFile(fs, SMALL_FILE, + BLOCK_SIZE, (short)1, 0xcafe); + + // Create a file that will take up the whole cache + final Path BIG_FILE = new Path("/bigFile"); + DFSTestUtil.createFile(fs, BIG_FILE, + TOTAL_BLOCKS_PER_CACHE * BLOCK_SIZE, (short)1, 0xbeef); + final DistributedFileSystem dfs = cluster.getFileSystem(); + dfs.addCachePool(new CachePoolInfo("pool")); + final long bigCacheDirectiveId = + dfs.addCacheDirective(new CacheDirectiveInfo.Builder() + .setPool("pool").setPath(BIG_FILE).setReplication((short)1).build()); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name()); + long blocksCached = + MetricsAsserts.getLongCounter("BlocksCached", dnMetrics); + if (blocksCached != TOTAL_BLOCKS_PER_CACHE) { + LOG.info("waiting for " + TOTAL_BLOCKS_PER_CACHE + " to " + + "be cached. Right now only " + blocksCached + " blocks are cached."); + return false; + } + LOG.info(TOTAL_BLOCKS_PER_CACHE + " blocks are now cached."); + return true; + } + }, 1000, 30000); + + // Try to cache a smaller file. It should fail. + final long shortCacheDirectiveId = + dfs.addCacheDirective(new CacheDirectiveInfo.Builder() + .setPool("pool").setPath(SMALL_FILE).setReplication((short)1).build()); + Thread.sleep(10000); + MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name()); + Assert.assertEquals(TOTAL_BLOCKS_PER_CACHE, + MetricsAsserts.getLongCounter("BlocksCached", dnMetrics)); + + // Uncache the big file and verify that the small file can now be + // cached (regression test for HDFS-XXXX) + dfs.removeCacheDirective(bigCacheDirectiveId); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + RemoteIterator<CacheDirectiveEntry> iter; + try { + iter = dfs.listCacheDirectives( + new CacheDirectiveInfo.Builder().build()); + CacheDirectiveEntry entry; + do { + entry = iter.next(); + } while (entry.getInfo().getId() != shortCacheDirectiveId); + if (entry.getStats().getFilesCached() != 1) { + LOG.info("waiting for directive " + shortCacheDirectiveId + + " to be cached. stats = " + entry.getStats()); + return false; + } + LOG.info("directive " + shortCacheDirectiveId + " has been cached."); + } catch (IOException e) { + Assert.fail("unexpected exception" + e.toString()); + } + return true; + } + }, 1000, 30000); + } }