This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit d39f034f982b4a1767590aecb03bd27ed8e4c34c Author: ZanderXu <zande...@apache.org> AuthorDate: Fri Mar 22 10:08:46 2024 +0800 HDFS-17413. [FGL] CacheReplicationMonitor supports fine-grained lock (#6641) --- .../blockmanagement/CacheReplicationMonitor.java | 13 +++++---- .../hadoop/hdfs/server/namenode/CacheManager.java | 33 +++++++++++----------- .../hadoop/hdfs/server/namenode/FSNamesystem.java | 32 ++++++++++----------- 3 files changed, 40 insertions(+), 38 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java index f9036c550e8..fad9f4248b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INodeDirectory; import org.apache.hadoop.hdfs.server.namenode.INodeFile; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.util.GSet; @@ -223,7 +224,7 @@ public void run() { * after are not atomic. 
*/ public void waitForRescanIfNeeded() { - Preconditions.checkArgument(!namesystem.hasWriteLock(), + Preconditions.checkArgument(!namesystem.hasWriteLock(FSNamesystemLockMode.FS), "Must not hold the FSN write lock when waiting for a rescan."); Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when waiting for a rescan."); @@ -268,7 +269,7 @@ public void setNeedsRescan() { */ @Override public void close() throws IOException { - Preconditions.checkArgument(namesystem.hasWriteLock()); + Preconditions.checkArgument(namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)); lock.lock(); try { if (shutdown) return; @@ -291,7 +292,7 @@ private void rescan() throws InterruptedException { scannedBlocks = 0; lastScanTimeMs = Time.monotonicNow(); try { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { lock.lock(); if (shutdown) { @@ -308,7 +309,7 @@ private void rescan() throws InterruptedException { rescanCachedBlockMap(); blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime(); } finally { - namesystem.writeUnlock("cacheReplicationMonitorRescan"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan"); } } @@ -325,11 +326,11 @@ private void reacquireLock(long last) { long now = Time.monotonicNow(); if (now - last > cacheManager.getMaxLockTimeMs()) { try { - namesystem.writeUnlock(); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan"); Thread.sleep(cacheManager.getSleepTimeMs()); } catch (InterruptedException e) { } finally { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index 24ccf45b91d..d296ff278b5 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; @@ -317,7 +318,7 @@ public void stopMonitorThread() { } public void clearDirectiveStats() { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); for (CacheDirective directive : directivesById.values()) { directive.resetStatistics(); } @@ -327,7 +328,7 @@ public void clearDirectiveStats() { * @return Unmodifiable view of the collection of CachePools. */ public Collection<CachePool> getCachePools() { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.FS); return Collections.unmodifiableCollection(cachePools.values()); } @@ -335,18 +336,18 @@ public Collection<CachePool> getCachePools() { * @return Unmodifiable view of the collection of CacheDirectives. 
*/ public Collection<CacheDirective> getCacheDirectives() { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.FS); return Collections.unmodifiableCollection(directivesById.values()); } @VisibleForTesting public GSet<CachedBlock, CachedBlock> getCachedBlocks() { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.BM); return cachedBlocks; } private long getNextDirectiveId() throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); if (nextDirectiveId >= Long.MAX_VALUE - 1) { throw new IOException("No more available IDs."); } @@ -574,7 +575,7 @@ CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive) public CacheDirectiveInfo addDirective( CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); CacheDirective directive; try { CachePool pool = getCachePool(validatePoolName(info)); @@ -652,7 +653,7 @@ void modifyDirectiveFromEditLog(CacheDirectiveInfo info) public void modifyDirective(CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); String idString = (info.getId() == null) ? "(null)" : info.getId().toString(); @@ -703,7 +704,7 @@ public void modifyDirective(CacheDirectiveInfo info, private void removeInternal(CacheDirective directive) throws InvalidRequestException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); // Remove the corresponding entry in directivesByPath. 
String path = directive.getPath(); if (!directivesByPath.remove(path, directive)) { @@ -724,7 +725,7 @@ private void removeInternal(CacheDirective directive) public void removeDirective(long id, FSPermissionChecker pc) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); try { CacheDirective directive = getById(id); checkWritePermission(pc, directive.getPool()); @@ -740,7 +741,7 @@ public void removeDirective(long id, FSPermissionChecker pc) listCacheDirectives(long prevId, CacheDirectiveInfo filter, FSPermissionChecker pc) throws IOException { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.FS); final int NUM_PRE_ALLOCATED_ENTRIES = 16; String filterPath = null; if (filter.getPath() != null) { @@ -815,7 +816,7 @@ public void removeDirective(long id, FSPermissionChecker pc) */ public CachePoolInfo addCachePool(CachePoolInfo info) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); CachePool pool; try { CachePoolInfo.validate(info); @@ -845,7 +846,7 @@ public CachePoolInfo addCachePool(CachePoolInfo info) */ public void modifyCachePool(CachePoolInfo info) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); StringBuilder bld = new StringBuilder(); try { CachePoolInfo.validate(info); @@ -915,7 +916,7 @@ public void modifyCachePool(CachePoolInfo info) */ public void removeCachePool(String poolName) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); try { CachePoolInfo.validateName(poolName); CachePool pool = cachePools.remove(poolName); @@ -941,7 +942,7 @@ public void removeCachePool(String poolName) public BatchedListEntries<CachePoolEntry> listCachePools(FSPermissionChecker pc, String prevKey) { - assert namesystem.hasReadLock(); + assert 
namesystem.hasReadLock(FSNamesystemLockMode.FS); final int NUM_PRE_ALLOCATED_ENTRIES = 16; ArrayList<CachePoolEntry> results = new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES); @@ -1008,7 +1009,7 @@ public final void processCacheReport(final DatanodeID datanodeID, datanodeID, DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size()); return; } - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.BM); final long startTime = Time.monotonicNow(); final long endTime; try { @@ -1022,7 +1023,7 @@ public final void processCacheReport(final DatanodeID datanodeID, processCacheReportImpl(datanode, blockIds); } finally { endTime = Time.monotonicNow(); - namesystem.writeUnlock("processCacheReport"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCacheReport"); } // Log the block report processing stats from Namenode perspective diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 785c4500e1d..724ba2769e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -7802,7 +7802,7 @@ long addCacheDirective(CacheDirectiveInfo directive, checkOperation(OperationCategory.WRITE); FSPermissionChecker.setOperationType(operationName); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot add cache directive"); @@ -7811,7 +7811,7 @@ long addCacheDirective(CacheDirectiveInfo directive, } finally { effectiveDirectiveStr = effectiveDirective != null ? 
effectiveDirective.toString() : null; - writeUnlock(operationName, + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(effectiveDirectiveStr)); } } catch (AccessControlException ace) { @@ -7835,14 +7835,14 @@ void modifyCacheDirective(CacheDirectiveInfo directive, FSPermissionChecker.setOperationType(operationName); checkOperation(OperationCategory.WRITE); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot add cache directive"); FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags, logRetryCache); } finally { - writeUnlock(operationName, + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr, directive.toString())); } } catch (AccessControlException ace) { @@ -7861,14 +7861,14 @@ void removeCacheDirective(long id, boolean logRetryCache) throws IOException { checkOperation(OperationCategory.WRITE); FSPermissionChecker.setOperationType(operationName); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot remove cache directives"); FSNDNCacheOp.removeCacheDirective(this, cacheManager, id, logRetryCache); } finally { - writeUnlock(operationName, getLockReportInfoSupplier(idStr)); + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr)); } } catch (AccessControlException ace) { logAuditEvent(false, operationName, idStr, null, null); @@ -7886,13 +7886,13 @@ BatchedListEntries<CacheDirectiveEntry> listCacheDirectives( BatchedListEntries<CacheDirectiveEntry> results; cacheManager.waitForRescanIfNeeded(); try { - readLock(); + readLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.READ); results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId, filter); } finally { - readUnlock(operationName, + readUnlock(FSNamesystemLockMode.FS, operationName, 
getLockReportInfoSupplier(filter.toString())); } } catch (AccessControlException ace) { @@ -7911,7 +7911,7 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache) String poolName = req == null ? null : req.getPoolName(); checkSuperuserPrivilege(operationName, poolName); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot add cache pool" @@ -7920,7 +7920,7 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache) logRetryCache); poolInfoStr = info.toString(); } finally { - writeUnlock(operationName, getLockReportInfoSupplier(poolInfoStr)); + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolInfoStr)); } } catch (AccessControlException ace) { logAuditEvent(false, operationName, poolInfoStr); @@ -7938,14 +7938,14 @@ void modifyCachePool(CachePoolInfo req, boolean logRetryCache) (req == null ? null : req.getPoolName()) + "}"; checkSuperuserPrivilege(operationName, poolNameStr); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot modify cache pool" + (req == null ? null : req.getPoolName())); FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache); } finally { - writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr, + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr, req == null ? 
null : req.toString())); } } catch (AccessControlException ace) { @@ -7965,14 +7965,14 @@ void removeCachePool(String cachePoolName, boolean logRetryCache) String poolNameStr = "{poolName: " + cachePoolName + "}"; checkSuperuserPrivilege(operationName, poolNameStr); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName); FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName, logRetryCache); } finally { - writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr)); + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr)); } } catch (AccessControlException ace) { logAuditEvent(false, operationName, poolNameStr); @@ -7990,12 +7990,12 @@ BatchedListEntries<CachePoolEntry> listCachePools(String prevKey) FSPermissionChecker.setOperationType(operationName); cacheManager.waitForRescanIfNeeded(); try { - readLock(); + readLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.READ); results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey); } finally { - readUnlock(operationName, getLockReportInfoSupplier(null)); + readUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(null)); } } catch (AccessControlException ace) { logAuditEvent(false, operationName, null); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org