This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f66c87e2ec2 HDDS-13984. Reuse ResourceLockTracker for HierarchicalResourceLockManager (#9352)
f66c87e2ec2 is described below

commit f66c87e2ec29728e5ac8eadcc385d49afdb56a10
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Tue Nov 25 18:11:41 2025 -0500

    HDDS-13984. Reuse ResourceLockTracker for HierarchicalResourceLockManager (#9352)
---
 .../ozone/om/lock/DAGResourceLockTracker.java      | 169 +++++++++++++++++
 .../om/lock/HierarchicalResourceLockManager.java   |   8 +
 .../ozone/om/lock/LeveledResourceLockTracker.java  |  94 ++++++++++
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     | 205 +++------------------
 .../PoolBasedHierarchicalResourceLockManager.java  |  32 +++-
 .../ReadOnlyHierarchicalResourceLockManager.java   |   6 +
 .../hadoop/ozone/om/lock/ResourceLockTracker.java  |  55 ++++++
 ...stPoolBasedHierarchicalResourceLockManager.java |  69 ++++---
 8 files changed, 438 insertions(+), 200 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/DAGResourceLockTracker.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/DAGResourceLockTracker.java
new file mode 100644
index 00000000000..7fd44059dd6
--- /dev/null
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/DAGResourceLockTracker.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+import java.util.stream.Stream;
+
+/**
+ * Specialized implementation of {@link ResourceLockTracker} to manage locks for
+ * {@link DAGLeveledResource} types. This class enforces hierarchical locking
+ * constraints based on a directed acyclic graph (DAG) structure, ensuring that
+ * resources are locked and unlocked in a consistent manner.
+ *
+ * This tracker maintains an adjacency set representing resource dependencies, wherein
+ * each resource's children in the DAG define locking constraints. The class
+ * populates these constraints at initialization and checks them dynamically when locks
+ * are requested.
+ *
+ * The class is implemented as a singleton to ensure a central mechanism for
+ * managing resource locks across threads. It uses ThreadLocal storage to track the
+ * number of locks held for each resource by the current thread.
+ *
+ * Key Features:
+ * 1. Hierarchical Locking: Resources are locked in the order defined by their DAG
+ *    structure. Violating dependencies will result in denial of the lock request.
+ * 2. Thread-safe: The class guarantees thread safety with synchronized initialization
+ *    and thread-local management of lock counts.
+ * 3. Deadlock Avoidance: Lock dependency constraints prevent circular dependencies
+ *    in resource locking.
+ * 4. Centralized Tracking: Tracks the current locks held across all DAGLeveledResource
+ *    instances and provides utilities to inspect current lock states.
+ *
+ * Thread safety: This class is thread-safe.
+ */
+final class DAGResourceLockTracker extends 
ResourceLockTracker<DAGLeveledResource> {
+
+  private final EnumMap<DAGLeveledResource, ThreadLocal<Integer>> 
acquiredLocksMap =
+      new EnumMap<>(DAGLeveledResource.class);
+  private final Map<DAGLeveledResource, Set<DAGLeveledResource>> 
lockDependentAdjacencySet =
+      new EnumMap<>(DAGLeveledResource.class);
+
+  private static volatile DAGResourceLockTracker instance = null;
+
+  public static DAGResourceLockTracker get() {
+    if (instance == null) {
+      synchronized (DAGResourceLockTracker.class) {
+        if (instance == null) {
+          instance = new DAGResourceLockTracker();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /**
+   * Performs a Depth-First Search (DFS) traversal on a directed acyclic graph (DAG)
+   * composed of {@code DAGLeveledResource} objects. This method populates a mapping
+   * of resource dependencies while traversing the graph.
+   *
+   * The traversal stops for any resource already visited, avoiding potential cycles
+   * (though cycles should not exist in a valid DAG). Throws an exception if the provided
+   * stack is not empty at the start of the traversal.
+   *
+   * @param resource The starting resource for the DFS traversal.
+   * @param stack A stack used to manage the traversal process. The stack should be empty
+   *              when passed in.
+   * @param visited A set of resources that have already been visited during the traversal
+   *                to ensure each resource is processed only once.
+   */
+  private void dfs(DAGLeveledResource resource, Stack<DAGLeveledResource> 
stack, Set<DAGLeveledResource> visited) {
+    if (visited.contains(resource)) {
+      return;
+    }
+    if (!stack.isEmpty()) {
+      throw new IllegalStateException("Stack is not empty while beginning to 
traverse the DAG for resource :"
+          + resource);
+    }
+    stack.push(resource);
+    while (!stack.isEmpty()) {
+      DAGLeveledResource current = stack.peek();
+      if (!visited.contains(current)) {
+        visited.add(current);
+        for (DAGLeveledResource child : current.getChildren()) {
+          stack.push(child);
+        }
+      } else {
+        if (!lockDependentAdjacencySet.containsKey(current)) {
+          Set<DAGLeveledResource> adjacentResources = null;
+          for (DAGLeveledResource child : current.getChildren()) {
+            if (adjacentResources == null) {
+              adjacentResources = new HashSet<>();
+            }
+            adjacentResources.add(child);
+            adjacentResources.addAll(lockDependentAdjacencySet.get(child));
+          }
+          lockDependentAdjacencySet.put(current,
+              adjacentResources == null ? Collections.emptySet() : 
adjacentResources);
+        }
+        stack.pop();
+      }
+    }
+  }
+
+  private void populateLockDependentAdjacencySet() {
+    Set<DAGLeveledResource> visited = new HashSet<>();
+    Stack<DAGLeveledResource> stack = new Stack<>();
+    for (DAGLeveledResource resource : DAGLeveledResource.values()) {
+      dfs(resource, stack, visited);
+    }
+  }
+
+  private DAGResourceLockTracker() {
+    populateLockDependentAdjacencySet();
+    for (DAGLeveledResource dagLeveledResource : DAGLeveledResource.values()) {
+      acquiredLocksMap.put(dagLeveledResource, ThreadLocal.withInitial(() -> 
0));
+    }
+  }
+
+  private void updateAcquiredLockCount(DAGLeveledResource resource, int delta) 
{
+    acquiredLocksMap.get(resource).set(acquiredLocksMap.get(resource).get() + 
delta);
+  }
+
+  @Override
+  OMLockDetails lockResource(DAGLeveledResource resource) {
+    updateAcquiredLockCount(resource, 1);
+    return super.lockResource(resource);
+  }
+
+  @Override
+  OMLockDetails unlockResource(DAGLeveledResource resource) {
+    updateAcquiredLockCount(resource, -1);
+    return super.unlockResource(resource);
+  }
+
+  @Override
+  public boolean canLockResource(DAGLeveledResource resource) {
+    for (DAGLeveledResource child : lockDependentAdjacencySet.get(resource)) {
+      if (acquiredLocksMap.get(child).get() > 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  Stream<DAGLeveledResource> getCurrentLockedResources() {
+    return acquiredLocksMap.keySet().stream().filter(dagLeveled -> 
acquiredLocksMap.get(dagLeveled).get() > 0);
+  }
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java
index 7a1ae7c956d..898a387a9ac 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java
@@ -19,6 +19,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.stream.Stream;
 
 /**
  * Interface for Hierachical Resource Lock where the lock order acquired on 
resource is going to be deterministic and
@@ -47,6 +48,13 @@ public interface HierarchicalResourceLockManager extends 
AutoCloseable {
    */
   HierarchicalResourceLock acquireWriteLock(DAGLeveledResource resource, 
String key) throws IOException;
 
+  /**
+   * Retrieves a stream of all currently locked resources in the system by the thread calling this method.
+   *
+   * @return a stream of {@code DAGLeveledResource} representing the resources that are currently locked
+   */
+  Stream<DAGLeveledResource> getCurrentLockedResources();
+
   /**
    * Interface for managing the lock lifecycle corresponding to a Hierarchical 
Resource.
    */
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/LeveledResourceLockTracker.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/LeveledResourceLockTracker.java
new file mode 100644
index 00000000000..bbe9cd9076c
--- /dev/null
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/LeveledResourceLockTracker.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+/**
+ * The LeveledResourceLockTracker class is a singleton that extends the
+ * {@link ResourceLockTracker} to manage locks on leveled resources defined
+ * by {@link OzoneManagerLock.LeveledResource}. This class provides functionality
+ * to handle lock operations for resources that are managed at different hierarchical
+ * levels, utilizing a {@link ThreadLocal} mechanism for tracking the current lock set
+ * for each thread.
+ *
+ * The class ensures thread-safety by restricting lock access to the current thread's
+ * context. It allows checking if a resource can be locked, locking a resource, unlocking
+ * a resource, and retrieving details of currently locked resources.
+ *
+ * Thread safety is achieved through the use of a volatile singleton instance and
+ * double-checked locking during initialization. The internal lock state is maintained
+ * per-thread using a {@link ThreadLocal} variable.
+ *
+ * Key methods include:
+ *  - {@code get()}: Retrieves the singleton instance of the LeveledResourceLockTracker.
+ *  - {@code canLockResource(OzoneManagerLock.LeveledResource resource)}: Determines
+ *    if a specific resource can be locked based on the current lock set.
+ *  - {@code getCurrentLockedResources()}: Provides a stream of currently locked
+ *    resources for the current thread.
+ *  - {@code lockResource(OzoneManagerLock.LeveledResource resource)}: Locks a specified
+ *    resource and updates the current thread's lock state.
+ *  - {@code unlockResource(OzoneManagerLock.LeveledResource resource)}: Unlocks a specified
+ *    resource and updates the current thread's lock state accordingly.
+ *
+ * This class is final to prevent subclassing, ensuring the integrity of its
+ * singleton behavior and thread-local lock tracking mechanism.
+ */
+final class LeveledResourceLockTracker extends 
ResourceLockTracker<OzoneManagerLock.LeveledResource> {
+  private final ThreadLocal<Short> lockSet = ThreadLocal.withInitial(() -> 
Short.valueOf((short) 0));
+  private static volatile LeveledResourceLockTracker instance = null;
+
+  private LeveledResourceLockTracker() {
+  }
+
+  public static LeveledResourceLockTracker get() {
+    if (instance == null) {
+      synchronized (LeveledResourceLockTracker.class) {
+        if (instance == null) {
+          instance = new LeveledResourceLockTracker();
+        }
+      }
+    }
+    return instance;
+  }
+
+  @Override
+  public boolean canLockResource(OzoneManagerLock.LeveledResource resource) {
+    return resource.canLock(lockSet.get());
+  }
+
+  @Override
+  Stream<OzoneManagerLock.LeveledResource> getCurrentLockedResources() {
+    short lockSetVal = lockSet.get();
+    return Arrays.stream(OzoneManagerLock.LeveledResource.values())
+        .filter(leveledResource -> leveledResource.isLevelLocked(lockSetVal));
+  }
+
+  @Override
+  public OMLockDetails unlockResource(OzoneManagerLock.LeveledResource 
resource) {
+    lockSet.set(resource.clearLock(lockSet.get()));
+    return super.unlockResource(resource);
+  }
+
+  @Override
+  public OMLockDetails lockResource(OzoneManagerLock.LeveledResource resource) 
{
+    lockSet.set(resource.setLock(lockSet.get()));
+    return super.lockResource(resource);
+  }
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 351b3af25eb..79c1fa6a84b 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -31,19 +31,15 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.Stack;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -102,7 +98,7 @@ public class OzoneManagerLock implements IOzoneManagerLock {
       LoggerFactory.getLogger(OzoneManagerLock.class);
 
   private final Map<Class<? extends Resource>,
-      Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager>> 
resourcelockMap;
+      Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker>> 
resourcelockMap;
 
   private OMLockMetrics omLockMetrics;
 
@@ -116,22 +112,22 @@ public OzoneManagerLock(ConfigurationSource conf) {
         getFlatLocks(conf));
   }
 
-  private Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> 
getLeveledLocks(
+  private Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker> 
getLeveledLocks(
       ConfigurationSource conf) {
     Map<LeveledResource, Striped<ReadWriteLock>> stripedLockMap = new 
EnumMap<>(LeveledResource.class);
     for (LeveledResource r : LeveledResource.values()) {
       stripedLockMap.put(r, createStripeLock(r, conf));
     }
-    return Pair.of(Collections.unmodifiableMap(stripedLockMap), new 
LeveledResourceLockManager());
+    return Pair.of(Collections.unmodifiableMap(stripedLockMap), 
LeveledResourceLockTracker.get());
   }
 
-  private Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> 
getFlatLocks(
+  private Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker> 
getFlatLocks(
       ConfigurationSource conf) {
     Map<DAGLeveledResource, Striped<ReadWriteLock>> stripedLockMap = new 
EnumMap<>(DAGLeveledResource.class);
     for (DAGLeveledResource r : DAGLeveledResource.values()) {
       stripedLockMap.put(r, createStripeLock(r, conf));
     }
-    return Pair.of(Collections.unmodifiableMap(stripedLockMap), new 
DAGResourceLockManager());
+    return Pair.of(Collections.unmodifiableMap(stripedLockMap), 
DAGResourceLockTracker.get());
   }
 
   private Striped<ReadWriteLock> createStripeLock(Resource r,
@@ -277,11 +273,11 @@ private void acquireLock(Resource resource, boolean 
isReadLock, ReadWriteLock lo
 
   private OMLockDetails acquireLocks(Resource resource, boolean isReadLock,
       Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>> 
lockListProvider) {
-    Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> 
resourceLockPair =
+    Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker> 
resourceLockPair =
         resourcelockMap.get(resource.getClass());
-    ResourceLockManager<Resource> resourceLockManager = 
resourceLockPair.getRight();
-    resourceLockManager.clearLockDetails();
-    if (!resourceLockManager.canLockResource(resource)) {
+    ResourceLockTracker<Resource> resourceLockTracker = 
resourceLockPair.getRight();
+    resourceLockTracker.clearLockDetails();
+    if (!resourceLockTracker.canLockResource(resource)) {
       String errorMessage = getErrorMessage(resource);
       LOG.error(errorMessage);
       throw new RuntimeException(errorMessage);
@@ -292,15 +288,15 @@ private OMLockDetails acquireLocks(Resource resource, 
boolean isReadLock,
     for (ReadWriteLock lock : 
lockListProvider.apply(resourceLockPair.getKey().get(resource))) {
       acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);
     }
-    return resourceLockManager.lockResource(resource);
+    return resourceLockTracker.lockResource(resource);
   }
 
   private OMLockDetails acquireLock(Resource resource, boolean isReadLock, 
String... keys) {
-    Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> 
resourceLockPair =
+    Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker> 
resourceLockPair =
         resourcelockMap.get(resource.getClass());
-    ResourceLockManager<Resource> resourceLockManager = 
resourceLockPair.getRight();
-    resourceLockManager.clearLockDetails();
-    if (!resourceLockManager.canLockResource(resource)) {
+    ResourceLockTracker<Resource> resourceLockTracker = 
resourceLockPair.getRight();
+    resourceLockTracker.clearLockDetails();
+    if (!resourceLockTracker.canLockResource(resource)) {
       String errorMessage = getErrorMessage(resource);
       LOG.error(errorMessage);
       throw new RuntimeException(errorMessage);
@@ -310,7 +306,7 @@ private OMLockDetails acquireLock(Resource resource, 
boolean isReadLock, String.
 
     ReentrantReadWriteLock lock = getLock(resourceLockPair.getKey(), resource, 
keys);
     acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);
-    return resourceLockManager.lockResource(resource);
+    return resourceLockTracker.lockResource(resource);
   }
 
   private void updateReadLockMetrics(Resource resource,
@@ -365,7 +361,7 @@ private String getErrorMessage(Resource resource) {
   @VisibleForTesting
   List<String> getCurrentLocks() {
     return resourcelockMap.values().stream().map(Pair::getValue)
-        .flatMap(rlm -> ((ResourceLockManager<? extends 
Resource>)rlm).getCurrentLockedResources())
+        .flatMap(rlm -> ((ResourceLockTracker<? extends 
Resource>)rlm).getCurrentLockedResources())
         .map(Resource::getName)
         .collect(Collectors.toList());
   }
@@ -455,10 +451,10 @@ public OMLockDetails releaseReadLocks(Resource resource, 
Collection<String[]> ke
 
   private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
       String... keys) {
-    Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> 
resourceLockPair =
+    Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker> 
resourceLockPair =
         resourcelockMap.get(resource.getClass());
-    ResourceLockManager<Resource> resourceLockManager = 
resourceLockPair.getRight();
-    resourceLockManager.clearLockDetails();
+    ResourceLockTracker<Resource> resourceLockTracker = 
resourceLockPair.getRight();
+    resourceLockTracker.clearLockDetails();
     ReentrantReadWriteLock lock = getLock(resourceLockPair.getKey(), resource, 
keys);
     if (isReadLock) {
       lock.readLock().unlock();
@@ -468,15 +464,15 @@ private OMLockDetails releaseLock(Resource resource, 
boolean isReadLock,
       lock.writeLock().unlock();
       updateWriteUnlockMetrics(resource, lock, isWriteLocked);
     }
-    return resourceLockManager.unlockResource(resource);
+    return resourceLockTracker.unlockResource(resource);
   }
 
   private OMLockDetails releaseLocks(Resource resource, boolean isReadLock,
       Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>> 
lockListProvider) {
-    Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> 
resourceLockPair =
+    Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker> 
resourceLockPair =
         resourcelockMap.get(resource.getClass());
-    ResourceLockManager<Resource> resourceLockManager = 
resourceLockPair.getRight();
-    resourceLockManager.clearLockDetails();
+    ResourceLockTracker<Resource> resourceLockTracker = 
resourceLockPair.getRight();
+    resourceLockTracker.clearLockDetails();
     List<ReadWriteLock> locks = 
StreamSupport.stream(lockListProvider.apply(resourceLockPair.getKey().get(resource))
             .spliterator(), false).collect(Collectors.toList());
     // Release locks in reverse order.
@@ -491,7 +487,7 @@ private OMLockDetails releaseLocks(Resource resource, 
boolean isReadLock,
         updateWriteUnlockMetrics(resource, (ReentrantReadWriteLock) lock, 
isWriteLocked);
       }
     }
-    return resourceLockManager.unlockResource(resource);
+    return resourceLockTracker.unlockResource(resource);
   }
 
   private void updateReadUnlockMetrics(Resource resource,
@@ -581,151 +577,6 @@ public OMLockMetrics getOMLockMetrics() {
     return omLockMetrics;
   }
 
-  private abstract static class ResourceLockManager<T extends Resource> {
-
-    private final ThreadLocal<OMLockDetails> omLockDetails = 
ThreadLocal.withInitial(OMLockDetails::new);
-
-    abstract boolean canLockResource(T resource);
-
-    abstract Stream<T> getCurrentLockedResources();
-
-    OMLockDetails clearLockDetails() {
-      omLockDetails.get().clear();
-      return omLockDetails.get();
-    }
-
-    OMLockDetails unlockResource(T resource) {
-      return omLockDetails.get();
-    }
-
-    OMLockDetails lockResource(T resource) {
-      omLockDetails.get().setLockAcquired(true);
-      return omLockDetails.get();
-    }
-  }
-
-  private static final class DAGResourceLockManager extends 
ResourceLockManager<DAGLeveledResource> {
-
-    private final EnumMap<DAGLeveledResource, ThreadLocal<Boolean>> 
acquiredLocksMap =
-        new EnumMap<>(DAGLeveledResource.class);
-    private final Map<DAGLeveledResource, Set<DAGLeveledResource>> 
lockDependentAdjacencySet =
-        new EnumMap<>(DAGLeveledResource.class);
-
-    /**
-     * Performs a depth-first traversal (DFS) on the directed acyclic graph 
(DAG)
-     * of {@link DAGLeveledResource} objects, processing dependencies and 
updating
-     * the lock-dependent adjacency set with resource relationships.
-     *
-     * @param resource the DAGLeveledResource instance to start the traversal 
from
-     * @param stack a stack to manage the DFS traversal state
-     * @param visited a set to keep track of resources that have already been 
visited
-     *                during the traversal
-     */
-    private void dfs(DAGLeveledResource resource, Stack<DAGLeveledResource> 
stack, Set<DAGLeveledResource> visited) {
-      if (visited.contains(resource)) {
-        return;
-      }
-      if (!stack.isEmpty()) {
-        throw new IllegalStateException("Stack is not empty while beginning to 
traverse the DAG for resource :"
-            + resource);
-      }
-      stack.push(resource);
-      while (!stack.isEmpty()) {
-        DAGLeveledResource current = stack.peek();
-        if (!visited.contains(current)) {
-          visited.add(current);
-          for (DAGLeveledResource child : current.getChildren()) {
-            stack.push(child);
-          }
-        } else {
-          if (!lockDependentAdjacencySet.containsKey(current)) {
-            Set<DAGLeveledResource> adjacentResources = null;
-            for (DAGLeveledResource child : current.getChildren()) {
-              if (adjacentResources == null) {
-                adjacentResources = new HashSet<>();
-              }
-              adjacentResources.add(child);
-              adjacentResources.addAll(lockDependentAdjacencySet.get(child));
-            }
-            lockDependentAdjacencySet.put(current,
-                adjacentResources == null ? Collections.emptySet() : 
adjacentResources);
-          }
-          stack.pop();
-        }
-      }
-    }
-
-    private void populateLockDependentAdjacencySet() {
-      Set<DAGLeveledResource> visited = new HashSet<>();
-      Stack<DAGLeveledResource> stack = new Stack<>();
-      for (DAGLeveledResource resource : DAGLeveledResource.values()) {
-        dfs(resource, stack, visited);
-      }
-    }
-
-    private DAGResourceLockManager() {
-      populateLockDependentAdjacencySet();
-      for (DAGLeveledResource dagLeveledResource : 
DAGLeveledResource.values()) {
-        acquiredLocksMap.put(dagLeveledResource, ThreadLocal.withInitial(() -> 
Boolean.FALSE));
-      }
-    }
-
-    @Override
-    OMLockDetails lockResource(DAGLeveledResource resource) {
-      acquiredLocksMap.get(resource).set(Boolean.TRUE);
-      return super.lockResource(resource);
-    }
-
-    @Override
-    OMLockDetails unlockResource(DAGLeveledResource resource) {
-      acquiredLocksMap.get(resource).set(Boolean.FALSE);
-      return super.unlockResource(resource);
-    }
-
-    @Override
-    public boolean canLockResource(DAGLeveledResource resource) {
-      for (DAGLeveledResource child : lockDependentAdjacencySet.get(resource)) 
{
-        if (acquiredLocksMap.get(child).get()) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    @Override
-    Stream<DAGLeveledResource> getCurrentLockedResources() {
-      return acquiredLocksMap.keySet().stream().filter(dagLeveled -> 
acquiredLocksMap.get(dagLeveled).get());
-    }
-  }
-
-  private static final class LeveledResourceLockManager extends 
ResourceLockManager<LeveledResource> {
-    private final ThreadLocal<Short> lockSet = ThreadLocal.withInitial(() -> 
Short.valueOf((short)0));
-
-    @Override
-    public boolean canLockResource(LeveledResource resource) {
-      return resource.canLock(lockSet.get());
-    }
-
-    @Override
-    Stream<LeveledResource> getCurrentLockedResources() {
-      short lockSetVal = lockSet.get();
-      return Arrays.stream(LeveledResource.values())
-          .filter(leveledResource -> 
leveledResource.isLevelLocked(lockSetVal));
-    }
-
-    @Override
-    public OMLockDetails unlockResource(LeveledResource resource) {
-      lockSet.set(resource.clearLock(lockSet.get()));
-      return super.unlockResource(resource);
-    }
-
-    @Override
-    public OMLockDetails lockResource(LeveledResource resource) {
-      lockSet.set(resource.setLock(lockSet.get()));
-      return super.lockResource(resource);
-    }
-  }
-
   /**
    * Leveled Resource defined in Ozone.
    * Enforces lock acquisition ordering based on the resource level. A 
resource at lower level cannot be acquired
@@ -849,7 +700,7 @@ short getMask() {
    * @param type IPC Timing types
    * @param deltaNanos consumed time
    */
-  private void updateProcessingDetails(ResourceLockManager<? extends Resource> 
resourceLockManager, Timing type,
+  private void updateProcessingDetails(ResourceLockTracker<? extends Resource> 
resourceLockTracker, Timing type,
       long deltaNanos) {
     Server.Call call = Server.getCurCall().get();
     if (call != null) {
@@ -857,13 +708,13 @@ private void 
updateProcessingDetails(ResourceLockManager<? extends Resource> res
     } else {
       switch (type) {
       case LOCKWAIT:
-        resourceLockManager.omLockDetails.get().add(deltaNanos, 
OMLockDetails.LockOpType.WAIT);
+        resourceLockTracker.getOmLockDetails().add(deltaNanos, 
OMLockDetails.LockOpType.WAIT);
         break;
       case LOCKSHARED:
-        resourceLockManager.omLockDetails.get().add(deltaNanos, 
OMLockDetails.LockOpType.READ);
+        resourceLockTracker.getOmLockDetails().add(deltaNanos, 
OMLockDetails.LockOpType.READ);
         break;
       case LOCKEXCLUSIVE:
-        resourceLockManager.omLockDetails.get().add(deltaNanos, 
OMLockDetails.LockOpType.WRITE);
+        resourceLockTracker.getOmLockDetails().add(deltaNanos, 
OMLockDetails.LockOpType.WRITE);
         break;
       default:
         LOG.error("Unsupported Timing type {}", type);
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java
index ac94c57c31b..aa28a9d4df4 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java
@@ -32,12 +32,16 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A lock manager implementation that manages hierarchical resource locks
@@ -47,7 +51,11 @@
  * DAGs (e.g., File System trees or snapshot chains).
  */
 public class PoolBasedHierarchicalResourceLockManager implements 
HierarchicalResourceLockManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PoolBasedHierarchicalResourceLockManager.class);
+
   private final GenericObjectPool<ReadWriteLock> lockPool;
+  private final ResourceLockTracker<DAGLeveledResource> resourceLockTracker;
   private final Map<DAGLeveledResource, Map<String, LockReferenceCountPair>> 
lockMap;
 
   public PoolBasedHierarchicalResourceLockManager(OzoneConfiguration conf) {
@@ -61,6 +69,7 @@ public 
PoolBasedHierarchicalResourceLockManager(OzoneConfiguration conf) {
     config.setBlockWhenExhausted(true);
     this.lockPool = new GenericObjectPool<>(new ReadWriteLockFactory(), 
config);
     this.lockMap = new ConcurrentHashMap<>();
+    this.resourceLockTracker = DAGResourceLockTracker.get();
   }
 
   private ReadWriteLock operateOnLock(DAGLeveledResource resource, String key,
@@ -102,8 +111,25 @@ public HierarchicalResourceLock 
acquireWriteLock(DAGLeveledResource resource, St
     return acquireLock(resource, key, false);
   }
 
+  private String getErrorMessage(IOzoneManagerLock.Resource resource) { // builds the text used when a lock request is refused
+    return "Thread '" + Thread.currentThread().getName() + "' cannot " +
+        "acquire " + resource.getName() + " lock while holding " +
+        // names of the resources the tracker reports as currently locked, rendered as a List
resourceLockTracker.getCurrentLockedResources().map(IOzoneManagerLock.Resource::getName)
+            .collect(Collectors.toList()) + " lock(s).";
+  }
+
+  @Override
+  public Stream<DAGLeveledResource> getCurrentLockedResources() {
+    return resourceLockTracker.getCurrentLockedResources(); // plain delegation to the tracker obtained from DAGResourceLockTracker.get()
+  }
+
   private HierarchicalResourceLock acquireLock(DAGLeveledResource resource, 
String key, boolean isReadLock)
       throws IOException {
+    if (!resourceLockTracker.canLockResource(resource)) {
+      String errorMessage = getErrorMessage(resource);
+      LOG.error(errorMessage);
+      throw new RuntimeException(errorMessage);
+    }
     ReadWriteLock readWriteLock = operateOnLock(resource, key, 
LockReferenceCountPair::increment);
     if (readWriteLock == null) {
       throw new IOException("Unable to acquire " + (isReadLock ? "read" : 
"write") + " lock on resource "
@@ -133,19 +159,20 @@ public void close() {
    * class, {@code PoolBasedHierarchicalResourceLockManager}, which oversees
    * the lifecycle of multiple such locks.
    */
-  public class PoolBasedHierarchicalResourceLock implements 
HierarchicalResourceLock, Closeable {
+  private final class PoolBasedHierarchicalResourceLock implements 
HierarchicalResourceLock, Closeable {
 
     private boolean isLockAcquired;
     private final Lock lock;
     private final DAGLeveledResource resource;
     private final String key;
 
-    public PoolBasedHierarchicalResourceLock(DAGLeveledResource resource, 
String key, Lock lock) {
+    private PoolBasedHierarchicalResourceLock(DAGLeveledResource resource, 
String key, Lock lock) {
       this.isLockAcquired = true;
       this.lock = lock;
       this.resource = resource;
       this.key = key;
       this.lock.lock();
+      resourceLockTracker.lockResource(resource); // tracker is updated only after lock() above has returned
     }
 
     @Override
@@ -157,6 +184,7 @@ public boolean isLockAcquired() {
     public synchronized void close() throws IOException {
       if (isLockAcquired) {
         this.lock.unlock();
+        resourceLockTracker.unlockResource(resource);
         operateOnLock(resource, key, (LockReferenceCountPair::decrement));
         isLockAcquired = false;
       }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
index 9458f575503..df81a9ba4d3 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.om.lock;
 
 import java.io.IOException;
+import java.util.stream.Stream;
 
 /**
  * A read only lock manager that does not acquire any lock.
@@ -57,6 +58,11 @@ public HierarchicalResourceLock 
acquireWriteLock(DAGLeveledResource resource, St
     return EMPTY_LOCK_NOT_ACQUIRED;
   }
 
+  @Override
+  public Stream<DAGLeveledResource> getCurrentLockedResources() {
+    return Stream.empty(); // this manager does not acquire any lock, so there is never anything to report
+  }
+
   @Override
   public void close() throws Exception {
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ResourceLockTracker.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ResourceLockTracker.java
new file mode 100644
index 00000000000..8a551e5f06d
--- /dev/null
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ResourceLockTracker.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.util.stream.Stream;
+
+/**
+ * Abstract class that tracks resource locks based on a generic resource type.
+ * Provides methods to acquire, release, and inspect locks, as well as obtain
+ * details about the locks held by the current thread.
+ *
+ * @param <T> the type of resource being tracked, which must implement
+ *            {@link IOzoneManagerLock.Resource}.
+ */
+abstract class ResourceLockTracker<T extends IOzoneManagerLock.Resource> {
+  // One OMLockDetails per thread; a thread's instance is created on its first access (withInitial).
+  private final ThreadLocal<OMLockDetails> omLockDetails = 
ThreadLocal.withInitial(OMLockDetails::new);
+  /** Tells whether {@code resource} may be locked; the rule itself is defined by each subclass. */
+  abstract boolean canLockResource(T resource);
+  /** Streams the resources the tracker reports as currently locked; contents are subclass-defined. */
+  abstract Stream<T> getCurrentLockedResources();
+  /** Calls {@code clear()} on the calling thread's lock details, then returns that same instance. */
+  OMLockDetails clearLockDetails() {
+    omLockDetails.get().clear();
+    return getOmLockDetails();
+  }
+  /** Base version does no bookkeeping for {@code resource}; it only returns the calling thread's lock details. */
+  OMLockDetails unlockResource(T resource) {
+    return getOmLockDetails();
+  }
+  /** Marks the calling thread's lock details as acquired and returns them; {@code resource} is not used here. */
+  OMLockDetails lockResource(T resource) {
+    omLockDetails.get().setLockAcquired(true);
+    return getOmLockDetails();
+  }
+  /** Returns the {@link OMLockDetails} instance bound to the calling thread. */
+  public OMLockDetails getOmLockDetails() {
+    return omLockDetails.get();
+  }
+}
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
index 9ced7f3f4d9..bf2393f645b 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
@@ -19,6 +19,11 @@
 
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT;
+import static 
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.BOOTSTRAP_LOCK;
+import static 
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_CONTENT_LOCK;
+import static 
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_LOCK;
+import static 
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_GC_LOCK;
+import static 
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_LOCAL_DATA_LOCK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -26,9 +31,15 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -39,6 +50,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import 
org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock;
 import org.junit.jupiter.api.AfterEach;
@@ -46,6 +58,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 /**
@@ -251,27 +264,6 @@ public void testDoubleClose() throws Exception {
     assertFalse(lock.isLockAcquired());
   }
 
-  /**
-   * Test different resource types can be locked independently.
-   */
-  @Test
-  public void testDifferentResourceTypes() throws Exception {
-
-    List<HierarchicalResourceLock> locks = new ArrayList<>();
-    for (DAGLeveledResource otherResource : DAGLeveledResource.values()) {
-      String key = "test-key";
-      locks.add(lockManager.acquireWriteLock(otherResource, key));
-    }
-    for (HierarchicalResourceLock lock : locks) {
-      assertNotNull(lock);
-      assertTrue(lock.isLockAcquired());
-    }
-    for (HierarchicalResourceLock lock : locks) {
-      lock.close();
-    }
-  }
-
-
   /**
    * Test different keys on same resource type can be locked concurrently.
    */
@@ -580,4 +572,39 @@ public void testIOExceptionPropagation() {
       assertNotNull(e.getMessage());
     }
   }
+
+  @ParameterizedTest // one run per DAGLeveledResource: tries to take it while each resource in turn is already held
+  @EnumSource // no enum class given: JUnit uses the declared type of the first method parameter
+  public void testDAGLockOrderAcquisition(DAGLeveledResource 
dagLeveledResource) throws IOException {
+    Map<DAGLeveledResource, Set<IOzoneManagerLock.Resource>> 
forbiddenLockOrdering =
+        ImmutableMap.of(SNAPSHOT_DB_CONTENT_LOCK, 
ImmutableSet.of(SNAPSHOT_DB_LOCK, SNAPSHOT_LOCAL_DATA_LOCK),
+            BOOTSTRAP_LOCK, ImmutableSet.of(SNAPSHOT_GC_LOCK, 
SNAPSHOT_DB_LOCK, SNAPSHOT_DB_CONTENT_LOCK,
+                SNAPSHOT_LOCAL_DATA_LOCK)); // key = resource being requested, value = held resources that must make the request fail
+    List<DAGLeveledResource> resources = 
Arrays.stream(DAGLeveledResource.values()).collect(Collectors.toList());
+    for (DAGLeveledResource otherResource : resources) {
+      String otherResourceName1 = otherResource.getName() + "key";
+      String otherResourceName2 = otherResource.getName() + "key"; // NOTE(review): same string as otherResourceName1, so lock1 and lock2 target the same (resource, key) pair - confirm distinct keys were not intended
+      String dagResourceName = dagLeveledResource.getName() + "key";
+      try (HierarchicalResourceLock lock1 = 
lockManager.acquireWriteLock(otherResource, otherResourceName1);
+           HierarchicalResourceLock lock2 = 
lockManager.acquireWriteLock(otherResource, otherResourceName2)) {
+        assertTrue(lock1.isLockAcquired());
+        assertTrue(lock2.isLockAcquired());
+        if (forbiddenLockOrdering.getOrDefault(dagLeveledResource, 
Collections.emptySet()).contains(otherResource)) {
+          assertThrows(RuntimeException.class,
+              () -> lockManager.acquireWriteLock(dagLeveledResource, 
dagResourceName),
+              "Lock acquisition of " + dagLeveledResource + " should fail when 
" + otherResource +
+                  " is already acquired");
+          lock1.close(); // explicit early release; try-with-resources calls close() on lock1 again when the block exits
+          assertThrows(RuntimeException.class,
+              () -> lockManager.acquireWriteLock(dagLeveledResource, 
dagResourceName),
+              "Lock acquisition of " + dagLeveledResource + " should fail when 
" + otherResource +
+                  " is already acquired even after first lock is released 
since second lock is still held");
+        } else { // pair is not listed as forbidden: the request must succeed while lock1 and lock2 are held
+          try (HierarchicalResourceLock lock3 = 
lockManager.acquireWriteLock(dagLeveledResource, dagResourceName)) {
+            assertTrue(lock3.isLockAcquired());
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to