This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 94d6a77c394 HDFS-17496. DataNode supports more fine-grained dataset lock based on blockid. (#6764). Contributed by farmmamba.
94d6a77c394 is described below

commit 94d6a77c39452c82ba78cef2cd96f5c8ff4fcfa3
Author: hfutatzhanghb <hfutzhan...@163.com>
AuthorDate: Thu Jan 2 18:41:43 2025 +0800

    HDFS-17496. DataNode supports more fine-grained dataset lock based on blockid. (#6764). Contributed by farmmamba.
    
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../hdfs/server/common/DataNodeLockManager.java    |   3 +-
 .../hdfs/server/datanode/DataSetLockManager.java   |  38 ++++++-
 .../DataSetSubLockStrategy.java}                   |  43 ++------
 .../server/datanode/ModDataSetSubLockStrategy.java |  53 +++++++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java     | 121 ++++++++++++++-------
 .../src/main/resources/hdfs-default.xml            |   9 ++
 .../server/datanode/TestDataSetLockManager.java    |  11 ++
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java |   7 +-
 .../hdfs/server/namenode/ha/TestDNFencing.java     |  11 +-
 10 files changed, 222 insertions(+), 78 deletions(-)
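
In brief, this change adds a third, DIR-level dataset lock below the existing BLOCK_POOl and VOLUME levels: a block id is mapped (modulo a configurable count, default 1000) to one of a fixed set of named sub-locks, so operations on different blocks of the same volume no longer serialize on a single volume lock. A minimal sketch of the new acquisition pattern, using names from this patch (lockManager and datasetSubLockStrategy are fields of FsDatasetImpl; b is assumed to be an ExtendedBlock):

    // Sketch only: acquire the DIR-level dataset lock for one block.
    // The manager takes the BLOCK_POOl and VOLUME parent locks internally.
    String bpid = b.getBlockPoolId();                         // block pool id
    String storage = getStorageUuidForLock(b);                // volume storage uuid
    String subLock = datasetSubLockStrategy.blockIdToSubLock(b.getBlockId());
    try (AutoCloseableLock lock =
        lockManager.writeLock(LockLevel.DIR, bpid, storage, subLock)) {
      // mutate replica state for this block; blocks that map to a different
      // sub-lock on the same volume can proceed concurrently
    }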

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dd3193fdadf..d85e7c58231 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1744,6 +1744,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean
       DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false;
 
+  public static final String DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY =
+      "dfs.datanode.dataset.sublock.count";
+  public static final long DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT = 1000L;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
index e7a3b38357a..cb22a057062 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
@@ -29,7 +29,8 @@ public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {
    */
   enum LockLevel {
     BLOCK_POOl,
-    VOLUME
+    VOLUME,
+    DIR
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java
index 5579541eb72..61492467a41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java
@@ -94,6 +94,13 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
             + resources[0] + "volume lock :" + resources[1]);
       }
       return resources[0] + resources[1];
+    } else if (resources.length == 3 && level == LockLevel.DIR) {
+      if (resources[0] == null || resources[1] == null || resources[2] == null) {
+        throw new IllegalArgumentException("acquire a null dataset lock : "
+            + resources[0] + ",volume lock :" + resources[1]
+        + ",subdir lock :" + resources[2]);
+      }
+      return resources[0] + resources[1] + resources[2];
     } else {
       throw new IllegalArgumentException("lock level do not match resource");
     }
@@ -153,7 +160,7 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
   public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
     if (level == LockLevel.BLOCK_POOl) {
       return getReadLock(level, resources[0]);
-    } else {
+    } else if (level == LockLevel.VOLUME){
       AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
       AutoCloseDataSetLock volLock = getReadLock(level, resources);
       volLock.setParentLock(bpLock);
@@ -162,6 +169,17 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
             resources[0]);
       }
       return volLock;
+    } else {
+      AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
+      AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
+      volLock.setParentLock(bpLock);
+      AutoCloseDataSetLock dirLock = getReadLock(level, resources);
+      dirLock.setParentLock(volLock);
+      if (openLockTrace) {
+        LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " +
+            resources[0] + resources[1]);
+      }
+      return dirLock;
     }
   }
 
@@ -169,7 +187,7 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
   public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
     if (level == LockLevel.BLOCK_POOl) {
       return getWriteLock(level, resources[0]);
-    } else {
+    } else if (level == LockLevel.VOLUME) {
       AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
       AutoCloseDataSetLock volLock = getWriteLock(level, resources);
       volLock.setParentLock(bpLock);
@@ -178,6 +196,17 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
             resources[0]);
       }
       return volLock;
+    } else {
+      AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
+      AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
+      volLock.setParentLock(bpLock);
+      AutoCloseDataSetLock dirLock = getWriteLock(level, resources);
+      dirLock.setParentLock(volLock);
+      if (openLockTrace) {
+        LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " +
+            resources[0] + resources[1]);
+      }
+      return dirLock;
     }
   }
 
@@ -224,8 +253,13 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetL
     String lockName = generateLockName(level, resources);
     if (level == LockLevel.BLOCK_POOl) {
       lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
+    } else if (level == LockLevel.VOLUME) {
+      lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
+      lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
     } else {
       lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
+      lockMap.addLock(generateLockName(LockLevel.VOLUME, resources[0], resources[1]),
+          new ReentrantReadWriteLock(isFair));
       lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
     }
   }
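
Note the ordering the manager enforces at the new level: a DIR-level readLock/writeLock first takes the block-pool lock and then the volume lock in read mode, then the sub-lock itself, and setParentLock chains them so that closing the returned lock releases all three in reverse order. A hedged usage sketch against this API (manager is a DataSetLockManager whose lock names have been registered via addLock, as in the test further below):

    // Lock order for LockLevel.DIR, conceptually:
    //   1. read  lock on "bp"               (block pool level)
    //   2. read  lock on "bp"+"vol"         (volume level)
    //   3. write lock on "bp"+"vol"+"sub"   (sub-lock level)
    // close() walks the parent chain and unlocks in reverse order.
    try (AutoCloseDataSetLock l =
        manager.writeLock(LockLevel.DIR, "BPtest", "Volumetest", "SubLock0")) {
      // critical section for blocks mapped to SubLock0 on this volume
    }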
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java
similarity index 50%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java
index e7a3b38357a..7ba1df8df52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java
@@ -16,44 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdfs.server.common;
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.List;
 
 /**
- * Use for manage a set of lock for datanode.
+ * This interface is used to generate sub lock name for a blockid.
  */
-public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {
-
-  /**
-   * Acquire block pool level first if you want to Acquire volume lock.
-   * Or only acquire block pool level lock.
-   */
-  enum LockLevel {
-    BLOCK_POOl,
-    VOLUME
-  }
+public interface DataSetSubLockStrategy {
 
   /**
-   * Acquire readLock and then lock.
+   * Generate sub lock name for the given blockid.
+   * @param blockid the block id.
+   * @return sub lock name for the input blockid.
    */
-  T readLock(LockLevel level, String... resources);
+  String blockIdToSubLock(long blockid);
 
-  /**
-   * Acquire writeLock and then lock.
-   */
-  T writeLock(LockLevel level, String... resources);
-
-  /**
-   * Add a lock to LockManager.
-   */
-  void addLock(LockLevel level, String... resources);
-
-  /**
-   * Remove a lock from LockManager.
-   */
-  void removeLock(LockLevel level, String... resources);
-
-  /**
-   * LockManager may need to back hook.
-   */
-  void hook();
+  List<String> getAllSubLockName();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java
new file mode 100644
index 00000000000..5e736e54716
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ModDataSetSubLockStrategy implements DataSetSubLockStrategy {
+  public static final Logger LOG = LoggerFactory.getLogger(DataSetSubLockStrategy.class);
+
+  private static final String LOCK_NAME_PERFIX = "SubLock";
+  private long modFactor;
+
+  public ModDataSetSubLockStrategy(long mod) {
+    if (mod <= 0) {
+      mod = 1L;
+    }
+    this.modFactor = mod;
+  }
+
+  @Override
+  public String blockIdToSubLock(long blockid) {
+    return LOCK_NAME_PERFIX + (blockid % modFactor);
+  }
+
+  @Override
+  public List<String> getAllSubLockName() {
+    List<String> res = new ArrayList<>();
+    for (long i = 0L; i < modFactor; i++) {
+      res.add(LOCK_NAME_PERFIX + i);
+    }
+    return res;
+  }
+}
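
To make the mapping concrete: with the default count of 1000, two blocks share a sub-lock exactly when their ids agree modulo 1000. A short usage sketch (values worked by hand):

    DataSetSubLockStrategy s = new ModDataSetSubLockStrategy(1000L);
    s.blockIdToSubLock(1073741830L);  // "SubLock830"  (1073741830 % 1000 = 830)
    s.blockIdToSubLock(1073742830L);  // "SubLock830"  -> contends with the above
    s.blockIdToSubLock(1073741831L);  // "SubLock831"  -> independent sub-lock
    s.getAllSubLockName().size();     // 1000, i.e. "SubLock0" .. "SubLock999"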
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index eeec1bb7288..91b12daef81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -65,9 +65,11 @@ import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataSetSubLockStrategy;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
+import org.apache.hadoop.hdfs.server.datanode.ModDataSetSubLockStrategy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -198,8 +200,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
-        bpid)) {
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+        bpid, getReplicaInfo(bpid, blkid).getStorageUuid(),
+        datasetSubLockStrategy.blockIdToSubLock(blkid))) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -288,6 +291,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private long lastDirScannerNotifyTime;
   private volatile long lastDirScannerFinishTime;
 
+  private final DataSetSubLockStrategy datasetSubLockStrategy;
+  private final long datasetSubLockCount;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -392,6 +398,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY,
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT);
     lastDirScannerNotifyTime = System.currentTimeMillis();
+    datasetSubLockCount = conf.getLong(DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY,
+        DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT);
+    this.datasetSubLockStrategy = new ModDataSetSubLockStrategy(datasetSubLockCount);
   }
 
   /**
@@ -430,6 +439,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       FsVolumeReference ref) throws IOException {
     for (String bp : volumeMap.getBlockPoolList()) {
       lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID());
+      List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
+      for (String dir : allSubDirNameForDataSetLock) {
+        lockManager.addLock(LockLevel.DIR, bp, ref.getVolume().getStorageID(), dir);
+        LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
+            bp, ref.getVolume().getStorageID(), dir);
+      }
     }
     DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
     if (dnStorage != null) {
@@ -629,6 +644,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       for (String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
         for (String bp : volumeMap.getBlockPoolList()) {
+          List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
+          for (String dir : allSubDirNameForDataSetLock) {
+            lockManager.removeLock(LockLevel.DIR, bp, storageUuid, dir);
+            LOG.info("Removed DIR lock for bpid:{}, volume storageid:{}, dir:{}",
+                bp, storageUuid, dir);
+          }
           lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid);
         }
       }
@@ -819,8 +840,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {
 
     ReplicaInfo info;
-    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
-        b.getBlockPoolId())) {
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -914,8 +936,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1380,8 +1403,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override  // FsDatasetSpi
   public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       // If the block was successfully finalized because all packets
       // were successfully processed at the Datanode but the ack for
       // some of the packets were not received by the client. The client
@@ -1433,8 +1457,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaInPipeline append(String bpid,
       ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        bpid, replicaInfo.getStorageUuid())) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        bpid, replicaInfo.getStorageUuid(),
+        datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
       // If the block is cached, start uncaching it.
       if (replicaInfo.getState() != ReplicaState.FINALIZED) {
         throw new IOException("Only a Finalized replica can be appended to; "
@@ -1530,8 +1555,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
-            b.getBlockPoolId())) {
+        try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+            b.getBlockPoolId(), getStorageUuidForLock(b),
+            datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
           ReplicaInPipeline replica;
@@ -1564,8 +1590,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         b, newGS, expectedBlockLen);
     while (true) {
       try {
-        try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-            b.getBlockPoolId(), getStorageUuidForLock(b))) {
+        try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+            b.getBlockPoolId(), getStorageUuidForLock(b),
+            datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1650,8 +1677,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       ReplicaInPipeline newReplicaInfo;
-      try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME,
-          b.getBlockPoolId(), v.getStorageID())) {
+      try (AutoCloseableLock l = lockManager.writeLock(LockLevel.DIR,
+          b.getBlockPoolId(), v.getStorageID(),
+          datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
         newReplicaInfo = v.createRbw(b);
         if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
           throw new IOException("CreateRBW returned a replica of state "
@@ -1681,8 +1709,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     try {
       while (true) {
         try {
-          try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-              b.getBlockPoolId(), getStorageUuidForLock(b))) {
+          try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+              b.getBlockPoolId(), getStorageUuidForLock(b),
+              datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
             ReplicaInfo replicaInfo =
                 getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
             // check the replica's state
@@ -1713,8 +1742,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       // check generation stamp
       long replicaGenerationStamp = rbw.getGenerationStamp();
       if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1775,8 +1805,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
     long startTimeMs = Time.monotonicNow();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       final long blockId = b.getBlockId();
       final long expectedGs = b.getGenerationStamp();
       final long visible = b.getNumBytes();
@@ -1915,8 +1946,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         .getNumBytes());
     FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
     ReplicaInPipeline newReplicaInfo;
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), v.getStorageID())) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), v.getStorageID(),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       try {
         newReplicaInfo = v.createTemporary(b);
         LOG.debug("creating temporary for block: {} on volume: {}",
@@ -1973,8 +2005,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaInfo replicaInfo = null;
     ReplicaInfo finalizedReplicaInfo = null;
     long startTimeMs = Time.monotonicNow();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       if (Thread.interrupted()) {
         // Don't allow data modifications from interrupted threads
         throw new IOException("Cannot finalize block: " + b + " from 
Interrupted Thread");
@@ -2010,8 +2043,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        bpid, replicaInfo.getStorageUuid())) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        bpid, replicaInfo.getStorageUuid(),
+        datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
       // Compare generation stamp of old and new replica before finalizing
       if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
           > replicaInfo.getGenerationStamp()) {
@@ -2060,8 +2094,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public void unfinalizeBlock(ExtendedBlock b) throws IOException {
     long startTimeMs = Time.monotonicNow();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
-        b.getBlockPoolId(), getStorageUuidForLock(b))) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+        b.getBlockPoolId(), getStorageUuidForLock(b),
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getLocalBlock());
       if (replicaInfo != null &&
@@ -2459,7 +2494,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final String bpid = block.getBlockPoolId();
     final Block localBlock = block.getLocalBlock();
     final long blockId = localBlock.getBlockId();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid, volume.getStorageID(),
+        datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       final ReplicaInfo info = volumeMap.get(bpid, localBlock);
       if (info == null) {
         ReplicaInfo infoByBlockId = volumeMap.get(bpid, blockId);
@@ -2548,8 +2584,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           bpid + ": ReplicaInfo not found.");
       return;
     }
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
-        info.getStorageUuid())) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
+        info.getStorageUuid(), datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       boolean success = false;
       try {
         info = volumeMap.get(bpid, blockId);
@@ -2746,7 +2782,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       lastDirScannerNotifyTime = startTimeMs;
     }
     String storageUuid = vol.getStorageID();
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, storageUuid)) {
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
+        vol.getStorageID(), datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       if (!storageMap.containsKey(storageUuid)) {
         // Storage was already removed
         return;
@@ -3231,8 +3268,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
-        block.getBlockPoolId())) {
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+        block.getBlockPoolId(), getStorageUuidForLock(block),
+        datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -3259,6 +3297,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       Set<String> vols = storageMap.keySet();
       for (String v : vols) {
         lockManager.addLock(LockLevel.VOLUME, bpid, v);
+        List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
+        for (String dir : allSubDirNameForDataSetLock) {
+          lockManager.addLock(LockLevel.DIR, bpid, v, dir);
+          LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
+              bpid, v, dir);
+        }
       }
     }
     try {
@@ -3386,8 +3430,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
-        block.getBlockPoolId())) {
+    try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+        block.getBlockPoolId(), getStorageUuidForLock(block),
+        datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
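
One behavioral consequence of the FsDatasetImpl changes explains the test updates below: getStoredBlock previously locked at the block-pool level and returned null for an unknown block, but it now resolves the replica's storage UUID up front (via getReplicaInfo) in order to name the DIR lock, and getReplicaInfo throws ReplicaNotFoundException when the replica is absent. Callers that relied on a null return may need handling like this hypothetical sketch (caller code assumed, not from the patch):

    // Hypothetical caller: treat both null and ReplicaNotFoundException
    // as "block not present on this DataNode".
    Block found = null;
    try {
      found = fsDataset.getStoredBlock(bpid, blockId);
    } catch (ReplicaNotFoundException e) {
      // replica already removed from this DataNode's memory
    }
    boolean present = (found != null);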
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2ab25f8329c..6bfed9a2904 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6568,6 +6568,15 @@
      problem. In produce default set false, because it's have little performance loss.
     </description>
   </property>
+
+  <property>
+    <name>dfs.datanode.dataset.sublock.count</name>
+    <value>1000</value>
+    <description>
+      The number of dataset sub read/write locks for each volume.
+    </description>
+  </property>
+
   <property>
     <name>dfs.client.fsck.connect.timeout</name>
     <value>60000ms</value>
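
The sub-lock count is a per-DataNode trade-off: a larger value means fewer collisions between unrelated blocks but more lock objects per volume. If it needs tuning in code (for example in a test), the standard Configuration API applies; the key constant comes from this patch:

    Configuration conf = new HdfsConfiguration();
    // use 2000 sub-locks per volume instead of the default 1000
    conf.setLong(DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY, 2000L);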
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java
index b514accdf16..6cb12d2681f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java
@@ -37,6 +37,7 @@ public class TestDataSetLockManager {
   public void testBaseFunc() {
     manager.addLock(LockLevel.BLOCK_POOl, "BPtest");
     manager.addLock(LockLevel.VOLUME, "BPtest", "Volumetest");
+    manager.addLock(LockLevel.DIR, "BPtest", "Volumetest", "SubDirtest");
 
     AutoCloseDataSetLock lock = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest");
     AutoCloseDataSetLock lock1 = manager.readLock(LockLevel.BLOCK_POOl, "BPtest");
@@ -62,6 +63,16 @@ public class TestDataSetLockManager {
     manager.lockLeakCheck();
     assertNull(manager.getLastException());
 
+    AutoCloseDataSetLock lock6 = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest");
+    AutoCloseDataSetLock lock7 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest");
+    AutoCloseDataSetLock lock8 = manager.readLock(LockLevel.DIR,
+        "BPtest", "Volumetest", "SubDirtest");
+    lock8.close();
+    lock7.close();
+    lock6.close();
+    manager.lockLeakCheck();
+    assertNull(manager.getLastException());
+
     manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest");
     manager.lockLeakCheck();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 975874edb1f..f58ee729ef9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -1946,7 +1946,12 @@ public class TestFsDatasetImpl {
       assertFalse(uuids.contains(dn.getDatanodeUuid()));
 
       // This replica has deleted from datanode memory.
-      assertNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      try {
+        Block storedBlock = ds.getStoredBlock(bpid, extendedBlock.getBlockId());
+        assertNull(storedBlock);
+      } catch (Exception e) {
+        GenericTestUtils.assertExceptionContains("ReplicaNotFoundException", e);
+      }
     } finally {
       cluster.shutdown();
       DataNodeFaultInjector.set(oldInjector);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index 9d79e496102..2846c16c220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
@@ -596,9 +597,13 @@ public class TestDNFencing {
       throws IOException {
     int count = 0;
     for (DataNode dn : cluster.getDataNodes()) {
-      if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
-          block.getBlockPoolId(), block.getBlockId()) != null) {
-        count++;
+      try {
+        if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
+            block.getBlockPoolId(), block.getBlockId()) != null) {
+          count++;
+        }
+      } catch (ReplicaNotFoundException e) {
+        continue;
       }
     }
     return count;

