Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java?rev=1523145&view=auto ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java (added) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java Fri Sep 13 23:27:22 2013 @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import com.google.common.base.Preconditions; + +/** + * Handles common operations of processing a block report from a datanode, + * generating a diff of updates to the BlocksMap, and then feeding the diff + * to the subclass-implemented hooks. + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +public abstract class ReportProcessor { + + static final Log blockLog = NameNode.blockStateChangeLog; + private final String className = getClass().getSimpleName(); + // Max number of blocks to log info about during a block report. + final long maxNumBlocksToLog; + + void blockLogDebug(String message) { + if (blockLog.isDebugEnabled()) { + blockLog.info("BLOCK* " + className + message); + } + } + + void blockLogInfo(String message) { + if (blockLog.isInfoEnabled()) { + blockLog.info("BLOCK* " + className + message); + } + } + + void blockLogWarn(String message) { + blockLog.warn("BLOCK* " + className + message); + } + + void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) { + if (!blockLog.isInfoEnabled()) { + return; + } + StringBuilder sb = new StringBuilder(500); + sb.append("BLOCK* " + className + "#addStoredBlock: blockMap updated: ") + .append(node) + .append(" is added to "); + storedBlock.appendStringTo(sb); + sb.append(" size " ) + .append(storedBlock.getNumBytes()); + blockLog.info(sb); + } + + public ReportProcessor(Configuration conf) { + this.maxNumBlocksToLog = conf.getLong( + DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, + DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); + } + + /** + * Processes a block report from a datanode, updating the block to + * datanode mapping, adding new blocks and removing invalid ones. + * Also computes and queues new replication and invalidation work. + * @param node Datanode sending the block report + * @param report as list of longs + * @throws IOException + */ + final void processReport(final DatanodeDescriptor node, + final BlockListAsLongs report) throws IOException { + // Normal case: + // Modify the (block-->datanode) map, according to the difference + // between the old and new block report. + // + Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>(); + Collection<Block> toRemove = new LinkedList<Block>(); + Collection<Block> toInvalidate = new LinkedList<Block>(); + Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); + Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); + reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); + + // Process the blocks on each queue + for (StatefulBlockInfo b : toUC) { + addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState); + } + for (Block b : toRemove) { + removeStoredBlock(b, node); + } + int numBlocksLogged = 0; + for (BlockInfo b : toAdd) { + addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog); + numBlocksLogged++; + } + + if (numBlocksLogged > maxNumBlocksToLog) { + blockLogInfo("#processReport: logged" + + " info for " + maxNumBlocksToLog + + " of " + numBlocksLogged + " reported."); + } + for (Block b : toInvalidate) { + blockLogInfo("#processReport: " + + b + " on " + node + " size " + b.getNumBytes() + + " does not belong to any file"); + addToInvalidates(b, node); + } + for (BlockToMarkCorrupt b : toCorrupt) { + markBlockAsCorrupt(b, node); + } + } + + /** + * Compute the difference between the current state of the datanode in the + * BlocksMap and the new reported state, categorizing changes into + * different groups (e.g. new blocks to be added, blocks that were removed, + * blocks that should be invalidated, etc.). + */ + private void reportDiff(DatanodeDescriptor dn, + BlockListAsLongs newReport, + Collection<BlockInfo> toAdd, // add to DatanodeDescriptor + Collection<Block> toRemove, // remove from DatanodeDescriptor + Collection<Block> toInvalidate, // should be removed from DN + Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list + Collection<StatefulBlockInfo> toUC) { // add to under-construction list + // place a delimiter in the list which separates blocks + // that have been reported from those that have not + BlockInfo delimiter = new BlockInfo(new Block(), 1); + boolean added = addBlock(dn, delimiter); + assert added : "Delimiting block cannot be present in the node"; + int headIndex = 0; //currently the delimiter is in the head of the list + int curIndex; + + if (newReport == null) { + newReport = new BlockListAsLongs(); + } + // scan the report and process newly reported blocks + BlockReportIterator itBR = newReport.getBlockReportIterator(); + while (itBR.hasNext()) { + Block iblk = itBR.next(); + ReplicaState iState = itBR.getCurrentReplicaState(); + BlockInfo storedBlock = processReportedBlock(dn, iblk, iState, + toAdd, toInvalidate, toCorrupt, toUC); + // move block to the head of the list + if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) { + headIndex = moveBlockToHead(dn, storedBlock, curIndex, headIndex); + } + } + // collect blocks that have not been reported + // all of them are next to the delimiter + Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator( + delimiter.getNext(0), dn); + while (it.hasNext()) { + toRemove.add(it.next()); + } + removeBlock(dn, delimiter); + } + + // Operations on the blocks on a datanode + + abstract int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock, + int curIndex, int headIndex); + + abstract boolean addBlock(DatanodeDescriptor dn, BlockInfo block); + + abstract boolean removeBlock(DatanodeDescriptor dn, BlockInfo block); + + // Cache report processing + + abstract BlockInfo processReportedBlock(DatanodeDescriptor dn, Block iblk, + ReplicaState iState, Collection<BlockInfo> toAdd, + Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt, + Collection<StatefulBlockInfo> toUC); + + // Hooks for processing the cache report diff + + abstract Block addStoredBlock(final BlockInfo block, + DatanodeDescriptor node, DatanodeDescriptor delNodeHint, + boolean logEveryBlock) throws IOException; + + abstract void removeStoredBlock(Block block, DatanodeDescriptor node); + + abstract void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeInfo dn) + throws IOException; + + abstract void addToInvalidates(final Block b, final DatanodeInfo node); + + abstract void addStoredBlockUnderConstruction( + BlockInfoUnderConstruction storedBlock, DatanodeDescriptor node, + ReplicaState reportedState) throws IOException; + + /** + * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a + * list of blocks that should be considered corrupt due to a block report. + */ + static class BlockToMarkCorrupt { + /** The corrupted block in a datanode. */ + final BlockInfo corrupted; + /** The corresponding block stored in the BlockManager. */ + final BlockInfo stored; + /** The reason to mark corrupt. */ + final String reason; + + BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) { + Preconditions.checkNotNull(corrupted, "corrupted is null"); + Preconditions.checkNotNull(stored, "stored is null"); + + this.corrupted = corrupted; + this.stored = stored; + this.reason = reason; + } + + BlockToMarkCorrupt(BlockInfo stored, String reason) { + this(stored, stored, reason); + } + + BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) { + this(new BlockInfo(stored), stored, reason); + //the corrupted block in datanode has a different generation stamp + corrupted.setGenerationStamp(gs); + } + + @Override + public String toString() { + return corrupted + "(" + + (corrupted == stored? "same as stored": "stored=" + stored) + ")"; + } + } + + /** + * StatefulBlockInfo is used to build the "toUC" list, which is a list of + * updates to the information about under-construction blocks. + * Besides the block in question, it provides the ReplicaState + * reported by the datanode in the block report. + */ + static class StatefulBlockInfo { + final BlockInfoUnderConstruction storedBlock; + final ReplicaState reportedState; + + StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, + ReplicaState reportedState) { + this.storedBlock = storedBlock; + this.reportedState = reportedState; + } + } + +}
Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java?rev=1523145&view=auto ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java (added) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java Fri Sep 13 23:27:22 2013 @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.Block; + +/** + * Subclass of InvalidateBlocks used by the CacheReplicationManager to + * track blocks on each storage that are scheduled to be uncached. + */ +@InterfaceAudience.Private +public class UncacheBlocks extends InvalidateBlocks { + + UncacheBlocks() { + } + + @Override + synchronized List<Block> invalidateWork( + final String storageId, final DatanodeDescriptor dn) { + final List<Block> toInvalidate = pollNumBlocks(storageId, Integer.MAX_VALUE); + if (toInvalidate != null) { + dn.addBlocksToBeUncached(toInvalidate); + } + return toInvalidate; + } +} Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Sep 13 23:27:22 2013 @@ -552,10 +552,12 @@ class BPOfferService { case DatanodeProtocol.DNA_CACHE: LOG.info("DatanodeCommand action: DNA_CACHE"); dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks()); + dn.metrics.incrBlocksCached(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_UNCACHE: LOG.info("DatanodeCommand action: DNA_UNCACHE"); dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks()); + dn.metrics.incrBlocksUncached(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_SHUTDOWN: // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Fri Sep 13 23:27:22 2013 @@ -449,11 +449,24 @@ class BPServiceActor implements Runnable DatanodeCommand cmd = null; long startTime = Time.monotonicNow(); if (startTime - lastCacheReport > dnConf.cacheReportInterval) { - // TODO: Implement me! + if (LOG.isDebugEnabled()) { + LOG.debug("Sending cacheReport from service actor: " + this); + } + lastCacheReport = startTime; + String bpid = bpos.getBlockPoolId(); BlockListAsLongs blocks = dn.getFSDataset().getCacheReport(bpid); + long createTime = Time.monotonicNow(); + cmd = bpNamenode.cacheReport(bpRegistration, bpid, blocks.getBlockListAsLongs()); + long sendTime = Time.monotonicNow(); + long createCost = createTime - startTime; + long sendCost = sendTime - createTime; + dn.getMetrics().addCacheReport(sendCost); + LOG.info("CacheReport of " + blocks.getNumberOfBlocks() + + " blocks took " + createCost + " msec to generate and " + + sendCost + " msecs for RPC and NN processing"); } return cmd; } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Fri Sep 13 23:27:22 2013 @@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; @@ -114,9 +116,9 @@ public class DNConf { DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, - DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); - this.cacheReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); + this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, + DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT); long initBRDelay = conf.getLong( DFS_BLOCKREPORT_INITIAL_DELAY_KEY, Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Fri Sep 13 23:27:22 2013 @@ -105,10 +105,10 @@ public class FsDatasetCache { */ List<Block> getCachedBlocks(String bpid) { List<Block> blocks = new ArrayList<Block>(); - MappableBlock mapBlock = null; // ConcurrentHashMap iteration doesn't see latest updates, which is okay - for (Iterator<MappableBlock> it = cachedBlocks.values().iterator(); - it.hasNext(); mapBlock = it.next()) { + Iterator<MappableBlock> it = cachedBlocks.values().iterator(); + while (it.hasNext()) { + MappableBlock mapBlock = it.next(); if (mapBlock.getBlockPoolId().equals(bpid)) { blocks.add(mapBlock.getBlock()); } @@ -174,12 +174,15 @@ public class FsDatasetCache { mapBlock.getBlockPoolId().equals(bpid) && mapBlock.getBlock().equals(block)) { mapBlock.close(); - cachedBlocks.remove(mapBlock); + cachedBlocks.remove(block.getBlockId()); long bytes = mapBlock.getNumBytes(); long used = usedBytes.get(); while (!usedBytes.compareAndSet(used, used - bytes)) { used = usedBytes.get(); } + LOG.info("Successfully uncached block " + block); + } else { + LOG.info("Could not uncache block " + block + ": unknown block."); } } @@ -219,6 +222,7 @@ public class FsDatasetCache { used = usedBytes.get(); } } else { + LOG.info("Successfully cached block " + block.getBlock()); cachedBlocks.put(block.getBlock().getBlockId(), block); } } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Fri Sep 13 23:27:22 2013 @@ -57,6 +57,8 @@ public class DataNodeMetrics { @Metric MutableCounterLong blocksRemoved; @Metric MutableCounterLong blocksVerified; @Metric MutableCounterLong blockVerificationFailures; + @Metric MutableCounterLong blocksCached; + @Metric MutableCounterLong blocksUncached; @Metric MutableCounterLong readsFromLocalClient; @Metric MutableCounterLong readsFromRemoteClient; @Metric MutableCounterLong writesFromLocalClient; @@ -74,6 +76,7 @@ public class DataNodeMetrics { @Metric MutableRate replaceBlockOp; @Metric MutableRate heartbeats; @Metric MutableRate blockReports; + @Metric MutableRate cacheReports; @Metric MutableRate packetAckRoundTripTimeNanos; MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles; @@ -151,6 +154,10 @@ public class DataNodeMetrics { blockReports.add(latency); } + public void addCacheReport(long latency) { + cacheReports.add(latency); + } + public void incrBlocksReplicated(int delta) { blocksReplicated.incr(delta); } @@ -175,6 +182,15 @@ public class DataNodeMetrics { blocksVerified.incr(); } + + public void incrBlocksCached(int delta) { + blocksCached.incr(delta); + } + + public void incrBlocksUncached(int delta) { + blocksUncached.incr(delta); + } + public void addReadBlockOp(long latency) { readBlockOp.add(latency); } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Fri Sep 13 23:27:22 2013 @@ -26,9 +26,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import java.util.SortedMap; import java.util.TreeMap; -import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,7 +51,7 @@ import org.apache.hadoop.util.Fallible; /** * The Cache Manager handles caching on DataNodes. */ -final class CacheManager { +public final class CacheManager { public static final Log LOG = LogFactory.getLog(CacheManager.class); /** @@ -70,6 +70,12 @@ final class CacheManager { new TreeMap<PathBasedCacheDirective, PathBasedCacheEntry>(); /** + * Cache entries, sorted by path + */ + private final TreeMap<String, List<PathBasedCacheEntry>> entriesByPath = + new TreeMap<String, List<PathBasedCacheEntry>>(); + + /** * Cache pools, sorted by name. */ private final TreeMap<String, CachePool> cachePools = @@ -90,9 +96,14 @@ final class CacheManager { */ private final int maxListCacheDirectivesResponses; - CacheManager(FSDirectory dir, Configuration conf) { + final private FSNamesystem namesystem; + final private FSDirectory dir; + + CacheManager(FSNamesystem namesystem, FSDirectory dir, Configuration conf) { // TODO: support loading and storing of the CacheManager state clear(); + this.namesystem = namesystem; + this.dir = dir; maxListCachePoolsResponses = conf.getInt( DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT); @@ -104,6 +115,7 @@ final class CacheManager { synchronized void clear() { entriesById.clear(); entriesByDirective.clear(); + entriesByPath.clear(); cachePools.clear(); nextEntryId = 1; } @@ -131,7 +143,8 @@ final class CacheManager { try { directive.validate(); } catch (IOException ioe) { - LOG.info("addDirective " + directive + ": validation failed."); + LOG.info("addDirective " + directive + ": validation failed: " + + ioe.getClass().getName() + ": " + ioe.getMessage()); return new Fallible<PathBasedCacheEntry>(ioe); } // Check if we already have this entry. @@ -152,8 +165,34 @@ final class CacheManager { } LOG.info("addDirective " + directive + ": added cache directive " + directive); + + // Success! + // First, add it to the various maps entriesByDirective.put(directive, entry); entriesById.put(entry.getEntryId(), entry); + String path = directive.getPath(); + List<PathBasedCacheEntry> entryList = entriesByPath.get(path); + if (entryList == null) { + entryList = new ArrayList<PathBasedCacheEntry>(1); + entriesByPath.put(path, entryList); + } + entryList.add(entry); + + // Next, set the path as cached in the namesystem + try { + INode node = dir.getINode(directive.getPath()); + if (node.isFile()) { + INodeFile file = node.asFile(); + // TODO: adjustable cache replication factor + namesystem.setCacheReplicationInt(directive.getPath(), + file.getBlockReplication()); + } + } catch (IOException ioe) { + LOG.info("addDirective " + directive +": failed to cache file: " + + ioe.getClass().getName() +": " + ioe.getMessage()); + return new Fallible<PathBasedCacheEntry>(ioe); + } + return new Fallible<PathBasedCacheEntry>(entry); } @@ -201,7 +240,31 @@ final class CacheManager { return new Fallible<Long>( new UnexpectedRemovePathBasedCacheEntryException(entryId)); } + // Remove the corresponding entry in entriesByPath. + String path = existing.getDirective().getPath(); + List<PathBasedCacheEntry> entries = entriesByPath.get(path); + if (entries == null || !entries.remove(existing)) { + return new Fallible<Long>( + new UnexpectedRemovePathBasedCacheEntryException(entryId)); + } + if (entries.size() == 0) { + entriesByPath.remove(path); + } entriesById.remove(entryId); + + // Set the path as uncached in the namesystem + try { + INode node = dir.getINode(existing.getDirective().getPath()); + if (node.isFile()) { + namesystem.setCacheReplicationInt(existing.getDirective().getPath(), + (short) 0); + } + } catch (IOException e) { + LOG.warn("removeEntry " + entryId + ": failure while setting cache" + + " replication factor", e); + return new Fallible<Long>(e); + } + LOG.info("removeEntry successful for PathCacheEntry id " + entryId); return new Fallible<Long>(entryId); } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Sep 13 23:27:22 2013 @@ -1092,6 +1092,52 @@ public class FSDirectory implements Clos } /** + * Set cache replication for a file + * + * @param src file name + * @param replication new replication + * @param blockRepls block replications - output parameter + * @return array of file blocks + * @throws QuotaExceededException + * @throws SnapshotAccessControlException + */ + Block[] setCacheReplication(String src, short replication, short[] blockRepls) + throws QuotaExceededException, UnresolvedLinkException, + SnapshotAccessControlException { + waitForReady(); + writeLock(); + try { + return unprotectedSetCacheReplication(src, replication, blockRepls); + } finally { + writeUnlock(); + } + } + + Block[] unprotectedSetCacheReplication(String src, short replication, + short[] blockRepls) throws QuotaExceededException, + UnresolvedLinkException, SnapshotAccessControlException { + assert hasWriteLock(); + + final INodesInPath iip = rootDir.getINodesInPath4Write(src, true); + final INode inode = iip.getLastINode(); + if (inode == null || !inode.isFile()) { + return null; + } + INodeFile file = inode.asFile(); + final short oldBR = file.getCacheReplication(); + + // TODO: Update quotas here as repl goes up or down + file.setCacheReplication(replication); + final short newBR = file.getCacheReplication(); + + if (blockRepls != null) { + blockRepls[0] = oldBR; + blockRepls[1] = newBR; + } + return file.getBlocks(); + } + + /** * @param path the file path * @return the block size of the file. */ Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep 13 23:27:22 2013 @@ -367,6 +367,7 @@ public class FSNamesystem implements Nam private final BlockManager blockManager; private final SnapshotManager snapshotManager; private final CacheManager cacheManager; + private final CacheReplicationManager cacheReplicationManager; private final DatanodeStatistics datanodeStatistics; // Block pool ID used by this namenode @@ -694,7 +695,9 @@ public class FSNamesystem implements Nam this.dtSecretManager = createDelegationTokenSecretManager(conf); this.dir = new FSDirectory(fsImage, this, conf); this.snapshotManager = new SnapshotManager(dir); - this.cacheManager= new CacheManager(dir, conf); + this.cacheManager = new CacheManager(this, dir, conf); + this.cacheReplicationManager = new CacheReplicationManager(this, + blockManager, blockManager.getDatanodeManager(), this, conf); this.safeMode = new SafeModeInfo(conf); this.auditLoggers = initAuditLoggers(conf); this.isDefaultAuditLogger = auditLoggers.size() == 1 && @@ -871,6 +874,7 @@ public class FSNamesystem implements Nam getCompleteBlocksTotal()); setBlockTotal(); blockManager.activate(conf); + cacheReplicationManager.activate(); } finally { writeUnlock(); } @@ -887,6 +891,7 @@ public class FSNamesystem implements Nam writeLock(); try { if (blockManager != null) blockManager.close(); + if (cacheReplicationManager != null) cacheReplicationManager.close(); } finally { writeUnlock(); } @@ -917,7 +922,9 @@ public class FSNamesystem implements Nam blockManager.getDatanodeManager().markAllDatanodesStale(); blockManager.clearQueues(); blockManager.processAllPendingDNMessages(); - + + cacheReplicationManager.clearQueues(); + if (!isInSafeMode() || (isInSafeMode() && safeMode.isPopulatingReplQueues())) { LOG.info("Reprocessing replication and invalidation queues"); @@ -1910,6 +1917,42 @@ public class FSNamesystem implements Nam return isFile; } + boolean setCacheReplicationInt(String src, final short replication) + throws IOException { + final boolean isFile; + FSPermissionChecker pc = getPermissionChecker(); + checkOperation(OperationCategory.WRITE); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + writeLock(); + try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { + throw new SafeModeException("Cannot set replication for " + src, safeMode); + } + src = FSDirectory.resolvePath(src, pathComponents, dir); + if (isPermissionEnabled) { + checkPathAccess(pc, src, FsAction.WRITE); + } + + final short[] blockRepls = new short[2]; // 0: old, 1: new + final Block[] blocks = dir.setCacheReplication(src, replication, + blockRepls); + isFile = (blocks != null); + if (isFile) { + cacheReplicationManager.setCacheReplication(blockRepls[0], + blockRepls[1], src, blocks); + } + } finally { + writeUnlock(); + } + + getEditLog().logSync(); + if (isFile) { + logAuditEvent(true, "setReplication", src); + } + return isFile; + } + long getPreferredBlockSize(String filename) throws IOException, UnresolvedLinkException { FSPermissionChecker pc = getPermissionChecker(); @@ -6391,6 +6434,14 @@ public class FSNamesystem implements Nam public FSDirectory getFSDirectory() { return dir; } + /** @return the cache manager. */ + public CacheManager getCacheManager() { + return cacheManager; + } + /** @return the cache replication manager. */ + public CacheReplicationManager getCacheReplicationManager() { + return cacheReplicationManager; + } @Override // NameNodeMXBean public String getCorruptFiles() { @@ -6959,10 +7010,6 @@ public class FSNamesystem implements Nam return results; } - public CacheManager getCacheManager() { - return cacheManager; - } - /** * Default AuditLogger implementation; used when no access logger is * defined in the config file. It can also be explicitly listed in the Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Fri Sep 13 23:27:22 2013 @@ -104,6 +104,8 @@ public class INodeFile extends INodeWith private BlockInfo[] blocks; + private short cacheReplication = 0; + INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime, BlockInfo[] blklist, short replication, long preferredBlockSize) { super(id, name, permissions, mtime, atime); @@ -199,6 +201,18 @@ public class INodeFile extends INodeWith return nodeToUpdate; } + @Override + public void setCacheReplication(short cacheReplication) { + Preconditions.checkArgument(cacheReplication <= getBlockReplication(), + "Cannot set cache replication higher than block replication factor"); + this.cacheReplication = cacheReplication; + } + + @Override + public short getCacheReplication() { + return cacheReplication; + } + /** @return preferred block size (in bytes) of the file. */ @Override public long getPreferredBlockSize() { Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Sep 13 23:27:22 2013 @@ -968,7 +968,14 @@ class NameNodeRpcServer implements Namen String poolId, long[] blocks) throws IOException { verifyRequest(nodeReg); BlockListAsLongs blist = new BlockListAsLongs(blocks); - namesystem.getBlockManager().processCacheReport(nodeReg, poolId, blist); + if (blockStateChangeLog.isDebugEnabled()) { + blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: " + + "from " + nodeReg + " " + blist.getNumberOfBlocks() + + " blocks"); + } + + namesystem.getCacheReplicationManager() + .processCacheReport(nodeReg, poolId, blist); return null; } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1523145&r1=1523144&r2=1523145&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Fri Sep 13 23:27:22 2013 @@ -79,6 +79,8 @@ public class NameNodeMetrics { MutableCounterLong transactionsBatchedInSync; @Metric("Block report") MutableRate blockReport; MutableQuantiles[] blockReportQuantiles; + @Metric("Cache report") MutableRate cacheReport; + MutableQuantiles[] cacheReportQuantiles; @Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime; @Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime; @@ -89,6 +91,7 @@ public class NameNodeMetrics { final int len = intervals.length; syncsQuantiles = new MutableQuantiles[len]; blockReportQuantiles = new MutableQuantiles[len]; + cacheReportQuantiles = new MutableQuantiles[len]; for (int i = 0; i < len; i++) { int interval = intervals[i]; @@ -98,6 +101,9 @@ public class NameNodeMetrics { blockReportQuantiles[i] = registry.newQuantiles( "blockReport" + interval + "s", "Block report", "ops", "latency", interval); + cacheReportQuantiles[i] = registry.newQuantiles( + "cacheReport" + interval + "s", + "Cache report", "ops", "latency", interval); } } @@ -227,6 +233,13 @@ public class NameNodeMetrics { } } + public void addCacheBlockReport(long latency) { + cacheReport.add(latency); + for (MutableQuantiles q : cacheReportQuantiles) { + q.add(latency); + } + } + public void setSafeModeTime(long elapsed) { safeModeTime.set((int) elapsed); } Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java?rev=1523145&view=auto ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java (added) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java Fri Sep 13 23:27:22 2013 @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective; +import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.util.Fallible; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestCacheReplicationManager { + + // Most Linux installs allow a default of 64KB locked memory + private static final long CACHE_CAPACITY = 64 * 1024; + private static final long BLOCK_SIZE = 4096; + + private static Configuration conf; + private static MiniDFSCluster cluster = null; + private static FileSystem fs; + private static NameNode nn; + private static NamenodeProtocols nnRpc; + private static CacheReplicationManager cacheReplManager; + final private static FileSystemTestHelper helper = new FileSystemTestHelper(); + private static Path rootDir; + + @Before + public void setUp() throws Exception { + + assumeTrue(NativeIO.isAvailable()); + + conf = new HdfsConfiguration(); + conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + CACHE_CAPACITY); + conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true); + conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000); + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build(); + cluster.waitActive(); + + fs = cluster.getFileSystem(); + nn = cluster.getNameNode(); + nnRpc = nn.getRpcServer(); + cacheReplManager = nn.getNamesystem().getCacheReplicationManager(); + rootDir = helper.getDefaultWorkingDirectory(fs); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + private int countNumCachedBlocks() { + return cacheReplManager.cachedBlocksMap.size(); + } + + private void waitForExpectedNumCachedBlocks(final int expected) + throws Exception { + int actual = countNumCachedBlocks(); + while (expected != actual) { + Thread.sleep(500); + actual = countNumCachedBlocks(); + } + } + + @Test(timeout=60000) + public void testCachePaths() throws Exception { + // Create the pool + final String pool = "friendlyPool"; + nnRpc.addCachePool(new CachePoolInfo("friendlyPool")); + // Create some test files + final int numFiles = 3; + final int numBlocksPerFile = 2; + final List<String> paths = new ArrayList<String>(numFiles); + for (int i=0; i<numFiles; i++) { + Path p = new Path(rootDir, "testCachePaths-" + i); + FileSystemTestHelper.createFile(fs, p, numBlocksPerFile, (int)BLOCK_SIZE); + paths.add(p.toUri().getPath()); + } + // Check the initial statistics at the namenode + int expected = 0; + waitForExpectedNumCachedBlocks(expected); + // Cache and check each path in sequence + for (int i=0; i<numFiles; i++) { + List<PathBasedCacheDirective> toAdd = + new ArrayList<PathBasedCacheDirective>(); + toAdd.add(new PathBasedCacheDirective(paths.get(i), pool)); + List<Fallible<PathBasedCacheEntry>> fallibles = + nnRpc.addPathBasedCacheDirectives(toAdd); + assertEquals("Unexpected number of fallibles", + 1, fallibles.size()); + PathBasedCacheEntry entry = fallibles.get(0).get(); + PathBasedCacheDirective directive = entry.getDirective(); + assertEquals("Directive does not match requested path", paths.get(i), + directive.getPath()); + assertEquals("Directive does not match requested pool", pool, + directive.getPool()); + expected += numBlocksPerFile; + waitForExpectedNumCachedBlocks(expected); + } + // Uncache and check each path in sequence + RemoteIterator<PathBasedCacheEntry> entries = + nnRpc.listPathBasedCacheEntries(0, null, null); + for (int i=0; i<numFiles; i++) { + PathBasedCacheEntry entry = entries.next(); + List<Long> toRemove = new ArrayList<Long>(); + toRemove.add(entry.getEntryId()); + List<Fallible<Long>> fallibles = nnRpc.removePathBasedCacheEntries(toRemove); + assertEquals("Unexpected number of fallibles", 1, fallibles.size()); + Long l = fallibles.get(0).get(); + assertEquals("Removed entryId does not match requested", + entry.getEntryId(), l.longValue()); + expected -= numBlocksPerFile; + waitForExpectedNumCachedBlocks(expected); + } + } +} Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java ------------------------------------------------------------------------------ svn:eol-style = native