Author: cmccabe
Date: Sat Aug 24 03:41:25 2013
New Revision: 1517106

URL: http://svn.apache.org/r1517106
Log:
HDFS-5050. Add DataNode support for mlock and munlock (contributed by Andrew Wang)
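For orientation, here is a minimal, illustrative sketch (not part of this patch) of the mmap(2)/mlock(2) sequence that the new MappableBlock class below performs for each replica it caches. The file path is hypothetical, and checksum verification, error handling, and the munlock/unmap path used on uncache are omitted.

import java.io.FileInputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

import org.apache.hadoop.io.nativeio.NativeIO;

public class MlockSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical path to a finalized block replica on a DataNode volume
    FileInputStream blockIn = new FileInputStream("/data/current/blk_12345");
    FileChannel channel = blockIn.getChannel();
    long length = channel.size();
    // mmap the block file read-only, then pin the mapping in RAM via mlock(2)
    MappedByteBuffer mapped = channel.map(MapMode.READ_ONLY, 0, length);
    NativeIO.POSIX.mlock(mapped, length);
    // ... reads are now served from locked pages; uncaching would munlock and
    // release the mapping before closing the stream
    blockIn.close();
  }
}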
Added:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt Sat Aug 24 03:41:25 2013 @@ -15,6 +15,9 @@ HDFS-4949 (Unreleased) HDFS-5052. Add cacheRequest/uncacheRequest support to NameNode. (contributed by Colin Patrick McCabe) + HDFS-5050.
Add DataNode support for mlock and munlock + (Andrew Wang via Colin Patrick McCabe) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Sat Aug 24 03:41:25 2013 @@ -100,6 +100,8 @@ public class DFSConfigKeys extends Commo public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false; public static final String DFS_DATANODE_MAX_LOCKED_MEMORY_KEY = "dfs.datanode.max.locked.memory"; public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0; + public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume"; + public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4; public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port"; public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070; Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Sat Aug 24 03:41:25 2013 @@ -549,6 +549,14 @@ class BPOfferService { } dn.metrics.incrBlocksRemoved(toDelete.length); break; + case DatanodeProtocol.DNA_CACHE: + LOG.info("DatanodeCommand action: DNA_CACHE"); + dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks()); + break; + case DatanodeProtocol.DNA_UNCACHE: + LOG.info("DatanodeCommand action: DNA_UNCACHE"); + dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks()); + break; case DatanodeProtocol.DNA_SHUTDOWN: // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command // See HDFS-2987. 
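To make the flattened BPOfferService hunk above easier to follow, here is a condensed, illustrative restatement of the new DataNode-side command handling, assuming bcmd is the BlockCommand received from the NameNode. The helper class and method names are hypothetical; DatanodeProtocol.DNA_CACHE/DNA_UNCACHE, BlockCommand, and the FsDatasetSpi cache()/uncache() methods come from this patch.

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;

class CacheCommandSketch {
  /** Mirrors the two new cases added to the BPOfferService command switch. */
  static void dispatch(BlockCommand bcmd, FsDatasetSpi<?> dataset) {
    switch (bcmd.getAction()) {
    case DatanodeProtocol.DNA_CACHE:
      // NameNode asked this DataNode to pin these replicas in memory
      dataset.cache(bcmd.getBlockPoolId(), bcmd.getBlocks());
      break;
    case DatanodeProtocol.DNA_UNCACHE:
      // NameNode asked this DataNode to release previously cached replicas
      dataset.uncache(bcmd.getBlockPoolId(), bcmd.getBlocks());
      break;
    default:
      // other actions (transfer, invalidate, shutdown, ...) handled as before
      break;
    }
  }

  /** How a cache command might be built, as the new test below does. */
  static BlockCommand buildCacheCommand(String bpid, Block[] blocks) {
    return new BlockCommand(DatanodeProtocol.DNA_CACHE, bpid, blocks);
  }
}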
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Sat Aug 24 03:41:25 2013 @@ -449,6 +449,10 @@ class BPServiceActor implements Runnable long startTime = Time.monotonicNow(); if (startTime - lastCacheReport > dnConf.cacheReportInterval) { // TODO: Implement me! + String bpid = bpos.getBlockPoolId(); + BlockListAsLongs blocks = dn.getFSDataset().getCacheReport(bpid); + cmd = bpNamenode.cacheReport(bpRegistration, bpid, + blocks.getBlockListAsLongs()); } return cmd; } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Sat Aug 24 03:41:25 2013 @@ -160,4 +160,8 @@ public class DNConf { public long getXceiverStopTimeout() { return xceiverStopTimeout; } + + public long getMaxLockedMemory() { + return maxLockedMemory; + } } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Sat Aug 24 03:41:25 2013 @@ -269,6 +269,14 @@ public interface FsDatasetSpi<V extends */ public BlockListAsLongs getBlockReport(String bpid); + /** + * Returns the cache report - the full list of cached blocks of a + * block pool + * @param bpid Block Pool Id + * @return - the cache report - the full list of cached blocks + */ + public BlockListAsLongs getCacheReport(String bpid); + /** Does the dataset contain the block? 
*/ public boolean contains(ExtendedBlock block); @@ -294,6 +302,20 @@ public interface FsDatasetSpi<V extends */ public void invalidate(String bpid, Block invalidBlks[]) throws IOException; + /** + * Caches the specified blocks + * @param bpid Block pool id + * @param cacheBlks - block to cache + */ + public void cache(String bpid, Block[] cacheBlks); + + /** + * Uncaches the specified blocks + * @param bpid Block pool id + * @param uncacheBlks - blocks to uncache + */ + public void uncache(String bpid, Block[] uncacheBlks); + /** * Check if all the data directories are healthy * @throws DiskErrorException Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1517106&view=auto ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (added) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Sat Aug 24 03:41:25 2013 @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2) + * system calls to lock blocks into memory. Block checksums are verified upon + * entry into the cache. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class FsDatasetCache { + + private static final Log LOG = LogFactory.getLog(FsDatasetCache.class); + + /** + * Map of cached blocks + */ + private final ConcurrentMap<Long, MappableBlock> cachedBlocks; + + private final FsDatasetImpl dataset; + /** + * Number of cached bytes + */ + private AtomicLong usedBytes; + /** + * Total cache capacity in bytes + */ + private final long maxBytes; + + public FsDatasetCache(FsDatasetImpl dataset) { + this.dataset = dataset; + this.cachedBlocks = new ConcurrentHashMap<Long, MappableBlock>(); + this.usedBytes = new AtomicLong(0); + this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory(); + } + + /** + * @return if the block is cached + */ + boolean isCached(String bpid, Block block) { + MappableBlock mapBlock = cachedBlocks.get(block.getBlockId()); + if (mapBlock != null) { + return mapBlock.getBlockPoolId().equals(bpid); + } + return false; + } + + /** + * @return List of cached blocks suitable for translation into a + * {@link BlockListAsLongs} for a cache report. + */ + List<Block> getCachedBlocks(String bpid) { + List<Block> blocks = new ArrayList<Block>(); + MappableBlock mapBlock = null; + // ConcurrentHashMap iteration doesn't see latest updates, which is okay + for (Iterator<MappableBlock> it = cachedBlocks.values().iterator(); + it.hasNext(); mapBlock = it.next()) { + if (mapBlock.getBlockPoolId().equals(bpid)) { + blocks.add(mapBlock.getBlock()); + } + } + return blocks; + } + + /** + * Asynchronously attempts to cache a block. This is subject to the + * configured maximum locked memory limit. 
+ * + * @param block block to cache + * @param volume volume of the block + * @param blockIn stream of the block's data file + * @param metaIn stream of the block's meta file + */ + void cacheBlock(String bpid, Block block, FsVolumeImpl volume, + FileInputStream blockIn, FileInputStream metaIn) { + if (isCached(bpid, block)) { + return; + } + MappableBlock mapBlock = null; + try { + mapBlock = new MappableBlock(bpid, block, volume, blockIn, metaIn); + } catch (IOException e) { + LOG.warn("Failed to cache replica " + block + ": Could not instantiate" + + " MappableBlock", e); + IOUtils.closeQuietly(blockIn); + IOUtils.closeQuietly(metaIn); + return; + } + // Check if there's sufficient cache capacity + boolean success = false; + long bytes = mapBlock.getNumBytes(); + long used = usedBytes.get(); + while (used+bytes < maxBytes) { + if (usedBytes.compareAndSet(used, used+bytes)) { + success = true; + break; + } + used = usedBytes.get(); + } + if (!success) { + LOG.warn(String.format( + "Failed to cache replica %s: %s exceeded (%d + %d > %d)", + mapBlock.getBlock().toString(), + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + used, bytes, maxBytes)); + mapBlock.close(); + return; + } + // Submit it to the worker pool to be cached + volume.getExecutor().execute(new WorkerTask(mapBlock)); + } + + /** + * Uncaches a block if it is cached. + * @param block to uncache + */ + void uncacheBlock(String bpid, Block block) { + MappableBlock mapBlock = cachedBlocks.get(block.getBlockId()); + if (mapBlock != null && + mapBlock.getBlockPoolId().equals(bpid) && + mapBlock.getBlock().equals(block)) { + mapBlock.close(); + cachedBlocks.remove(mapBlock); + long bytes = mapBlock.getNumBytes(); + long used = usedBytes.get(); + while (!usedBytes.compareAndSet(used, used - bytes)) { + used = usedBytes.get(); + } + } + } + + /** + * Background worker that mmaps, mlocks, and checksums a block + */ + private class WorkerTask implements Runnable { + + private MappableBlock block; + WorkerTask(MappableBlock block) { + this.block = block; + } + + @Override + public void run() { + boolean success = false; + try { + block.map(); + block.lock(); + block.verifyChecksum(); + success = true; + } catch (ChecksumException e) { + // Exception message is bogus since this wasn't caused by a file read + LOG.warn("Failed to cache block " + block.getBlock() + ": Checksum " + + "verification failed."); + } catch (IOException e) { + LOG.warn("Failed to cache block " + block.getBlock() + ": IOException", + e); + } + // If we failed or the block became uncacheable in the meantime, + // clean up and return the reserved cache allocation + if (!success || + !dataset.validToCache(block.getBlockPoolId(), block.getBlock())) { + block.close(); + long used = usedBytes.get(); + while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) { + used = usedBytes.get(); + } + } else { + cachedBlocks.put(block.getBlock().getBlockId(), block); + } + } + } + + // Stats related methods for FsDatasetMBean + + public long getCacheUsed() { + return usedBytes.get(); + } + + public long getCacheCapacity() { + return maxBytes; + } + + public long getCacheRemaining() { + return maxBytes - usedBytes.get(); + } +} Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Sat Aug 24 03:41:25 2013 @@ -37,6 +37,7 @@ import javax.management.NotCompliantMBea import javax.management.ObjectName; import javax.management.StandardMBean; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -170,6 +171,7 @@ class FsDatasetImpl implements FsDataset final FsVolumeList volumes; final ReplicaMap volumeMap; final FsDatasetAsyncDiskService asyncDiskService; + final FsDatasetCache cacheManager; private final int validVolsRequired; // Used for synchronizing access to usage stats @@ -228,6 +230,7 @@ class FsDatasetImpl implements FsDataset roots[idx] = storage.getStorageDir(idx).getCurrentDir(); } asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots); + cacheManager = new FsDatasetCache(this); registerMBean(storage.getStorageID()); } @@ -288,6 +291,30 @@ class FsDatasetImpl implements FsDataset } /** + * Returns the total cache used by the datanode (in bytes). + */ + @Override // FSDatasetMBean + public long getCacheUsed() { + return cacheManager.getCacheUsed(); + } + + /** + * Returns the total cache capacity of the datanode (in bytes). + */ + @Override // FSDatasetMBean + public long getCacheCapacity() { + return cacheManager.getCacheCapacity(); + } + + /** + * Returns the total amount of cache remaining (in bytes). + */ + @Override // FSDatasetMBean + public long getCacheRemaining() { + return cacheManager.getCacheRemaining(); + } + + /** * Find the block's on-disk length */ @Override // FsDatasetSpi @@ -534,6 +561,8 @@ class FsDatasetImpl implements FsDataset private synchronized ReplicaBeingWritten append(String bpid, FinalizedReplica replicaInfo, long newGS, long estimateBlockLen) throws IOException { + // uncache the block + cacheManager.uncacheBlock(bpid, replicaInfo); // unlink the finalized replica replicaInfo.unlinkBlock(1); @@ -1001,6 +1030,11 @@ class FsDatasetImpl implements FsDataset } } + @Override // FsDatasetSpi + public BlockListAsLongs getCacheReport(String bpid) { + return new BlockListAsLongs(cacheManager.getCachedBlocks(bpid), null); + } + /** * Get the list of finalized blocks from in-memory blockmap for a block pool. 
*/ @@ -1143,6 +1177,8 @@ class FsDatasetImpl implements FsDataset volumeMap.remove(bpid, invalidBlks[i]); } + // Uncache the block synchronously + cacheManager.uncacheBlock(bpid, invalidBlks[i]); // Delete the block asynchronously to make sure we can do it fast enough asyncDiskService.deleteAsync(v, f, FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), @@ -1153,6 +1189,78 @@ class FsDatasetImpl implements FsDataset } } + synchronized boolean validToCache(String bpid, Block blk) { + ReplicaInfo info = volumeMap.get(bpid, blk); + if (info == null) { + LOG.warn("Failed to cache replica " + blk + ": ReplicaInfo not found."); + return false; + } + FsVolumeImpl volume = (FsVolumeImpl)info.getVolume(); + if (volume == null) { + LOG.warn("Failed to cache replica " + blk + ": Volume not found."); + return false; + } + if (info.getState() != ReplicaState.FINALIZED) { + LOG.warn("Failed to cache replica " + blk + ": Replica is not" + + " finalized."); + return false; + } + return true; + } + + /** + * Asynchronously attempts to cache a single block via {@link FsDatasetCache}. + */ + private void cacheBlock(String bpid, Block blk) { + ReplicaInfo info; + FsVolumeImpl volume; + synchronized (this) { + if (!validToCache(bpid, blk)) { + return; + } + info = volumeMap.get(bpid, blk); + volume = (FsVolumeImpl)info.getVolume(); + } + // Try to open block and meta streams + FileInputStream blockIn = null; + FileInputStream metaIn = null; + boolean success = false; + try { + ExtendedBlock extBlk = new ExtendedBlock(bpid, blk); + blockIn = (FileInputStream)getBlockInputStream(extBlk, 0); + metaIn = (FileInputStream)getMetaDataInputStream(extBlk) + .getWrappedStream(); + success = true; + } catch (ClassCastException e) { + LOG.warn("Failed to cache replica " + blk + ": Underlying blocks" + + " are not backed by files.", e); + } catch (IOException e) { + LOG.warn("Failed to cache replica " + blk + ": IOException while" + + " trying to open block or meta files.", e); + } + if (!success) { + IOUtils.closeQuietly(blockIn); + IOUtils.closeQuietly(metaIn); + return; + } + cacheManager.cacheBlock(bpid, blk, volume, blockIn, metaIn); + } + + @Override // FsDatasetSpi + public void cache(String bpid, Block[] cacheBlks) { + for (int i=0; i<cacheBlks.length; i++) { + cacheBlock(bpid, cacheBlks[i]); + } + } + + @Override // FsDatasetSpi + public void uncache(String bpid, Block[] uncacheBlks) { + for (int i=0; i<uncacheBlks.length; i++) { + Block blk = uncacheBlks[i]; + cacheManager.uncacheBlock(bpid, blk); + } + } + @Override // FsDatasetSpi public synchronized boolean contains(final ExtendedBlock block) { final long blockId = block.getLocalBlock().getBlockId(); Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Sat Aug 24 03:41:25 2013 @@ -18,11 +18,17 @@ package 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -34,6 +40,8 @@ import org.apache.hadoop.hdfs.server.dat import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * The underlying volume used to store replica. * @@ -48,6 +56,13 @@ class FsVolumeImpl implements FsVolumeSp private final File currentDir; // <StorageDirectory>/current private final DF usage; private final long reserved; + /** + * Per-volume worker pool that processes new blocks to cache. + * The maximum number of workers per volume is bounded (configurable via + * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource + * contention. + */ + private final ThreadPoolExecutor cacheExecutor; FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf) throws IOException { @@ -59,6 +74,20 @@ class FsVolumeImpl implements FsVolumeSp this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); + final int maxNumThreads = dataset.datanode.getConf().getInt( + DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, + DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT + ); + ThreadFactory workerFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d") + .build(); + cacheExecutor = new ThreadPoolExecutor( + 1, maxNumThreads, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + workerFactory); + cacheExecutor.allowCoreThreadTimeOut(true); } File getCurrentDir() { @@ -166,7 +195,11 @@ class FsVolumeImpl implements FsVolumeSp File addBlock(String bpid, Block b, File f) throws IOException { return getBlockPoolSlice(bpid).addBlock(b, f); } - + + Executor getExecutor() { + return cacheExecutor; + } + void checkDirs() throws DiskErrorException { // TODO:FEDERATION valid synchronization for(BlockPoolSlice s : bpSlices.values()) { @@ -210,6 +243,7 @@ class FsVolumeImpl implements FsVolumeSp } void shutdown() { + cacheExecutor.shutdown(); Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet(); for (Entry<String, BlockPoolSlice> entry : set) { entry.getValue().shutdown(); Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java?rev=1517106&view=auto ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java (added) +++ 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java Sat Aug 24 03:41:25 2013 @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import java.io.BufferedInputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.base.Preconditions; + +/** + * Low-level wrapper for a Block and its backing files that provides mmap, + * mlock, and checksum verification operations. + * + * This could be a private class of FsDatasetCache, not meant for other users. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class MappableBlock implements Closeable { + + private final String bpid; + private final Block block; + private final FsVolumeImpl volume; + + private final FileInputStream blockIn; + private final FileInputStream metaIn; + private final FileChannel blockChannel; + private final FileChannel metaChannel; + private final long blockSize; + + private boolean isMapped; + private boolean isLocked; + private boolean isChecksummed; + + private MappedByteBuffer blockMapped = null; + + public MappableBlock(String bpid, Block blk, FsVolumeImpl volume, + FileInputStream blockIn, FileInputStream metaIn) throws IOException { + this.bpid = bpid; + this.block = blk; + this.volume = volume; + + this.blockIn = blockIn; + this.metaIn = metaIn; + this.blockChannel = blockIn.getChannel(); + this.metaChannel = metaIn.getChannel(); + this.blockSize = blockChannel.size(); + + this.isMapped = false; + this.isLocked = false; + this.isChecksummed = false; + } + + public String getBlockPoolId() { + return bpid; + } + + public Block getBlock() { + return block; + } + + public FsVolumeImpl getVolume() { + return volume; + } + + public boolean isMapped() { + return isMapped; + } + + public boolean isLocked() { + return isLocked; + } + + public boolean isChecksummed() { + return isChecksummed; + } + + /** + * Returns the number of bytes on disk for the block file + */ + public long getNumBytes() { + return blockSize; + } + + /** + * Maps the block into memory. See mmap(2). + */ + public void map() throws IOException { + if (isMapped) { + return; + } + blockMapped = blockChannel.map(MapMode.READ_ONLY, 0, blockSize); + isMapped = true; + } + + /** + * Unmaps the block from memory. See munmap(2). + */ + public void unmap() { + if (!isMapped) { + return; + } + if (blockMapped instanceof sun.nio.ch.DirectBuffer) { + sun.misc.Cleaner cleaner = + ((sun.nio.ch.DirectBuffer)blockMapped).cleaner(); + cleaner.clean(); + } + isMapped = false; + isLocked = false; + isChecksummed = false; + } + + /** + * Locks the block into memory. This prevents the block from being paged out. + * See mlock(2). + */ + public void lock() throws IOException { + Preconditions.checkArgument(isMapped, + "Block must be mapped before it can be locked!"); + if (isLocked) { + return; + } + NativeIO.POSIX.mlock(blockMapped, blockSize); + isLocked = true; + } + + /** + * Unlocks the block from memory, allowing it to be paged out. See munlock(2). + */ + public void unlock() throws IOException { + if (!isLocked || !isMapped) { + return; + } + NativeIO.POSIX.munlock(blockMapped, blockSize); + isLocked = false; + isChecksummed = false; + } + + /** + * Reads bytes into a buffer until EOF or the buffer's limit is reached + */ + private int fillBuffer(FileChannel channel, ByteBuffer buf) + throws IOException { + int bytesRead = channel.read(buf); + if (bytesRead < 0) { + //EOF + return bytesRead; + } + while (buf.remaining() > 0) { + int n = channel.read(buf); + if (n < 0) { + //EOF + return bytesRead; + } + bytesRead += n; + } + return bytesRead; + } + + /** + * Verifies the block's checksum. This is an I/O intensive operation. + * @return if the block was successfully checksummed. 
+ */ + public void verifyChecksum() throws IOException, ChecksumException { + Preconditions.checkArgument(isLocked && isMapped, + "Block must be mapped and locked before checksum verification!"); + // skip if checksum has already been successfully verified + if (isChecksummed) { + return; + } + // Verify the checksum from the block's meta file + // Get the DataChecksum from the meta file header + metaChannel.position(0); + BlockMetadataHeader header = + BlockMetadataHeader.readHeader(new DataInputStream( + new BufferedInputStream(metaIn, BlockMetadataHeader + .getHeaderSize()))); + DataChecksum checksum = header.getChecksum(); + final int bytesPerChecksum = checksum.getBytesPerChecksum(); + final int checksumSize = checksum.getChecksumSize(); + final int numChunks = (8*1024*1024) / bytesPerChecksum; + ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum); + ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize); + // Verify the checksum + int bytesVerified = 0; + while (bytesVerified < blockChannel.size()) { + Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, + "Unexpected partial chunk before EOF"); + assert bytesVerified % bytesPerChecksum == 0; + int bytesRead = fillBuffer(blockChannel, blockBuf); + if (bytesRead == -1) { + throw new IOException("Premature EOF"); + } + blockBuf.flip(); + // Number of read chunks, including partial chunk at end + int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; + checksumBuf.limit(chunks*bytesPerChecksum); + fillBuffer(metaChannel, checksumBuf); + checksumBuf.flip(); + checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(), + bytesVerified); + // Success + bytesVerified += bytesRead; + blockBuf.clear(); + checksumBuf.clear(); + } + isChecksummed = true; + // Can close the backing file since everything is safely in memory + blockChannel.close(); + } + + @Override + public void close() { + unmap(); + IOUtils.closeQuietly(blockIn); + IOUtils.closeQuietly(metaIn); + } +} Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Sat Aug 24 03:41:25 2013 @@ -77,4 +77,19 @@ public interface FSDatasetMBean { * @return The number of failed volumes in the datanode. */ public int getNumFailedVolumes(); + + /** + * Returns the total cache used by the datanode (in bytes). + */ + public long getCacheUsed(); + + /** + * Returns the total cache capacity of the datanode (in bytes). + */ + public long getCacheCapacity(); + + /** + * Returns the total amount of cache remaining (in bytes). 
+ */ + public long getCacheRemaining(); } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Sat Aug 24 03:41:25 2013 @@ -74,6 +74,8 @@ public interface DatanodeProtocol { final static int DNA_RECOVERBLOCK = 6; // request a block recovery final static int DNA_ACCESSKEYUPDATE = 7; // update access key final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth + final static int DNA_CACHE = 9; // cache blocks + final static int DNA_UNCACHE = 10; // uncache blocks /** * Register Datanode. Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Sat Aug 24 03:41:25 2013 @@ -101,7 +101,9 @@ message BlockCommandProto { enum Action { TRANSFER = 1; // Transfer blocks to another datanode INVALIDATE = 2; // Invalidate blocks - SHUTDOWN = 3; // Shutdown the datanode + SHUTDOWN = 3; // Shutdown the datanode + CACHE = 4; // Cache blocks on the datanode + UNCACHE = 5; // Uncache blocks on the datanode } required Action action = 1; required string blockPoolId = 2; Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Sat Aug 24 03:41:25 2013 @@ -1419,4 +1419,27 @@ </description> </property> +<property> + <name>dfs.datanode.max.locked.memory</name> + <value>0</value> + <description> + The amount of memory in bytes to use for caching of block replicas in + memory on the datanode. The datanode's maximum locked memory soft ulimit + (RLIMIT_MEMLOCK) must be set to at least this value, else the datanode + will abort on startup. + + By default, this parameter set to 0, which disables in-memory caching. + </description> +</property> + +<property> + <name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name> + <value>4</value> + <description> + The maximum number of threads per volume to use for caching new data + on the datanode. 
These threads consume both I/O and CPU. This can affect + normal datanode operations. + </description> +</property> + </configuration> Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java Sat Aug 24 03:41:25 2013 @@ -61,4 +61,15 @@ public class LogVerificationAppender ext } return count; } + + public int countLinesWithMessage(final String text) { + int count = 0; + for (LoggingEvent e: getLog()) { + String msg = e.getRenderedMessage(); + if (msg != null && msg.contains(text)) { + count++; + } + } + return count; + } } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1517106&r1=1517105&r2=1517106&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Aug 24 03:41:25 2013 @@ -465,6 +465,11 @@ public class SimulatedFSDataset implemen return new BlockListAsLongs(blocks, null); } + @Override // FsDatasetSpi + public BlockListAsLongs getCacheReport(String bpid) { + return new BlockListAsLongs(); + } + @Override // FSDatasetMBean public long getCapacity() { return storage.getCapacity(); @@ -490,6 +495,21 @@ public class SimulatedFSDataset implemen return storage.getNumFailedVolumes(); } + @Override // FSDatasetMBean + public long getCacheUsed() { + return 0l; + } + + @Override // FSDatasetMBean + public long getCacheCapacity() { + return 0l; + } + + @Override // FSDatasetMBean + public long getCacheRemaining() { + return 0l; + } + @Override // FsDatasetSpi public synchronized long getLength(ExtendedBlock b) throws IOException { final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); @@ -559,6 +579,18 @@ public class SimulatedFSDataset implemen } } + @Override // FSDatasetSpi + public void cache(String bpid, Block[] cacheBlks) { + throw new UnsupportedOperationException( + "SimulatedFSDataset does not support cache operation!"); + } + + @Override // FSDatasetSpi + public void uncache(String bpid, Block[] uncacheBlks) { + throw new UnsupportedOperationException( + "SimulatedFSDataset does not support uncache operation!"); + } + private BInfo getBInfo(final ExtendedBlock b) { final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId()); return map == null? 
null: map.get(b.getLocalBlock()); Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1517106&view=auto ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (added) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Sat Aug 24 03:41:25 2013 @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.doReturn; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.HdfsBlockLocation; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.LogVerificationAppender; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.namenode.FSImage; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFsDatasetCache { + + // Most Linux installs allow a default of 64KB locked memory + private static final long CACHE_CAPACITY = 64 * 1024; + private static final long BLOCK_SIZE = 4096; + + private static Configuration 
conf; + private static MiniDFSCluster cluster = null; + private static FileSystem fs; + private static NameNode nn; + private static FSImage fsImage; + private static DataNode dn; + private static FsDatasetSpi<?> fsd; + private static DatanodeProtocolClientSideTranslatorPB spyNN; + + @Before + public void setUp() throws Exception { + conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + CACHE_CAPACITY); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build(); + cluster.waitActive(); + + fs = cluster.getFileSystem(); + nn = cluster.getNameNode(); + fsImage = nn.getFSImage(); + dn = cluster.getDataNodes().get(0); + fsd = dn.getFSDataset(); + + spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + private static void setHeartbeatResponse(DatanodeCommand[] cmds) + throws IOException { + HeartbeatResponse response = new HeartbeatResponse( + cmds, + new NNHAStatusHeartbeat(HAServiceState.ACTIVE, + fsImage.getLastAppliedOrWrittenTxId())); + doReturn(response).when(spyNN).sendHeartbeat( + (DatanodeRegistration) any(), + (StorageReport[]) any(), + anyInt(), anyInt(), anyInt()); + } + + private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) { + return cacheBlocks(new HdfsBlockLocation[] {loc}); + } + + private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] locs) { + return new DatanodeCommand[] { + getResponse(locs, DatanodeProtocol.DNA_CACHE) + }; + } + + private static DatanodeCommand[] uncacheBlock(HdfsBlockLocation loc) { + return uncacheBlocks(new HdfsBlockLocation[] {loc}); + } + + private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] locs) { + return new DatanodeCommand[] { + getResponse(locs, DatanodeProtocol.DNA_UNCACHE) + }; + } + + /** + * Creates a cache or uncache DatanodeCommand from an array of locations + */ + private static DatanodeCommand getResponse(HdfsBlockLocation[] locs, + int action) { + String bpid = locs[0].getLocatedBlock().getBlock().getBlockPoolId(); + Block[] blocks = new Block[locs.length]; + for (int i=0; i<locs.length; i++) { + blocks[i] = locs[i].getLocatedBlock().getBlock().getLocalBlock(); + } + return new BlockCommand(action, bpid, blocks); + } + + private static long[] getBlockSizes(HdfsBlockLocation[] locs) + throws Exception { + long[] sizes = new long[locs.length]; + for (int i=0; i<locs.length; i++) { + HdfsBlockLocation loc = locs[i]; + String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId(); + Block block = loc.getLocatedBlock().getBlock().getLocalBlock(); + ExtendedBlock extBlock = new ExtendedBlock(bpid, block); + FileChannel blockChannel = + ((FileInputStream)fsd.getBlockInputStream(extBlock, 0)).getChannel(); + sizes[i] = blockChannel.size(); + } + return sizes; + } + + /** + * Blocks until cache usage changes from the current value, then verifies + * against the expected new value. 
+ */ + private long verifyExpectedCacheUsage(final long current, + final long expected) throws Exception { + long cacheUsed = fsd.getCacheUsed(); + while (cacheUsed == current) { + cacheUsed = fsd.getCacheUsed(); + Thread.sleep(100); + } + long cacheCapacity = fsd.getCacheCapacity(); + long cacheRemaining = fsd.getCacheRemaining(); + assertEquals("Sum of used and remaining cache does not equal total", + cacheCapacity, cacheUsed+cacheRemaining); + assertEquals("Unexpected amount of cache used", expected, cacheUsed); + return cacheUsed; + } + + @Test(timeout=60000) + public void testCacheAndUncacheBlock() throws Exception { + final int NUM_BLOCKS = 5; + + // Write a test file + final Path testFile = new Path("/testCacheBlock"); + final long testFileLen = BLOCK_SIZE*NUM_BLOCKS; + DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 0xABBAl); + + // Get the details of the written file + HdfsBlockLocation[] locs = + (HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0, testFileLen); + assertEquals("Unexpected number of blocks", NUM_BLOCKS, locs.length); + final long[] blockSizes = getBlockSizes(locs); + + // Check initial state + final long cacheCapacity = fsd.getCacheCapacity(); + long cacheUsed = fsd.getCacheUsed(); + long current = 0; + assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); + assertEquals("Unexpected amount of cache used", current, cacheUsed); + + // Cache each block in succession, checking each time + for (int i=0; i<NUM_BLOCKS; i++) { + setHeartbeatResponse(cacheBlock(locs[i])); + current = verifyExpectedCacheUsage(current, current + blockSizes[i]); + } + + // Uncache each block in succession, again checking each time + for (int i=0; i<NUM_BLOCKS; i++) { + setHeartbeatResponse(uncacheBlock(locs[i])); + current = verifyExpectedCacheUsage(current, current - blockSizes[i]); + } + } + + @Test(timeout=60000) + public void testFilesExceedMaxLockedMemory() throws Exception { + // Create some test files that will exceed total cache capacity + // Don't forget that meta files take up space too! 
+ final int numFiles = 4; + final long fileSize = CACHE_CAPACITY / numFiles; + final Path[] testFiles = new Path[4]; + final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][]; + final long[] fileSizes = new long[numFiles]; + for (int i=0; i<numFiles; i++) { + testFiles[i] = new Path("/testFilesExceedMaxLockedMemory-" + i); + DFSTestUtil.createFile(fs, testFiles[i], fileSize, (short)1, 0xDFAl); + fileLocs[i] = (HdfsBlockLocation[])fs.getFileBlockLocations( + testFiles[i], 0, fileSize); + // Get the file size (sum of blocks) + long[] sizes = getBlockSizes(fileLocs[i]); + for (int j=0; j<sizes.length; j++) { + fileSizes[i] += sizes[j]; + } + } + + // Cache the first n-1 files + long current = 0; + for (int i=0; i<numFiles-1; i++) { + setHeartbeatResponse(cacheBlocks(fileLocs[i])); + current = verifyExpectedCacheUsage(current, current + fileSizes[i]); + } + final long oldCurrent = current; + + // nth file should hit a capacity exception + final LogVerificationAppender appender = new LogVerificationAppender(); + final Logger logger = Logger.getRootLogger(); + logger.addAppender(appender); + setHeartbeatResponse(cacheBlocks(fileLocs[numFiles-1])); + int lines = 0; + while (lines == 0) { + Thread.sleep(100); + lines = appender.countLinesWithMessage( + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + " exceeded"); + } + + // Uncache the cached part of the nth file + setHeartbeatResponse(uncacheBlocks(fileLocs[numFiles-1])); + while (fsd.getCacheUsed() != oldCurrent) { + Thread.sleep(100); + } + + // Uncache the n-1 files + for (int i=0; i<numFiles-1; i++) { + setHeartbeatResponse(uncacheBlocks(fileLocs[i])); + current = verifyExpectedCacheUsage(current, current - fileSizes[i]); + } + } +}
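Finally, a minimal sketch of how the new caching path might be enabled in a test or deployment, mirroring the setUp() in TestFsDatasetCache above. The helper class name is hypothetical, and the 64 KB figure simply matches the default RLIMIT_MEMLOCK soft limit noted in the test.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

class CacheConfigSketch {
  static Configuration newCachingConf() {
    Configuration conf = new HdfsConfiguration();
    // Allow the DataNode to mlock up to 64 KB of block data; 0 (the default)
    // disables in-memory caching. The process RLIMIT_MEMLOCK soft limit must
    // be at least this value or the DataNode aborts on startup.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, 64 * 1024);
    // Bound the per-volume pool of caching worker threads (default is 4).
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 4);
    return conf;
  }
}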