This is an automated email from the ASF dual-hosted git repository. ferhui pushed a commit to branch HDFS-17384 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-17384 by this push: new 77d32aa136e2 HDFS-17416. [FGL] Monitor threads in BlockManager.class support fine-grained lock (#6647) 77d32aa136e2 is described below commit 77d32aa136e2628bb454829fadb19c2fcf4f9ed1 Author: ZanderXu <zande...@apache.org> AuthorDate: Tue Mar 26 10:50:20 2024 +0800 HDFS-17416. [FGL] Monitor threads in BlockManager.class support fine-grained lock (#6647) --- .../hdfs/server/blockmanagement/BlockManager.java | 76 +++++++++++++--------- .../server/blockmanagement/ProvidedStorageMap.java | 2 +- .../hadoop/hdfs/server/namenode/FSNamesystem.java | 9 ++- .../server/blockmanagement/TestBlockManager.java | 2 +- .../blockmanagement/TestProvidedStorageMap.java | 3 +- .../blockmanagement/TestReplicationPolicy.java | 7 ++ 6 files changed, 64 insertions(+), 35 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 48e102969e74..39a85a6690e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2102,7 +2102,9 @@ public class BlockManager implements BlockStatsMXBean { */ int computeBlockReconstructionWork(int blocksToProcess) { List<List<BlockInfo>> blocksToReconstruct = null; - namesystem.writeLock(); + // TODO: Change it to readLock(FSNamesystemLockMode.BM) + // since chooseLowRedundancyBlocks is thread safe. + namesystem.writeLock(FSNamesystemLockMode.BM); try { boolean reset = false; if (replQueueResetToHeadThreshold > 0) { @@ -2117,7 +2119,7 @@ public class BlockManager implements BlockStatsMXBean { blocksToReconstruct = neededReconstruction .chooseLowRedundancyBlocks(blocksToProcess, reset); } finally { - namesystem.writeUnlock("computeBlockReconstructionWork"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeBlockReconstructionWork"); } return computeReconstructionWorkForBlocks(blocksToReconstruct); } @@ -2136,7 +2138,9 @@ public class BlockManager implements BlockStatsMXBean { List<BlockReconstructionWork> reconWork = new ArrayList<>(); // Step 1: categorize at-risk blocks into replication and EC tasks - namesystem.writeLock(); + // TODO: Change to readLock(FSNamesystemLockMode.GLOBAL) + // since neededReconstruction is thread safe. + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { synchronized (neededReconstruction) { for (int priority = 0; priority < blocksToReconstruct @@ -2151,7 +2155,7 @@ public class BlockManager implements BlockStatsMXBean { } } } finally { - namesystem.writeUnlock("computeReconstructionWorkForBlocks"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "computeReconstructionWorkForBlocks"); } // Step 2: choose target nodes for each reconstruction task @@ -2176,7 +2180,9 @@ public class BlockManager implements BlockStatsMXBean { } // Step 3: add tasks to the DN - namesystem.writeLock(); + // TODO: Change to readLock(FSNamesystemLockMode.BM) + // since pendingReconstruction and neededReconstruction are thread safe. + namesystem.writeLock(FSNamesystemLockMode.BM); try { for (BlockReconstructionWork rw : reconWork) { final DatanodeStorageInfo[] targets = rw.getTargets(); @@ -2192,7 +2198,7 @@ public class BlockManager implements BlockStatsMXBean { } } } finally { - namesystem.writeUnlock("computeReconstructionWorkForBlocks"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeReconstructionWorkForBlocks"); } if (blockLog.isDebugEnabled()) { @@ -2681,7 +2687,9 @@ public class BlockManager implements BlockStatsMXBean { void processPendingReconstructions() { BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks(); if (timedOutItems != null) { - namesystem.writeLock(); + // TODO: Change to readLock(FSNamesystemLockMode.BM) + // since neededReconstruction is thread safe. + namesystem.writeLock(FSNamesystemLockMode.BM); try { for (int i = 0; i < timedOutItems.length; i++) { /* @@ -2700,7 +2708,7 @@ public class BlockManager implements BlockStatsMXBean { } } } finally { - namesystem.writeUnlock("processPendingReconstructions"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "processPendingReconstructions"); } /* If we know the target datanodes where the replication timedout, * we could invoke decBlocksScheduled() on it. Its ok for now. @@ -2895,7 +2903,7 @@ public class BlockManager implements BlockStatsMXBean { final DatanodeStorage storage, final BlockListAsLongs newReport, BlockReportContext context) throws IOException { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; DatanodeDescriptor node; @@ -2953,7 +2961,7 @@ public class BlockManager implements BlockStatsMXBean { storageInfo.receivedBlockReport(); } finally { endTime = Time.monotonicNow(); - namesystem.writeUnlock("processReport"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processReport"); } if (blockLog.isDebugEnabled()) { @@ -3026,7 +3034,7 @@ public class BlockManager implements BlockStatsMXBean { if (getPostponedMisreplicatedBlocksCount() == 0) { return; } - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); long startTime = Time.monotonicNow(); long startSize = postponedMisreplicatedBlocks.size(); try { @@ -3055,7 +3063,8 @@ public class BlockManager implements BlockStatsMXBean { postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks); rescannedMisreplicatedBlocks.clear(); long endSize = postponedMisreplicatedBlocks.size(); - namesystem.writeUnlock("rescanPostponedMisreplicatedBlocks"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, + "rescanPostponedMisreplicatedBlocks"); LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" + " msecs. {} blocks are left. {} blocks were removed.", (Time.monotonicNow() - startTime), endSize, (startSize - endSize)); @@ -3097,7 +3106,8 @@ public class BlockManager implements BlockStatsMXBean { if (excessRedundancyMap.size() == 0) { return; } - namesystem.writeLock(); + // TODO: Change to readLock(FSNamesysteLockMode.BM) since invalidateBlocks is thread safe. + namesystem.writeLock(FSNamesystemLockMode.BM); long now = Time.monotonicNow(); int processed = 0; try { @@ -3151,7 +3161,7 @@ public class BlockManager implements BlockStatsMXBean { } } } finally { - namesystem.writeUnlock("processTimedOutExcessBlocks"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "processTimedOutExcessBlocks"); LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now)); } } @@ -3257,7 +3267,7 @@ public class BlockManager implements BlockStatsMXBean { final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { if (report == null) return; - assert (namesystem.hasWriteLock()); + assert (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)); assert (storageInfo.getBlockReportCount() == 0); for (BlockReportReplica iblk : report) { @@ -3308,6 +3318,7 @@ public class BlockManager implements BlockStatsMXBean { // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 + // isInSnapshot involves the full path, so it needs FSReadLock. if (namesystem.isInSnapshot(storedBlock.getBlockCollectionId())) { int numOfReplicas = storedBlock.getUnderConstructionFeature() .getNumExpectedLocations(); @@ -3713,7 +3724,7 @@ public class BlockManager implements BlockStatsMXBean { private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, DatanodeStorageInfo storageInfo) throws IOException { - assert (storedBlock != null && namesystem.hasWriteLock()); + assert (storedBlock != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)); if (!namesystem.isInStartupSafeMode() || isPopulatingReplQueues()) { addStoredBlock(storedBlock, reported, storageInfo, null, false); @@ -3748,7 +3759,7 @@ public class BlockManager implements BlockStatsMXBean { DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { - assert block != null && namesystem.hasWriteLock(); + assert block != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL); BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (!block.isComplete()) { @@ -3984,7 +3995,7 @@ public class BlockManager implements BlockStatsMXBean { while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) { int processed = 0; - namesystem.writeLockInterruptibly(); + namesystem.writeLockInterruptibly(FSNamesystemLockMode.GLOBAL); try { while (processed < numBlocksPerIteration && blocksItr.hasNext()) { BlockInfo block = blocksItr.next(); @@ -4043,7 +4054,7 @@ public class BlockManager implements BlockStatsMXBean { break; } } finally { - namesystem.writeUnlock("processMisReplicatesAsync"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMisReplicatesAsync"); LOG.info("Reconstruction queues initialisation progress: {}, total number of blocks " + "processed: {}/{}", reconstructionQueuesInitProgress, totalProcessed, totalBlocks); // Make sure it is out of the write lock for sufficiently long time. @@ -4196,7 +4207,7 @@ public class BlockManager implements BlockStatsMXBean { private boolean processExtraRedundancyBlockWithoutPostpone(final BlockInfo block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL); if (addedNode == delNodeHint) { delNodeHint = null; } @@ -4240,7 +4251,10 @@ public class BlockManager implements BlockStatsMXBean { BlockInfo storedBlock, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { - assert namesystem.hasWriteLock(); + // bc.getStoragePolicyID() needs FSReadLock. + // TODO: Change to hasReadLock(FSNamesystemLockMode.GLOBAL) + // since chooseExcessRedundancyContiguous is thread safe. + assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL); // first form a rack to datanodes map and BlockCollection bc = getBlockCollection(storedBlock); if (storedBlock.isStriped()) { @@ -4608,7 +4622,7 @@ public class BlockManager implements BlockStatsMXBean { */ public void processIncrementalBlockReport(final DatanodeID nodeID, final StorageReceivedDeletedBlocks srdb) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL); final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); if (node == null || !node.isRegistered()) { blockLog.warn("BLOCK* processIncrementalBlockReport" @@ -4972,6 +4986,8 @@ public class BlockManager implements BlockStatsMXBean { /** updates a block in needed reconstruction queue. */ private void updateNeededReconstructions(final BlockInfo block, final int curReplicasDelta, int expectedReplicasDelta) { + // TODO: Change to readLock(FSNamesystemLockMode.BM) + // since pendingReconstruction and neededReconstruction are thread safe. namesystem.writeLock(FSNamesystemLockMode.BM); try { if (!isPopulatingReplQueues() || !block.isComplete()) { @@ -5023,8 +5039,9 @@ public class BlockManager implements BlockStatsMXBean { */ private int invalidateWorkForOneNode(DatanodeInfo dn) { final List<Block> toInvalidate; - - namesystem.writeLock(); + + // TODO: Change to readLock(FSNamesystemLockMode.BM) since invalidateBlocks is thread safe. + namesystem.writeLock(FSNamesystemLockMode.BM); try { // blocks should not be replicated or removed if safe mode is on if (namesystem.isInSafeMode()) { @@ -5048,7 +5065,7 @@ public class BlockManager implements BlockStatsMXBean { return 0; } } finally { - namesystem.writeUnlock("invalidateWorkForOneNode"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "invalidateWorkForOneNode"); } if (blockLog.isDebugEnabled()) { blockLog.debug("BLOCK* {}: ask {} to delete {}", @@ -5271,7 +5288,7 @@ public class BlockManager implements BlockStatsMXBean { private void remove(long time) { if (checkToDeleteIterator()) { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.BM); try { while (toDeleteIterator.hasNext()) { removeBlock(toDeleteIterator.next()); @@ -5282,7 +5299,7 @@ public class BlockManager implements BlockStatsMXBean { } } } finally { - namesystem.writeUnlock("markedDeleteBlockScrubberThread"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "markedDeleteBlockScrubberThread"); } } } @@ -5396,12 +5413,13 @@ public class BlockManager implements BlockStatsMXBean { int workFound = this.computeBlockReconstructionWork(blocksToProcess); // Update counters - namesystem.writeLock(); + // TODO: Make corruptReplicas thread safe to remove this lock. + namesystem.writeLock(FSNamesystemLockMode.BM); try { this.updateState(); this.scheduledReplicationBlocksCount = workFound; } finally { - namesystem.writeUnlock("computeDatanodeWork"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeDatanodeWork"); } workFound += this.computeInvalidateWork(nodesToProcess); return workFound; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java index 4e4464b32599..558c43d06777 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java @@ -145,7 +145,7 @@ public class ProvidedStorageMap { private void processProvidedStorageReport() throws IOException { - assert lock.hasWriteLock() : "Not holding write lock"; + assert lock.hasWriteLock(FSNamesystemLockMode.GLOBAL) : "Not holding write lock"; if (providedStorageInfo.getBlockReportCount() == 0 || providedDescriptor.activeProvidedDatanodes() == 0) { LOG.info("Calling process first blk report from storage: " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 904c0a70c7d6..2566baccf7e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3955,7 +3955,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, @Override public boolean isInSnapshot(long blockCollectionID) { - assert hasReadLock(); + assert hasReadLock(FSNamesystemLockMode.FS); final INodeFile bc = getBlockCollection(blockCollectionID); if (bc == null || !bc.isUnderConstruction()) { return false; @@ -5347,11 +5347,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, public void processIncrementalBlockReport(final DatanodeID nodeID, final StorageReceivedDeletedBlocks srdb) throws IOException { - writeLock(); + // completeBlock will updateQuota, so it needs BMWriteLock and FSWriteLock. + // processExtraRedundancyBlock chooses excess replicas depending on storage policyId, + // so it needs FSReadLock. + writeLock(FSNamesystemLockMode.GLOBAL); try { blockManager.processIncrementalBlockReport(nodeID, srdb); } finally { - writeUnlock("processIncrementalBlockReport"); + writeUnlock(FSNamesystemLockMode.GLOBAL, "processIncrementalBlockReport"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index f6bbd63f7e98..187208253327 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -2334,4 +2334,4 @@ public class TestBlockManager { DataNodeFaultInjector.set(oldInjector); } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java index c7f837978624..aa692185cb38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestProvidedImpl; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.util.RwLock; import org.junit.Before; @@ -87,7 +88,7 @@ public class TestProvidedStorageMap { DatanodeStorage dn1DiskStorage = new DatanodeStorage( "sid-1", DatanodeStorage.State.NORMAL, StorageType.DISK); - when(nameSystemLock.hasWriteLock()).thenReturn(true); + when(nameSystemLock.hasWriteLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true); DatanodeStorageInfo dns1Provided = providedMap.getStorage(dn1, dn1ProvidedStorage); DatanodeStorageInfo dns1Disk = providedMap.getStorage(dn1, dn1DiskStorage); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index b99e060ee387..fd7ea14446ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.TestINodeFile; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.ReflectionUtils; @@ -1406,6 +1407,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { FSNamesystem mockNS = mock(FSNamesystem.class); when(mockNS.hasWriteLock()).thenReturn(true); when(mockNS.hasReadLock()).thenReturn(true); + when(mockNS.hasWriteLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true); + when(mockNS.hasReadLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true); + when(mockNS.hasWriteLock(FSNamesystemLockMode.BM)).thenReturn(true); + when(mockNS.hasReadLock(FSNamesystemLockMode.BM)).thenReturn(true); + when(mockNS.hasWriteLock(FSNamesystemLockMode.FS)).thenReturn(true); + when(mockNS.hasReadLock(FSNamesystemLockMode.FS)).thenReturn(true); BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration()); LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org