Author: wang
Date: Fri Jan 3 02:45:53 2014
New Revision: 1555002

URL: http://svn.apache.org/r1555002
Log:
HDFS-5651. Remove dfs.namenode.caching.enabled and improve CRM locking. Contributed by Colin Patrick McCabe.
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jan 3 02:45:53 2014
@@ -243,6 +243,9 @@ Trunk (Unreleased)
 
     HDFS-5636. Enforce a max TTL per cache pool. (awang via cmccabe)
 
+    HDFS-5651. Remove dfs.namenode.caching.enabled and improve CRM locking.
+    (cmccabe via wang)
+
   OPTIMIZATIONS
 
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only.
    (cmccabe)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan 3 02:45:53 2014
@@ -108,8 +109,9 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
   public static final String  DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
   public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
-  public static final String  DFS_NAMENODE_CACHING_ENABLED_KEY = "dfs.namenode.caching.enabled";
-  public static final boolean DFS_NAMENODE_CACHING_ENABLED_DEFAULT = false;
+  public static final String  DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
+      "dfs.namenode.path.based.cache.block.map.allocation.percent";
+  public static final float   DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Fri Jan 3 02:45:53 2014
@@ -87,17 +87,17 @@ public class CacheReplicationMonitor ext
    * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
    * waiting for rescan operations.
    */
-  private final ReentrantLock lock = new ReentrantLock();
+  private final ReentrantLock lock;
 
   /**
    * Notifies the scan thread that an immediate rescan is needed.
    */
-  private final Condition doRescan = lock.newCondition();
+  private final Condition doRescan;
 
   /**
   * Notifies waiting threads that a rescan has finished.
   */
-  private final Condition scanFinished = lock.newCondition();
+  private final Condition scanFinished;
 
   /**
   * Whether there are pending CacheManager operations that necessitate a
@@ -122,11 +122,6 @@ public class CacheReplicationMonitor ext
   private boolean shutdown = false;
 
   /**
-   * The monotonic time at which the current scan started.
-   */
-  private long startTimeMs;
-
-  /**
    * Mark status of the current scan.
   */
  private boolean mark = false;
 
@@ -142,24 +137,27 @@ public class CacheReplicationMonitor ext
   private long scannedBlocks;
 
   public CacheReplicationMonitor(FSNamesystem namesystem,
-      CacheManager cacheManager, long intervalMs) {
+      CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
     this.namesystem = namesystem;
     this.blockManager = namesystem.getBlockManager();
     this.cacheManager = cacheManager;
     this.cachedBlocks = cacheManager.getCachedBlocks();
     this.intervalMs = intervalMs;
+    this.lock = lock;
+    this.doRescan = this.lock.newCondition();
+    this.scanFinished = this.lock.newCondition();
   }
 
   @Override
   public void run() {
-    startTimeMs = 0;
+    long startTimeMs = 0;
+    Thread.currentThread().setName("CacheReplicationMonitor(" +
+        System.identityHashCode(this) + ")");
     LOG.info("Starting CacheReplicationMonitor with interval " +
         intervalMs + " milliseconds");
     try {
       long curTimeMs = Time.monotonicNow();
       while (true) {
-        // Not all of the variables accessed here need the CRM lock, but take
-        // it anyway for simplicity
         lock.lock();
         try {
           while (true) {
@@ -180,12 +178,6 @@ public class CacheReplicationMonitor ext
             doRescan.await(delta, TimeUnit.MILLISECONDS);
             curTimeMs = Time.monotonicNow();
           }
-        } finally {
-          lock.unlock();
-        }
-        // Mark scan as started, clear needsRescan
-        lock.lock();
-        try {
           isScanning = true;
           needsRescan = false;
         } finally {
@@ -195,7 +187,7 @@ public class CacheReplicationMonitor ext
         mark = !mark;
         rescan();
         curTimeMs = Time.monotonicNow();
-        // Retake the CRM lock to update synchronization-related variables
+        // Update synchronization-related variables.
        lock.lock();
        try {
          isScanning = false;
@@ -208,6 +200,9 @@ public class CacheReplicationMonitor ext
             scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " +
             "millisecond(s).");
       }
+    } catch (InterruptedException e) {
+      LOG.info("Shutting down CacheReplicationMonitor.");
+      return;
     } catch (Throwable t) {
       LOG.fatal("Thread exiting", t);
       terminate(1, t);
@@ -215,26 +210,6 @@ public class CacheReplicationMonitor ext
   }
 
   /**
-   * Similar to {@link CacheReplicationMonitor#waitForRescan()}, except it only
-   * waits if there are pending operations that necessitate a rescan as
-   * indicated by {@link #setNeedsRescan()}.
-   * <p>
-   * Note that this call may release the FSN lock, so operations before and
-   * after are not necessarily atomic.
-   */
-  public void waitForRescanIfNeeded() {
-    lock.lock();
-    try {
-      if (!needsRescan) {
-        return;
-      }
-    } finally {
-      lock.unlock();
-    }
-    waitForRescan();
-  }
-
-  /**
    * Waits for a rescan to complete. This doesn't guarantee consistency with
    * pending operations, only relative recency, since it will not force a new
    * rescan if a rescan is already underway.
@@ -242,49 +217,27 @@ public class CacheReplicationMonitor ext
    * Note that this call will release the FSN lock, so operations before and
    * after are not atomic.
   */
-  public void waitForRescan() {
-    // Drop the FSN lock temporarily and retake it after we finish waiting
-    // Need to handle both the read lock and the write lock
-    boolean retakeWriteLock = false;
-    if (namesystem.hasWriteLock()) {
-      namesystem.writeUnlock();
-      retakeWriteLock = true;
-    } else if (namesystem.hasReadLock()) {
-      namesystem.readUnlock();
-    } else {
-      // Expected to have at least one of the locks
-      Preconditions.checkState(false,
-          "Need to be holding either the read or write lock");
+  public void waitForRescanIfNeeded() {
+    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+        "Must not hold the FSN write lock when waiting for a rescan.");
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when waiting for a rescan.");
+    if (!needsRescan) {
+      return;
     }
-    // try/finally for retaking FSN lock
-    try {
-      lock.lock();
-      // try/finally for releasing CRM lock
+    // If no scan is already ongoing, mark the CRM as dirty and kick
+    if (!isScanning) {
+      doRescan.signal();
+    }
+    // Wait until the scan finishes and the count advances
+    final long startCount = scanCount;
+    while ((!shutdown) && (startCount >= scanCount)) {
      try {
-        // If no scan is already ongoing, mark the CRM as dirty and kick
-        if (!isScanning) {
-          needsRescan = true;
-          doRescan.signal();
-        }
-        // Wait until the scan finishes and the count advances
-        final long startCount = scanCount;
-        while (startCount >= scanCount) {
-          try {
-            scanFinished.await();
-          } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
-                + " rescan", e);
-            break;
-          }
-        }
-      } finally {
-        lock.unlock();
-      }
-    } finally {
-      if (retakeWriteLock) {
-        namesystem.writeLock();
-      } else {
-        namesystem.readLock();
+        scanFinished.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
+            + " rescan", e);
+        break;
      }
    }
  }
@@ -294,42 +247,43 @@ public class CacheReplicationMonitor ext
    * changes that require a rescan.
    */
   public void setNeedsRescan() {
-    lock.lock();
-    try {
-      this.needsRescan = true;
-    } finally {
-      lock.unlock();
-    }
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when setting the needsRescan bit.");
+    this.needsRescan = true;
   }
 
   /**
-   * Shut down and join the monitor thread.
+   * Shut down the monitor thread.
    */
   @Override
   public void close() throws IOException {
+    Preconditions.checkArgument(namesystem.hasWriteLock());
     lock.lock();
     try {
       if (shutdown) return;
+      // Since we hold both the FSN write lock and the CRM lock here,
+      // we know that the CRM thread cannot be currently modifying
+      // the cache manager state while we're closing it.
+      // Since the CRM thread checks the value of 'shutdown' after waiting
+      // for a lock, we know that the thread will not modify the cache
+      // manager state after this point.
      shutdown = true;
      doRescan.signalAll();
      scanFinished.signalAll();
    } finally {
      lock.unlock();
    }
-    try {
-      if (this.isAlive()) {
-        this.join(60000);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
  }
 
-  private void rescan() {
+  private void rescan() throws InterruptedException {
    scannedDirectives = 0;
    scannedBlocks = 0;
    namesystem.writeLock();
    try {
+      if (shutdown) {
+        throw new InterruptedException("CacheReplicationMonitor was " +
+            "shut down.");
+      }
      resetStatistics();
      rescanCacheDirectives();
      rescanCachedBlockMap();
@@ -609,9 +563,6 @@ public class CacheReplicationMonitor ext
  private void addNewPendingUncached(int neededUncached,
      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
      List<DatanodeDescriptor> pendingUncached) {
-    if (!cacheManager.isActive()) {
-      return;
-    }
    // Figure out which replicas can be uncached.
    LinkedList<DatanodeDescriptor> possibilities =
        new LinkedList<DatanodeDescriptor>();
@@ -647,9 +598,6 @@ public class CacheReplicationMonitor ext
  private void addNewPendingCached(int neededCached,
      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
      List<DatanodeDescriptor> pendingCached) {
-    if (!cacheManager.isActive()) {
-      return;
-    }
    // To figure out which replicas can be cached, we consult the
    // blocksMap. We don't want to try to cache a corrupt replica, though.
    BlockInfo blockInfo = blockManager.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jan 3 02:45:53 2014
@@ -1443,6 +1443,13 @@ public class DatanodeManager {
    return getClass().getSimpleName() + ": " + host2DatanodeMap;
  }
 
+  public void clearPendingCachingCommands() {
+    for (DatanodeDescriptor dn : datanodeMap.values()) {
+      dn.getPendingCached().clear();
+      dn.getPendingUncached().clear();
+    }
+  }
+
  public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
    this.shouldSendCachingCommands = shouldSendCachingCommands;
  }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Fri Jan 3 02:45:53 2014
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -84,7 +85,7 @@ import com.google.common.annotations.Vis
 /**
  * The Cache Manager handles caching on DataNodes.
  *
- * This class is instantiated by the FSNamesystem when caching is enabled.
+ * This class is instantiated by the FSNamesystem.
  * It maintains the mapping of cached blocks to datanodes via processing
  * datanode cache reports. Based on these reports and addition and removal of
  * caching directives, we will schedule caching and uncaching work.
@@ -93,6 +94,8 @@ import com.google.common.annotations.Vis
 public final class CacheManager {
   public static final Log LOG = LogFactory.getLog(CacheManager.class);
 
+  private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;
+
   // TODO: add pending / underCached / schedule cached blocks stats.
 
   /**
@@ -148,32 +151,14 @@ public final class CacheManager {
   private final long scanIntervalMs;
 
   /**
-   * Whether caching is enabled.
-   *
-   * If caching is disabled, we will not process cache reports or store
-   * information about what is cached where. We also do not start the
-   * CacheReplicationMonitor thread. This will save resources, but provide
-   * less functionality.
-   *
-   * Even when caching is disabled, we still store path-based cache
-   * information. This information is stored in the edit log and fsimage. We
-   * don't want to lose it just because a configuration setting was turned off.
-   * However, we will not act on this information if caching is disabled.
+   * All cached blocks.
    */
-  private final boolean enabled;
+  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
 
   /**
-   * Whether the CacheManager is active.
-   *
-   * When the CacheManager is active, it tells the DataNodes what to cache
-   * and uncache. The CacheManager cannot become active if enabled = false.
+   * Lock which protects the CacheReplicationMonitor.
   */
-  private boolean active = false;
-
-  /**
-   * All cached blocks.
-   */
-  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
+  private final ReentrantLock crmLock = new ReentrantLock();
 
  /**
   * The CacheReplicationMonitor.
@@ -194,54 +179,51 @@ public final class CacheManager {
    scanIntervalMs = conf.getLong(
        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
-    this.enabled = conf.getBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY,
-        DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
-    this.cachedBlocks = !enabled ? null :
-        new LightWeightGSet<CachedBlock, CachedBlock>(
-            LightWeightGSet.computeCapacity(0.25, "cachedBlocks"));
+    float cachedBlocksPercent = conf.getFloat(
+          DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
+          DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT);
+    if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) {
+      LOG.info("Using minimum value " + MIN_CACHED_BLOCKS_PERCENT +
+        " for " + DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
+      cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
+    }
+    this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
+        LightWeightGSet.computeCapacity(cachedBlocksPercent,
+            "cachedBlocks"));
+  }
 
-  /**
-   * Activate the cache manager.
-   *
-   * When the cache manager is active, tell the datanodes where to cache files.
-   */
-  public void activate() {
-    assert namesystem.hasWriteLock();
-    if (enabled && (!active)) {
-      LOG.info("Activating CacheManager. " +
-          "Starting replication monitor thread...");
-      active = true;
-      monitor = new CacheReplicationMonitor(namesystem, this,
-          scanIntervalMs);
-      monitor.start();
+  public void startMonitorThread() {
+    crmLock.lock();
+    try {
+      if (this.monitor == null) {
+        this.monitor = new CacheReplicationMonitor(namesystem, this,
+            scanIntervalMs, crmLock);
+        this.monitor.start();
+      }
+    } finally {
+      crmLock.unlock();
    }
  }
 
-  /**
-   * Deactivate the cache manager.
-   *
-   * When the cache manager is inactive, it does not tell the datanodes where to
-   * cache files.
-   */
-  public void deactivate() {
-    assert namesystem.hasWriteLock();
-    if (active) {
-      LOG.info("Deactivating CacheManager. " +
-          "stopping CacheReplicationMonitor thread...");
-      active = false;
-      IOUtils.closeQuietly(monitor);
-      monitor = null;
-      LOG.info("CacheReplicationMonitor thread stopped and deactivated.");
+  public void stopMonitorThread() {
+    crmLock.lock();
+    try {
+      if (this.monitor != null) {
+        CacheReplicationMonitor prevMonitor = this.monitor;
+        this.monitor = null;
+        IOUtils.closeQuietly(prevMonitor);
+      }
+    } finally {
+      crmLock.unlock();
    }
  }
 
-  /**
-   * Return true only if the cache manager is active.
-   * Must be called under the FSN read or write lock.
-   */
-  public boolean isActive() {
-    return active;
+  public void clearDirectiveStats() {
+    assert namesystem.hasWriteLock();
+    for (CacheDirective directive : directivesById.values()) {
+      directive.resetStatistics();
+    }
  }
 
  /**
@@ -480,9 +462,7 @@ public final class CacheManager {
 
    directive.addBytesNeeded(stats.getBytesNeeded());
    directive.addFilesNeeded(directive.getFilesNeeded());
-    if (monitor != null) {
-      monitor.setNeedsRescan();
-    }
+    setNeedsRescan();
  }
 
  /**
@@ -514,10 +494,6 @@ public final class CacheManager {
      long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs());
      // Do quota validation if required
      if (!flags.contains(CacheFlag.FORCE)) {
-        // Can't kick and wait if caching is disabled
-        if (monitor != null) {
-          monitor.waitForRescan();
-        }
        checkLimit(pool, path, replication);
      }
      // All validation passed
@@ -622,9 +598,7 @@
        validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs());
 
      // Indicate changes to the CRM
-      if (monitor != null) {
-        monitor.setNeedsRescan();
-      }
+      setNeedsRescan();
 
      // Validation passed
      removeInternal(prevEntry);
@@ -659,9 +633,7 @@
    pool.getDirectiveList().remove(directive);
    assert directive.getPool() == null;
-    if (monitor != null) {
-      monitor.setNeedsRescan();
-    }
+    setNeedsRescan();
  }
 
  public void removeDirective(long id, FSPermissionChecker pc)
@@ -694,9 +666,6 @@
    if (filter.getReplication() != null) {
      throw new IOException("Filtering by replication is unsupported.");
    }
-    if (monitor != null) {
-      monitor.waitForRescanIfNeeded();
-    }
    ArrayList<CacheDirectiveEntry> replies =
        new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
    int numReplies = 0;
@@ -805,9 +774,7 @@
      bld.append(prefix).append("set limit to " + info.getLimit());
      prefix = "; ";
      // New limit changes stats, need to set needs refresh
-      if (monitor != null) {
-        monitor.setNeedsRescan();
-      }
+      setNeedsRescan();
    }
    if (info.getMaxRelativeExpiryMs() != null) {
      final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
@@ -853,9 +820,7 @@
        directivesById.remove(directive.getId());
        iter.remove();
      }
-      if (monitor != null) {
-        monitor.setNeedsRescan();
-      }
+      setNeedsRescan();
    } catch (IOException e) {
      LOG.info("removeCachePool of " + poolName + " failed: ", e);
      throw e;
@@ -866,9 +831,6 @@
  public BatchedListEntries<CachePoolEntry>
      listCachePools(FSPermissionChecker pc, String prevKey) {
    assert namesystem.hasReadLock();
-    if (monitor != null) {
-      monitor.waitForRescanIfNeeded();
-    }
    final int NUM_PRE_ALLOCATED_ENTRIES = 16;
    ArrayList<CachePoolEntry> results =
        new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -884,9 +846,6 @@
  }
 
  public void setCachedLocations(LocatedBlock block) {
-    if (!enabled) {
-      return;
-    }
    CachedBlock cachedBlock =
        new CachedBlock(block.getBlock().getBlockId(),
            (short)0, false);
@@ -902,12 +861,6 @@
 
  public final void processCacheReport(final DatanodeID datanodeID,
      final List<Long> blockIds) throws IOException {
-    if (!enabled) {
-      LOG.info("Ignoring cache report from " + datanodeID +
-          " because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " +
" + - "number of blocks: " + blockIds.size()); - return; - } namesystem.writeLock(); final long startTime = Time.monotonicNow(); final long endTime; @@ -1085,4 +1038,36 @@ public final class CacheManager { } prog.endStep(Phase.LOADING_FSIMAGE, step); } + + public void waitForRescanIfNeeded() { + crmLock.lock(); + try { + if (monitor != null) { + monitor.waitForRescanIfNeeded(); + } + } finally { + crmLock.unlock(); + } + } + + private void setNeedsRescan() { + crmLock.lock(); + try { + if (monitor != null) { + monitor.setNeedsRescan(); + } + } finally { + crmLock.unlock(); + } + } + + @VisibleForTesting + public Thread getCacheReplicationMonitor() { + crmLock.lock(); + try { + return monitor; + } finally { + crmLock.unlock(); + } + } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1555002&r1=1555001&r2=1555002&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jan 3 02:45:53 2014 @@ -929,7 +929,6 @@ public class FSNamesystem implements Nam writeLock(); try { if (blockManager != null) blockManager.close(); - cacheManager.deactivate(); } finally { writeUnlock(); } @@ -999,7 +998,7 @@ public class FSNamesystem implements Nam editLogRollerThreshold, editLogRollerInterval)); nnEditLogRoller.start(); - cacheManager.activate(); + cacheManager.startMonitorThread(); blockManager.getDatanodeManager().setShouldSendCachingCommands(true); } finally { writeUnlock(); @@ -1050,7 +1049,9 @@ public class FSNamesystem implements Nam // so that the tailer starts from the right spot. dir.fsImage.updateLastAppliedTxIdFromWritten(); } - cacheManager.deactivate(); + cacheManager.stopMonitorThread(); + cacheManager.clearDirectiveStats(); + blockManager.getDatanodeManager().clearPendingCachingCommands(); blockManager.getDatanodeManager().setShouldSendCachingCommands(false); } finally { writeUnlock(); @@ -7064,6 +7065,9 @@ public class FSNamesystem implements Nam return (Long) cacheEntry.getPayload(); } boolean success = false; + if (!flags.contains(CacheFlag.FORCE)) { + cacheManager.waitForRescanIfNeeded(); + } writeLock(); Long result = null; try { @@ -7105,6 +7109,9 @@ public class FSNamesystem implements Nam if (cacheEntry != null && cacheEntry.isSuccess()) { return; } + if (!flags.contains(CacheFlag.FORCE)) { + cacheManager.waitForRescanIfNeeded(); + } writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -7164,6 +7171,7 @@ public class FSNamesystem implements Nam final FSPermissionChecker pc = isPermissionEnabled ? 
        getPermissionChecker() : null;
    BatchedListEntries<CacheDirectiveEntry> results;
+    cacheManager.waitForRescanIfNeeded();
    readLock();
    boolean success = false;
    try {
@@ -7287,6 +7295,7 @@
    BatchedListEntries<CachePoolEntry> results;
    checkOperation(OperationCategory.READ);
    boolean success = false;
+    cacheManager.waitForRescanIfNeeded();
    readLock();
    try {
      checkOperation(OperationCategory.READ);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jan 3 02:45:53 2014
@@ -1476,13 +1476,13 @@
 </property>
 
 <property>
-  <name>dfs.namenode.caching.enabled</name>
-  <value>false</value>
+  <name>dfs.namenode.path.based.cache.block.map.allocation.percent</name>
+  <value>0.25</value>
  <description>
-    Set to true to enable block caching. This flag enables the NameNode to
-    maintain a mapping of cached blocks to DataNodes via processing DataNode
-    cache reports. Based on these reports and addition and removal of caching
-    directives, the NameNode will schedule caching and uncaching work.
+    The percentage of the Java heap which we will allocate to the cached blocks
+    map. The cached blocks map is a hash map which uses chained hashing.
+    Smaller maps may be accessed more slowly if the number of cached blocks is
+    large; larger maps will consume more memory.
  </description>
 </property>

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm Fri Jan 3 02:45:53 2014
@@ -242,12 +242,6 @@ Centralized Cache Management in HDFS
 
  Be sure to configure the following:
 
-  * dfs.namenode.caching.enabled
-
-    This must be set to true to enable caching. If this is false, the NameNode
-    will ignore cache reports, and will not ask DataNodes to cache
-    blocks.
-
  * dfs.datanode.max.locked.memory
 
    The DataNode will treat this as the maximum amount of memory it can use for
@@ -281,6 +275,13 @@ Centralized Cache Management in HDFS
 
  By default, this parameter is set to 10000, which is 10 seconds.
 
+  * dfs.namenode.path.based.cache.block.map.allocation.percent
+
+    The percentage of the Java heap which we will allocate to the cached blocks
+    map. The cached blocks map is a hash map which uses chained hashing.
+    Smaller maps may be accessed more slowly if the number of cached blocks is
+    large; larger maps will consume more memory. The default is 0.25 percent.
+
 ** {OS Limits}
 
  If you get the error "Cannot start datanode because the configured max

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Fri Jan 3 02:45:53 2014
@@ -109,14 +109,12 @@ public class TestFsDatasetCache {
  public void setUp() throws Exception {
    assumeTrue(!Path.WINDOWS);
    conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
        500);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
        CACHE_CAPACITY);
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
 
    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Fri Jan 3 02:45:53 2014
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.protocol.CachePoolInfo.RELATIVE_EXPIRY_NEVER;
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
@@ -118,7 +117,6 @@ public class TestCacheDirectives {
    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
    // set low limits here for testing purposes
@@ -868,55 +866,6 @@
  }
 
  @Test(timeout=120000)
-  public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
-      throws Exception {
-    cluster.shutdown();
-    HdfsConfiguration conf = createCachingConf();
-    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
-    MiniDFSCluster cluster =
-      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      NameNode namenode = cluster.getNameNode();
-      // Create the pool
-      String pool = "pool1";
-      namenode.getRpcServer().addCachePool(new CachePoolInfo(pool));
-      // Create some test files
-      final int numFiles = 2;
-      final int numBlocksPerFile = 2;
-      final List<String> paths = new ArrayList<String>(numFiles);
-      for (int i=0; i<numFiles; i++) {
-        Path p = new Path("/testCachePaths-" + i);
-        FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
-            (int)BLOCK_SIZE);
-        paths.add(p.toUri().getPath());
-      }
-      // Check the initial statistics at the namenode
-      waitForCachedBlocks(namenode, 0, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:0");
-      // Cache and check each path in sequence
-      int expected = 0;
-      for (int i=0; i<numFiles; i++) {
-        CacheDirectiveInfo directive =
-            new CacheDirectiveInfo.Builder().
-              setPath(new Path(paths.get(i))).
-              setPool(pool).
-              build();
-        dfs.addCacheDirective(directive);
-        waitForCachedBlocks(namenode, expected, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:1");
-      }
-      Thread.sleep(20000);
-      waitForCachedBlocks(namenode, expected, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  @Test(timeout=120000)
  public void testWaitForCachedReplicasInDirectory() throws Exception {
    // Create the pool
    final String pool = "friendlyPool";

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java?rev=1555002&r1=1555001&r2=1555002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java Fri Jan 3 02:45:53 2014
@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.LinkedList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -59,6 +60,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 /**
  * Tests state transition from active->standby, and manual failover
  * and failback between two namenodes.
@@ -124,6 +127,17 @@ public class TestHAStateTransitions {
    }
  }
 
+  private void addCrmThreads(MiniDFSCluster cluster,
+      LinkedList<Thread> crmThreads) {
+    for (int nn = 0; nn <= 1; nn++) {
+      Thread thread = cluster.getNameNode(nn).getNamesystem().
+          getCacheManager().getCacheReplicationMonitor();
+      if (thread != null) {
+        crmThreads.add(thread);
+      }
+    }
+  }
+
  /**
   * Test that transitioning a service to the state that it is already
   * in is a nop, specifically, an exception is not thrown.
@@ -131,19 +145,30 @@
  @Test
  public void testTransitionToCurrentStateIsANop() throws Exception {
    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1L);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .numDataNodes(1)
      .build();
+    LinkedList<Thread> crmThreads = new LinkedList<Thread>();
    try {
      cluster.waitActive();
+      addCrmThreads(cluster, crmThreads);
      cluster.transitionToActive(0);
+      addCrmThreads(cluster, crmThreads);
      cluster.transitionToActive(0);
+      addCrmThreads(cluster, crmThreads);
      cluster.transitionToStandby(0);
+      addCrmThreads(cluster, crmThreads);
      cluster.transitionToStandby(0);
+      addCrmThreads(cluster, crmThreads);
    } finally {
      cluster.shutdown();
    }
+    // Verify that all cacheReplicationMonitor threads shut down
+    for (Thread thread : crmThreads) {
+      Uninterruptibles.joinUninterruptibly(thread);
+    }
  }
 
  /**
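
For context, the locking protocol this patch introduces — one ReentrantLock shared between CacheManager and the monitor, with callers required to hold it before setting the needsRescan bit or waiting for a scan — can be summarized in a small standalone sketch. This is a hypothetical, simplified class (not part of the commit); the names mirror CacheReplicationMonitor/CacheManager but all FSNamesystem and block-management details are omitted:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class CrmLockingSketch {
  // One lock now protects both the monitor's scheduling state and the
  // manager's view of it (crmLock in CacheManager, lock in the CRM).
  private final ReentrantLock crmLock = new ReentrantLock();
  private final Condition doRescan = crmLock.newCondition();
  private final Condition scanFinished = crmLock.newCondition();
  private boolean needsRescan = false;
  private boolean isScanning = false;
  private long scanCount = 0;

  /** Mirrors setNeedsRescan(): the caller must already hold the lock. */
  public void setNeedsRescan() {
    if (!crmLock.isHeldByCurrentThread()) {
      throw new IllegalStateException("Must hold the CRM lock.");
    }
    needsRescan = true;
  }

  /** Mirrors waitForRescanIfNeeded(): kick the scanner, wait for a scan. */
  public void waitForRescanIfNeeded() throws InterruptedException {
    crmLock.lock();
    try {
      if (!needsRescan) {
        return;                      // nothing pending; don't block
      }
      if (!isScanning) {
        doRescan.signal();           // wake the scanner thread
      }
      final long startCount = scanCount;
      while (startCount >= scanCount) {
        scanFinished.await();        // wait until the count advances
      }
    } finally {
      crmLock.unlock();
    }
  }

  /** The scanner side: wait for work, scan outside the lock, publish. */
  public void runOneScanCycle() throws InterruptedException {
    crmLock.lock();
    try {
      while (!needsRescan) {
        doRescan.await();
      }
      isScanning = true;
      needsRescan = false;
    } finally {
      crmLock.unlock();
    }
    // ...the actual rescan would happen here, without holding crmLock...
    crmLock.lock();
    try {
      isScanning = false;
      scanCount++;
      scanFinished.signalAll();      // release any waiters
    } finally {
      crmLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final CrmLockingSketch sketch = new CrmLockingSketch();
    Thread scanner = new Thread(() -> {
      try {
        sketch.runOneScanCycle();
      } catch (InterruptedException ignored) {
      }
    });
    scanner.start();
    sketch.crmLock.lock();
    try {
      sketch.setNeedsRescan();        // mutate state under the lock
    } finally {
      sketch.crmLock.unlock();
    }
    sketch.waitForRescanIfNeeded();   // blocks until scanCount advances
    scanner.join();
    System.out.println("rescan completed");
  }
}

The design point the commit makes is visible here: because waiters check a monotonically increasing scanCount rather than a boolean, a rescan that is already underway satisfies earlier requests, and the old waitForRescan() dance of dropping and retaking the FSN lock is no longer needed.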