This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch HDFS-17384
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 07c4b73dc5008e4d3815a0b77e9528049b519d5c
Author: ZanderXu <zande...@apache.org>
AuthorDate: Wed Mar 27 09:45:17 2024 +0800

    HDFS-17417. [FGL] HeartbeatManager and DatanodeAdminMonitor support fine-grained locking (#6656)
---
 .../hdfs/server/blockmanagement/BlockManager.java  | 10 +++---
 .../DatanodeAdminBackoffMonitor.java               | 38 ++++++++++++----------
 .../DatanodeAdminDefaultMonitor.java               | 11 ++++---
 .../server/blockmanagement/DatanodeManager.java    |  5 +--
 .../server/blockmanagement/HeartbeatManager.java   |  9 ++---
 5 files changed, 41 insertions(+), 32 deletions(-)
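
The change follows one pattern throughout: each acquire/release of the coarse namesystem lock now names an FSNamesystemLockMode, BM where only block-management state is touched and GLOBAL where FS state (for example the storage policy or block collection, as the added comments note) is also involved. A minimal before/after sketch of that pattern, reusing the identifiers from the HeartbeatManager hunk below; it is illustrative only and not code added by this patch:

    // Before: the single namesystem write lock, released with an operation name.
    namesystem.writeLock();
    try {
      dm.removeDeadDatanode(dead, !dead.isMaintenance());
    } finally {
      namesystem.writeUnlock("removeDeadDatanode");
    }

    // After: the caller declares the lock mode it needs, and the unlock call
    // passes the same mode along with the operation name string.
    namesystem.writeLock(FSNamesystemLockMode.BM);
    try {
      dm.removeDeadDatanode(dead, !dead.isMaintenance());
    } finally {
      namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeDeadDatanode");
    }

FSNamesystemLockMode comes from org.apache.hadoop.hdfs.server.namenode.fgl, per the import lines added in the monitors and HeartbeatManager.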

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 82b82433e70e..23a864f731b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1825,7 +1825,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
     final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
@@ -4876,6 +4876,7 @@ public class BlockManager implements BlockStatsMXBean {
         NumberReplicas num = countNodes(block);
         if (shouldProcessExtraRedundancy(num, expectedReplication)) {
           // extra redundancy block
+          // Here involves storage policy ID.
           processExtraRedundancyBlock(block, (short) expectedReplication, null,
               null);
           numExtraRedundancy++;
@@ -4884,14 +4885,15 @@ public class BlockManager implements BlockStatsMXBean {
       // When called by tests like TestDefaultBlockPlacementPolicy.
       // testPlacementWithLocalRackNodesDecommissioned, it is not protected by
       // lock, only when called by DatanodeManager.refreshNodes have writeLock
-      if (namesystem.hasWriteLock()) {
-        namesystem.writeUnlock("processExtraRedundancyBlocksOnInService");
+      if (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)) {
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL,
+            "processExtraRedundancyBlocksOnInService");
         try {
           Thread.sleep(1);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       }
     }
     LOG.info("Invalidated {} extra redundancy blocks on {} after "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
index 79d5a065b08a..d212d142d441 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.namenode.INode;
@@ -170,7 +171,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
     numBlocksChecked = 0;
     // Check decommission or maintenance progress.
     try {
-      namesystem.writeLock();
+      namesystem.writeLock(FSNamesystemLockMode.BM);
       try {
         /**
          * Other threads can modify the pendingNode list and the cancelled
@@ -208,7 +209,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
 
         processPendingNodes();
       } finally {
-        namesystem.writeUnlock("DatanodeAdminMonitorV2Thread");
+        namesystem.writeUnlock(FSNamesystemLockMode.BM, "DatanodeAdminMonitorV2Thread");
       }
       // After processing the above, various parts of the check() method will
       // take and drop the read / write lock as needed. Aside from the
@@ -326,7 +327,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
    */
   private void processMaintenanceNodes() {
     // Check for any maintenance state nodes which need to be expired
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       for (DatanodeDescriptor dn : outOfServiceNodeBlocks.keySet()) {
         if (dn.isMaintenance() && dn.maintenanceExpired()) {
@@ -338,12 +339,12 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
           // which added the node to the cancelled list. Therefore expired
           // maintenance nodes do not need to be added to the toRemove list.
           dnAdmin.stopMaintenance(dn);
-          namesystem.writeUnlock("processMaintenanceNodes");
-          namesystem.writeLock();
+          namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMaintenanceNodes");
+          namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
         }
       }
     } finally {
-      namesystem.writeUnlock("processMaintenanceNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMaintenanceNodes");
     }
   }
 
@@ -360,7 +361,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
       // taking the write lock at all.
       return;
     }
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     try {
       for (DatanodeDescriptor dn : toRemove) {
         final boolean isHealthy =
@@ -402,7 +403,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
         }
       }
     } finally {
-      namesystem.writeUnlock("processCompletedNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCompletedNodes");
     }
   }
 
@@ -486,7 +487,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
       return;
     }
 
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       long repQueueSize = blockManager.getLowRedundancyBlocksCount();
 
@@ -524,8 +525,8 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
           // replication
           if (blocksProcessed >= blocksPerLock) {
             blocksProcessed = 0;
-            namesystem.writeUnlock("moveBlocksToPending");
-            namesystem.writeLock();
+            namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "moveBlocksToPending");
+            namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
           }
           blocksProcessed++;
           if (nextBlockAddedToPending(blockIt, dn)) {
@@ -546,7 +547,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
         }
       }
     } finally {
-      namesystem.writeUnlock("moveBlocksToPending");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "moveBlocksToPending");
     }
     LOG.debug("{} blocks are now pending replication", pendingCount);
   }
@@ -626,15 +627,16 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
     }
 
     DatanodeStorageInfo[] storage;
-    namesystem.readLock();
+    namesystem.readLock(FSNamesystemLockMode.BM);
     try {
       storage = dn.getStorageInfos();
     } finally {
-      namesystem.readUnlock("scanDatanodeStorage");
+      namesystem.readUnlock(FSNamesystemLockMode.BM, "scanDatanodeStorage");
     }
 
     for (DatanodeStorageInfo s : storage) {
-      namesystem.readLock();
+      // isBlockReplicatedOk involves FS.
+      namesystem.readLock(FSNamesystemLockMode.GLOBAL);
       try {
         // As the lock is dropped and re-taken between each storage, we need
         // to check the storage is still present before processing it, as it
@@ -660,7 +662,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
           numBlocksChecked++;
         }
       } finally {
-        namesystem.readUnlock("scanDatanodeStorage");
+        namesystem.readUnlock(FSNamesystemLockMode.GLOBAL, "scanDatanodeStorage");
       }
     }
   }
@@ -683,7 +685,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
    * namenode write lock while it runs.
    */
   private void processPendingReplication() {
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       for (Iterator<Map.Entry<DatanodeDescriptor, List<BlockInfo>>>
            entIt = pendingRep.entrySet().iterator(); entIt.hasNext();) {
@@ -715,7 +717,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
             suspectBlocks.getOutOfServiceBlockCount());
       }
     } finally {
-      namesystem.writeUnlock("processPendingReplication");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processPendingReplication");
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
index 94049b35dc48..cf06d53b8bae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -182,7 +183,9 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
     numBlocksCheckedPerLock = 0;
     numNodesChecked = 0;
     // Check decommission or maintenance progress.
-    namesystem.writeLock();
+    // dnAdmin.stopMaintenance(dn) needs FSReadLock
+    // since processExtraRedundancyBlock involves storage policy and isSufficient involves bc.
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       processCancelledNodes();
       processPendingNodes();
@@ -191,7 +194,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
       LOG.warn("DatanodeAdminMonitor caught exception when processing node.",
           e);
     } finally {
-      namesystem.writeUnlock("DatanodeAdminMonitorThread");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "DatanodeAdminMonitorThread");
     }
     if (numBlocksChecked + numNodesChecked > 0) {
       LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " +
@@ -426,7 +429,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
         // lock.
         // Yielding is required in case of block number is greater than the
         // configured per-iteration-limit.
-        namesystem.writeUnlock("processBlocksInternal");
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processBlocksInternal");
         try {
           LOG.debug("Yielded lock during decommission/maintenance check");
           Thread.sleep(0, 500);
@@ -435,7 +438,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase
         }
         // reset
         numBlocksCheckedPerLock = 0;
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       }
       numBlocksChecked++;
       numBlocksCheckedPerLock++;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index dc22fe22c96e..392f86d79fd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1335,12 +1335,13 @@ public class DatanodeManager {
    */
   public void refreshNodes(final Configuration conf) throws IOException {
     refreshHostsReader(conf);
-    namesystem.writeLock();
+    // processExtraRedundancyBlocksOnInService involves FS in stopMaintenance and stopDecommission.
+    namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
     try {
       refreshDatanodes();
       countSoftwareVersions();
     } finally {
-      namesystem.writeUnlock("refreshNodes");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "refreshNodes");
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index 429d40d9fbdd..6961e9912c55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.util.Daemon;
@@ -514,20 +515,20 @@ class HeartbeatManager implements DatanodeStatistics {
 
       for (DatanodeDescriptor dead : deadDatanodes) {
         // acquire the fsnamesystem lock, and then remove the dead node.
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.BM);
         try {
           dm.removeDeadDatanode(dead, !dead.isMaintenance());
         } finally {
-          namesystem.writeUnlock("removeDeadDatanode");
+          namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeDeadDatanode");
         }
       }
       for (DatanodeStorageInfo failedStorage : failedStorages) {
         // acquire the fsnamesystem lock, and remove blocks on the storage.
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.BM);
         try {
           blockManager.removeBlocksAssociatedTo(failedStorage);
         } finally {
-          namesystem.writeUnlock("removeBlocksAssociatedTo");
+          namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeBlocksAssociatedTo");
         }
       }
     }

