This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 40d54ebb66a7cd7680134d9bb80f2f0fc1bfda75
Author: ZanderXu <zande...@apache.org>
AuthorDate: Wed Mar 27 09:45:17 2024 +0800

    HDFS-17417. [FGL] HeartbeatManager and DatanodeAdminMonitor support fine-grained locking (#6656)
---
 .../hdfs/server/blockmanagement/BlockManager.java | 10 +++---
 .../DatanodeAdminBackoffMonitor.java              | 38 ++++++++++++----------
 .../DatanodeAdminDefaultMonitor.java              | 11 ++++---
 .../server/blockmanagement/DatanodeManager.java   |  5 +--
 .../server/blockmanagement/HeartbeatManager.java  |  9 ++---
 5 files changed, 41 insertions(+), 32 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 536337ade18..1e98290c78b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1830,7 +1830,7 @@ void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
 
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
     final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
@@ -4883,6 +4883,7 @@ void processExtraRedundancyBlocksOnInService(
       NumberReplicas num = countNodes(block);
       if (shouldProcessExtraRedundancy(num, expectedReplication)) {
         // extra redundancy block
+        // Here involves storage policy ID.
         processExtraRedundancyBlock(block, (short) expectedReplication, null,
             null);
         numExtraRedundancy++;
@@ -4891,14 +4892,15 @@ void processExtraRedundancyBlocksOnInService(
       // When called by tests like TestDefaultBlockPlacementPolicy.
       // testPlacementWithLocalRackNodesDecommissioned, it is not protected by
       // lock, only when called by DatanodeManager.refreshNodes have writeLock
-      if (namesystem.hasWriteLock()) {
-        namesystem.writeUnlock("processExtraRedundancyBlocksOnInService");
+      if (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)) {
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL,
+            "processExtraRedundancyBlocksOnInService");
         try {
           Thread.sleep(1);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       }
     }
     LOG.info("Invalidated {} extra redundancy blocks on {} after "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
index cbcf925065d..db60c7baccb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.namenode.INode;
@@ -170,7 +171,7 @@ public void run() {
       numBlocksChecked = 0;
       // Check decommission or maintenance progress.
       try {
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.BM);
         try {
           /**
            * Other threads can modify the pendingNode list and the cancelled
@@ -208,7 +209,7 @@ public void run() {
 
           processPendingNodes();
         } finally {
-          namesystem.writeUnlock("DatanodeAdminMonitorV2Thread");
+          namesystem.writeUnlock(FSNamesystemLockMode.BM, "DatanodeAdminMonitorV2Thread");
         }
         // After processing the above, various parts of the check() method will
         // take and drop the read / write lock as needed. Aside from the
@@ -326,7 +327,7 @@ private void check() {
    */
   private void processMaintenanceNodes() {
     // Check for any maintenance state nodes which need to be expired
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       for (DatanodeDescriptor dn : outOfServiceNodeBlocks.keySet()) {
         if (dn.isMaintenance() && dn.maintenanceExpired()) {
@@ -338,12 +339,12 @@ private void processMaintenanceNodes() {
           // which added the node to the cancelled list. Therefore expired
           // maintenance nodes do not need to be added to the toRemove list.
           dnAdmin.stopMaintenance(dn);
-          namesystem.writeUnlock("processMaintenanceNodes");
-          namesystem.writeLock();
+          namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMaintenanceNodes");
+          namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
         }
       }
     } finally {
-      namesystem.writeUnlock("processMaintenanceNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMaintenanceNodes");
     }
   }
 
@@ -360,7 +361,7 @@ private void processCompletedNodes(List<DatanodeDescriptor> toRemove) {
       // taking the write lock at all.
       return;
     }
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     try {
       for (DatanodeDescriptor dn : toRemove) {
         final boolean isHealthy =
@@ -402,7 +403,7 @@ private void processCompletedNodes(List<DatanodeDescriptor> toRemove) {
         }
       }
     } finally {
-      namesystem.writeUnlock("processCompletedNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCompletedNodes");
     }
   }
 
@@ -486,7 +487,7 @@ private void moveBlocksToPending() {
       return;
     }
 
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       long repQueueSize = blockManager.getLowRedundancyBlocksCount();
 
@@ -524,8 +525,8 @@ private void moveBlocksToPending() {
           // replication
           if (blocksProcessed >= blocksPerLock) {
             blocksProcessed = 0;
-            namesystem.writeUnlock("moveBlocksToPending");
-            namesystem.writeLock();
+            namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "moveBlocksToPending");
+            namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
           }
           blocksProcessed++;
           if (nextBlockAddedToPending(blockIt, dn)) {
@@ -546,7 +547,7 @@ private void moveBlocksToPending() {
         }
       }
     } finally {
-      namesystem.writeUnlock("moveBlocksToPending");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "moveBlocksToPending");
     }
     LOG.debug("{} blocks are now pending replication", pendingCount);
   }
@@ -626,15 +627,16 @@ private void scanDatanodeStorage(DatanodeDescriptor dn,
     }
 
     DatanodeStorageInfo[] storage;
-    namesystem.readLock();
+    namesystem.readLock(FSNamesystemLockMode.BM);
     try {
       storage = dn.getStorageInfos();
     } finally {
-      namesystem.readUnlock("scanDatanodeStorage");
+      namesystem.readUnlock(FSNamesystemLockMode.BM, "scanDatanodeStorage");
     }
 
     for (DatanodeStorageInfo s : storage) {
-      namesystem.readLock();
+      // isBlockReplicatedOk involves FS.
+      namesystem.readLock(FSNamesystemLockMode.GLOBAL);
       try {
         // As the lock is dropped and re-taken between each storage, we need
         // to check the storage is still present before processing it, as it
@@ -660,7 +662,7 @@ private void scanDatanodeStorage(DatanodeDescriptor dn,
           numBlocksChecked++;
         }
       } finally {
-        namesystem.readUnlock("scanDatanodeStorage");
+        namesystem.readUnlock(FSNamesystemLockMode.GLOBAL, "scanDatanodeStorage");
       }
     }
   }
@@ -683,7 +685,7 @@ private void scanDatanodeStorage(DatanodeDescriptor dn,
    * namenode write lock while it runs.
    */
   private void processPendingReplication() {
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       for (Iterator<Map.Entry<DatanodeDescriptor, List<BlockInfo>>> entIt =
           pendingRep.entrySet().iterator(); entIt.hasNext();) {
@@ -715,7 +717,7 @@ private void processPendingReplication() {
           suspectBlocks.getOutOfServiceBlockCount());
       }
     } finally {
-      namesystem.writeUnlock("processPendingReplication");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processPendingReplication");
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
index 280d3c77a3c..a9746471fdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -182,7 +183,9 @@ public void run() {
       numBlocksCheckedPerLock = 0;
       numNodesChecked = 0;
       // Check decommission or maintenance progress.
-      namesystem.writeLock();
+      // dnAdmin.stopMaintenance(dn) needs FSReadLock
+      // since processExtraRedundancyBlock involves storage policy and isSufficient involves bc.
+      namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       try {
         processCancelledNodes();
         processPendingNodes();
@@ -191,7 +194,7 @@ public void run() {
         LOG.warn("DatanodeAdminMonitor caught exception when processing node.",
             e);
       } finally {
-        namesystem.writeUnlock("DatanodeAdminMonitorThread");
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "DatanodeAdminMonitorThread");
       }
       if (numBlocksChecked + numNodesChecked > 0) {
         LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " +
@@ -426,7 +429,7 @@ private void processBlocksInternal(
         // lock.
         // Yielding is required in case of block number is greater than the
         // configured per-iteration-limit.
-        namesystem.writeUnlock("processBlocksInternal");
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processBlocksInternal");
         try {
           LOG.debug("Yielded lock during decommission/maintenance check");
           Thread.sleep(0, 500);
@@ -435,7 +438,7 @@ private void processBlocksInternal(
         }
         // reset
         numBlocksCheckedPerLock = 0;
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       }
       numBlocksChecked++;
       numBlocksCheckedPerLock++;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index a80d255f29a..8ef6f26cc76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1343,12 +1343,13 @@ nodes with its data cleared (or user can just remove the StorageID
    */
   public void refreshNodes(final Configuration conf) throws IOException {
     refreshHostsReader(conf);
-    namesystem.writeLock();
+    // processExtraRedundancyBlocksOnInService involves FS in stopMaintenance and stopDecommission.
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       refreshDatanodes();
       countSoftwareVersions();
     } finally {
-      namesystem.writeUnlock("refreshNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "refreshNodes");
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index 429d40d9fbd..6961e9912c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.util.Daemon;
@@ -514,20 +515,20 @@ void heartbeatCheck() {
 
     for (DatanodeDescriptor dead : deadDatanodes) {
       // acquire the fsnamesystem lock, and then remove the dead node.
-      namesystem.writeLock();
+      namesystem.writeLock(FSNamesystemLockMode.BM);
       try {
         dm.removeDeadDatanode(dead, !dead.isMaintenance());
       } finally {
-        namesystem.writeUnlock("removeDeadDatanode");
+        namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeDeadDatanode");
       }
     }
     for (DatanodeStorageInfo failedStorage : failedStorages) {
       // acquire the fsnamesystem lock, and remove blocks on the storage.
-      namesystem.writeLock();
+      namesystem.writeLock(FSNamesystemLockMode.BM);
       try {
         blockManager.removeBlocksAssociatedTo(failedStorage);
       } finally {
-        namesystem.writeUnlock("removeBlocksAssociatedTo");
+        namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeBlocksAssociatedTo");
      }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org