This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f66c87e2ec2 HDDS-13984. Reuse ResourceLockTracker for
HierarchicalResourceLockManager (#9352)
f66c87e2ec2 is described below
commit f66c87e2ec29728e5ac8eadcc385d49afdb56a10
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Tue Nov 25 18:11:41 2025 -0500
HDDS-13984. Reuse ResourceLockTracker for HierarchicalResourceLockManager
(#9352)
---
.../ozone/om/lock/DAGResourceLockTracker.java | 169 +++++++++++++++++
.../om/lock/HierarchicalResourceLockManager.java | 8 +
.../ozone/om/lock/LeveledResourceLockTracker.java | 94 ++++++++++
.../hadoop/ozone/om/lock/OzoneManagerLock.java | 205 +++------------------
.../PoolBasedHierarchicalResourceLockManager.java | 32 +++-
.../ReadOnlyHierarchicalResourceLockManager.java | 6 +
.../hadoop/ozone/om/lock/ResourceLockTracker.java | 55 ++++++
...stPoolBasedHierarchicalResourceLockManager.java | 69 ++++---
8 files changed, 438 insertions(+), 200 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/DAGResourceLockTracker.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/DAGResourceLockTracker.java
new file mode 100644
index 00000000000..7fd44059dd6
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/DAGResourceLockTracker.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+import java.util.stream.Stream;
+
+/**
+ * Specialized implementation of {@link ResourceLockTracker} to manage locks
for
+ * {@link DAGLeveledResource} types. This class enforces hierarchical locking
+ * constraints based on a directed acyclic graph (DAG) structure, ensuring that
+ * resources are locked and unlocked in a consistent manner.
+ *
+ * This tracker maintains an adjacency set representing resource dependencies,
wherein
+ * each resource's children in the DAG define locking constraints. The class
+ * populates these constraints at initialization and checks them dynamically
when locks
+ * are requested.
+ *
+ * The class is implemented as a singleton to ensure a central mechanism for
+ * managing resource locks across threads. It uses ThreadLocal storage to
track the
+ * number of locks held for each resource by the current thread.
+ *
+ * Key Features:
+ * 1. Hierarchical Locking: Resources are locked in the order defined by their
DAG
+ * structure. Violating dependencies will result in denial of the lock
request.
+ * 2. Thread-safe: The class guarantees thread safety with synchronized
initialization
+ * and thread-local management of lock counts.
+ * 3. Deadlock Avoidance: Lock dependency constraints prevent circular
dependencies
+ * in resource locking.
+ * 4. Centralized Tracking: Tracks the current locks held across all
DAGLeveledResource
+ * instances and provides utilities to inspect current lock states.
+ *
+ * Thread safety: This class is thread-safe.
+ */
+final class DAGResourceLockTracker extends
ResourceLockTracker<DAGLeveledResource> {
+
+ private final EnumMap<DAGLeveledResource, ThreadLocal<Integer>>
acquiredLocksMap =
+ new EnumMap<>(DAGLeveledResource.class);
+ private final Map<DAGLeveledResource, Set<DAGLeveledResource>>
lockDependentAdjacencySet =
+ new EnumMap<>(DAGLeveledResource.class);
+
+ private static volatile DAGResourceLockTracker instance = null;
+
+ public static DAGResourceLockTracker get() {
+ if (instance == null) {
+ synchronized (DAGResourceLockTracker.class) {
+ if (instance == null) {
+ instance = new DAGResourceLockTracker();
+ }
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * Performs a Depth-First Search (DFS) traversal on a directed acyclic graph
(DAG)
+ * composed of {@code DAGLeveledResource} objects. This method populates a
mapping
+ * of resource dependencies while traversing the graph.
+ *
+ * The traversal stops for any resource already visited, avoiding potential
cycles
+ * (though cycles should not exist in a valid DAG). Throws an exception if
the provided
+ * stack is not empty at the start of the traversal.
+ *
+ * @param resource The starting resource for the DFS traversal.
+ * @param stack A stack used to manage the traversal process. The stack
should be empty
+ * when passed in.
+ * @param visited A set of resources that have already been visited during
the traversal
+ * to ensure each resource is processed only once.
+ */
+ private void dfs(DAGLeveledResource resource, Stack<DAGLeveledResource>
stack, Set<DAGLeveledResource> visited) {
+ if (visited.contains(resource)) {
+ return;
+ }
+ if (!stack.isEmpty()) {
+ throw new IllegalStateException("Stack is not empty while beginning to
traverse the DAG for resource :"
+ + resource);
+ }
+ stack.push(resource);
+ while (!stack.isEmpty()) {
+ DAGLeveledResource current = stack.peek();
+ if (!visited.contains(current)) {
+ visited.add(current);
+ for (DAGLeveledResource child : current.getChildren()) {
+ stack.push(child);
+ }
+ } else {
+ if (!lockDependentAdjacencySet.containsKey(current)) {
+ Set<DAGLeveledResource> adjacentResources = null;
+ for (DAGLeveledResource child : current.getChildren()) {
+ if (adjacentResources == null) {
+ adjacentResources = new HashSet<>();
+ }
+ adjacentResources.add(child);
+ adjacentResources.addAll(lockDependentAdjacencySet.get(child));
+ }
+ lockDependentAdjacencySet.put(current,
+ adjacentResources == null ? Collections.emptySet() :
adjacentResources);
+ }
+ stack.pop();
+ }
+ }
+ }
+
+ private void populateLockDependentAdjacencySet() {
+ Set<DAGLeveledResource> visited = new HashSet<>();
+ Stack<DAGLeveledResource> stack = new Stack<>();
+ for (DAGLeveledResource resource : DAGLeveledResource.values()) {
+ dfs(resource, stack, visited);
+ }
+ }
+
+ private DAGResourceLockTracker() {
+ populateLockDependentAdjacencySet();
+ for (DAGLeveledResource dagLeveledResource : DAGLeveledResource.values()) {
+ acquiredLocksMap.put(dagLeveledResource, ThreadLocal.withInitial(() ->
0));
+ }
+ }
+
+ private void updateAcquiredLockCount(DAGLeveledResource resource, int delta)
{
+ acquiredLocksMap.get(resource).set(acquiredLocksMap.get(resource).get() +
delta);
+ }
+
+ @Override
+ OMLockDetails lockResource(DAGLeveledResource resource) {
+ updateAcquiredLockCount(resource, 1);
+ return super.lockResource(resource);
+ }
+
+ @Override
+ OMLockDetails unlockResource(DAGLeveledResource resource) {
+ updateAcquiredLockCount(resource, -1);
+ return super.unlockResource(resource);
+ }
+
+ @Override
+ public boolean canLockResource(DAGLeveledResource resource) {
+ for (DAGLeveledResource child : lockDependentAdjacencySet.get(resource)) {
+ if (acquiredLocksMap.get(child).get() > 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ Stream<DAGLeveledResource> getCurrentLockedResources() {
+ return acquiredLocksMap.keySet().stream().filter(dagLeveled ->
acquiredLocksMap.get(dagLeveled).get() > 0);
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java
index 7a1ae7c956d..898a387a9ac 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java
@@ -19,6 +19,7 @@
import java.io.Closeable;
import java.io.IOException;
+import java.util.stream.Stream;
/**
* Interface for Hierachical Resource Lock where the lock order acquired on
resource is going to be deterministic and
@@ -47,6 +48,13 @@ public interface HierarchicalResourceLockManager extends
AutoCloseable {
*/
HierarchicalResourceLock acquireWriteLock(DAGLeveledResource resource,
String key) throws IOException;
+ /**
+ * Retrieves a stream of all currently locked resources in the system by the
thread calling this method.
+ *
+ * @return a stream of {@code DAGLeveledResource} representing the resources
that are currently locked
+ */
+ Stream<DAGLeveledResource> getCurrentLockedResources();
+
/**
* Interface for managing the lock lifecycle corresponding to a Hierarchical
Resource.
*/
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/LeveledResourceLockTracker.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/LeveledResourceLockTracker.java
new file mode 100644
index 00000000000..bbe9cd9076c
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/LeveledResourceLockTracker.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+/**
+ * The LeveledResourceLockTracker class is a singleton that extends the
+ * {@link ResourceLockTracker} to manage locks on leveled resources defined
+ * by {@link OzoneManagerLock.LeveledResource}. This class provides
functionality
+ * to handle lock operations for resources that are managed at different
hierarchical
+ * levels, utilizing a {@link ThreadLocal} mechanism for tracking the current
lock set
+ * for each thread.
+ *
+ * The class ensures thread-safety by restricting lock access to the current
thread's
+ * context. It allows checking if a resource can be locked, locking a
resource, unlocking
+ * a resource, and retrieving details of currently locked resources.
+ *
+ * Thread safety is achieved through the use of a volatile singleton instance
and
+ * double-checked locking during initialization. The internal lock state is
maintained
+ * per-thread using a {@link ThreadLocal} variable.
+ *
+ * Key methods include:
+ * - {@code get()}: Retrieves the singleton instance of the
LeveledResourceLockTracker.
+ * - {@code canLockResource(OzoneManagerLock.LeveledResource resource)}:
Determines
+ * if a specific resource can be locked based on the current lock set.
+ * - {@code getCurrentLockedResources()}: Provides a stream of currently
locked
+ * resources for the current thread.
+ * - {@code lockResource(OzoneManagerLock.LeveledResource resource)}: Locks a
specified
+ * resource and updates the current thread's lock state.
+ * - {@code unlockResource(OzoneManagerLock.LeveledResource resource)}:
Unlocks a specified
+ * resource and updates the current thread's lock state accordingly.
+ *
+ * This class is final to prevent subclassing, ensuring the integrity of its
+ * singleton behavior and thread-local lock tracking mechanism.
+ */
+final class LeveledResourceLockTracker extends
ResourceLockTracker<OzoneManagerLock.LeveledResource> {
+ private final ThreadLocal<Short> lockSet = ThreadLocal.withInitial(() ->
Short.valueOf((short) 0));
+ private static volatile LeveledResourceLockTracker instance = null;
+
+ private LeveledResourceLockTracker() {
+ }
+
+ public static LeveledResourceLockTracker get() {
+ if (instance == null) {
+ synchronized (LeveledResourceLockTracker.class) {
+ if (instance == null) {
+ instance = new LeveledResourceLockTracker();
+ }
+ }
+ }
+ return instance;
+ }
+
+ @Override
+ public boolean canLockResource(OzoneManagerLock.LeveledResource resource) {
+ return resource.canLock(lockSet.get());
+ }
+
+ @Override
+ Stream<OzoneManagerLock.LeveledResource> getCurrentLockedResources() {
+ short lockSetVal = lockSet.get();
+ return Arrays.stream(OzoneManagerLock.LeveledResource.values())
+ .filter(leveledResource -> leveledResource.isLevelLocked(lockSetVal));
+ }
+
+ @Override
+ public OMLockDetails unlockResource(OzoneManagerLock.LeveledResource
resource) {
+ lockSet.set(resource.clearLock(lockSet.get()));
+ return super.unlockResource(resource);
+ }
+
+ @Override
+ public OMLockDetails lockResource(OzoneManagerLock.LeveledResource resource)
{
+ lockSet.set(resource.setLock(lockSet.get()));
+ return super.lockResource(resource);
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 351b3af25eb..79c1fa6a84b 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -31,19 +31,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
-import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -102,7 +98,7 @@ public class OzoneManagerLock implements IOzoneManagerLock {
LoggerFactory.getLogger(OzoneManagerLock.class);
private final Map<Class<? extends Resource>,
- Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager>>
resourcelockMap;
+ Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker>>
resourcelockMap;
private OMLockMetrics omLockMetrics;
@@ -116,22 +112,22 @@ public OzoneManagerLock(ConfigurationSource conf) {
getFlatLocks(conf));
}
- private Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager>
getLeveledLocks(
+ private Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker>
getLeveledLocks(
ConfigurationSource conf) {
Map<LeveledResource, Striped<ReadWriteLock>> stripedLockMap = new
EnumMap<>(LeveledResource.class);
for (LeveledResource r : LeveledResource.values()) {
stripedLockMap.put(r, createStripeLock(r, conf));
}
- return Pair.of(Collections.unmodifiableMap(stripedLockMap), new
LeveledResourceLockManager());
+ return Pair.of(Collections.unmodifiableMap(stripedLockMap),
LeveledResourceLockTracker.get());
}
- private Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager>
getFlatLocks(
+ private Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker>
getFlatLocks(
ConfigurationSource conf) {
Map<DAGLeveledResource, Striped<ReadWriteLock>> stripedLockMap = new
EnumMap<>(DAGLeveledResource.class);
for (DAGLeveledResource r : DAGLeveledResource.values()) {
stripedLockMap.put(r, createStripeLock(r, conf));
}
- return Pair.of(Collections.unmodifiableMap(stripedLockMap), new
DAGResourceLockManager());
+ return Pair.of(Collections.unmodifiableMap(stripedLockMap),
DAGResourceLockTracker.get());
}
private Striped<ReadWriteLock> createStripeLock(Resource r,
@@ -277,11 +273,11 @@ private void acquireLock(Resource resource, boolean
isReadLock, ReadWriteLock lo
private OMLockDetails acquireLocks(Resource resource, boolean isReadLock,
Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>>
lockListProvider) {
- Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager>
resourceLockPair =
+ Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker>
resourceLockPair =
resourcelockMap.get(resource.getClass());
- ResourceLockManager<Resource> resourceLockManager =
resourceLockPair.getRight();
- resourceLockManager.clearLockDetails();
- if (!resourceLockManager.canLockResource(resource)) {
+ ResourceLockTracker<Resource> resourceLockTracker =
resourceLockPair.getRight();
+ resourceLockTracker.clearLockDetails();
+ if (!resourceLockTracker.canLockResource(resource)) {
String errorMessage = getErrorMessage(resource);
LOG.error(errorMessage);
throw new RuntimeException(errorMessage);
@@ -292,15 +288,15 @@ private OMLockDetails acquireLocks(Resource resource,
boolean isReadLock,
for (ReadWriteLock lock :
lockListProvider.apply(resourceLockPair.getKey().get(resource))) {
acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);
}
- return resourceLockManager.lockResource(resource);
+ return resourceLockTracker.lockResource(resource);
}
private OMLockDetails acquireLock(Resource resource, boolean isReadLock,
String... keys) {
- Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager>
resourceLockPair =
+ Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker>
resourceLockPair =
resourcelockMap.get(resource.getClass());
- ResourceLockManager<Resource> resourceLockManager =
resourceLockPair.getRight();
- resourceLockManager.clearLockDetails();
- if (!resourceLockManager.canLockResource(resource)) {
+ ResourceLockTracker<Resource> resourceLockTracker =
resourceLockPair.getRight();
+ resourceLockTracker.clearLockDetails();
+ if (!resourceLockTracker.canLockResource(resource)) {
String errorMessage = getErrorMessage(resource);
LOG.error(errorMessage);
throw new RuntimeException(errorMessage);
@@ -310,7 +306,7 @@ private OMLockDetails acquireLock(Resource resource,
boolean isReadLock, String.
ReentrantReadWriteLock lock = getLock(resourceLockPair.getKey(), resource,
keys);
acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);
- return resourceLockManager.lockResource(resource);
+ return resourceLockTracker.lockResource(resource);
}
private void updateReadLockMetrics(Resource resource,
@@ -365,7 +361,7 @@ private String getErrorMessage(Resource resource) {
@VisibleForTesting
List<String> getCurrentLocks() {
return resourcelockMap.values().stream().map(Pair::getValue)
- .flatMap(rlm -> ((ResourceLockManager<? extends
Resource>)rlm).getCurrentLockedResources())
+ .flatMap(rlm -> ((ResourceLockTracker<? extends
Resource>)rlm).getCurrentLockedResources())
.map(Resource::getName)
.collect(Collectors.toList());
}
@@ -455,10 +451,10 @@ public OMLockDetails releaseReadLocks(Resource resource,
Collection<String[]> ke
private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
String... keys) {
- Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager>
resourceLockPair =
+ Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker>
resourceLockPair =
resourcelockMap.get(resource.getClass());
- ResourceLockManager<Resource> resourceLockManager =
resourceLockPair.getRight();
- resourceLockManager.clearLockDetails();
+ ResourceLockTracker<Resource> resourceLockTracker =
resourceLockPair.getRight();
+ resourceLockTracker.clearLockDetails();
ReentrantReadWriteLock lock = getLock(resourceLockPair.getKey(), resource,
keys);
if (isReadLock) {
lock.readLock().unlock();
@@ -468,15 +464,15 @@ private OMLockDetails releaseLock(Resource resource,
boolean isReadLock,
lock.writeLock().unlock();
updateWriteUnlockMetrics(resource, lock, isWriteLocked);
}
- return resourceLockManager.unlockResource(resource);
+ return resourceLockTracker.unlockResource(resource);
}
private OMLockDetails releaseLocks(Resource resource, boolean isReadLock,
Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>>
lockListProvider) {
- Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager>
resourceLockPair =
+ Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockTracker>
resourceLockPair =
resourcelockMap.get(resource.getClass());
- ResourceLockManager<Resource> resourceLockManager =
resourceLockPair.getRight();
- resourceLockManager.clearLockDetails();
+ ResourceLockTracker<Resource> resourceLockTracker =
resourceLockPair.getRight();
+ resourceLockTracker.clearLockDetails();
List<ReadWriteLock> locks =
StreamSupport.stream(lockListProvider.apply(resourceLockPair.getKey().get(resource))
.spliterator(), false).collect(Collectors.toList());
// Release locks in reverse order.
@@ -491,7 +487,7 @@ private OMLockDetails releaseLocks(Resource resource,
boolean isReadLock,
updateWriteUnlockMetrics(resource, (ReentrantReadWriteLock) lock,
isWriteLocked);
}
}
- return resourceLockManager.unlockResource(resource);
+ return resourceLockTracker.unlockResource(resource);
}
private void updateReadUnlockMetrics(Resource resource,
@@ -581,151 +577,6 @@ public OMLockMetrics getOMLockMetrics() {
return omLockMetrics;
}
- private abstract static class ResourceLockManager<T extends Resource> {
-
- private final ThreadLocal<OMLockDetails> omLockDetails =
ThreadLocal.withInitial(OMLockDetails::new);
-
- abstract boolean canLockResource(T resource);
-
- abstract Stream<T> getCurrentLockedResources();
-
- OMLockDetails clearLockDetails() {
- omLockDetails.get().clear();
- return omLockDetails.get();
- }
-
- OMLockDetails unlockResource(T resource) {
- return omLockDetails.get();
- }
-
- OMLockDetails lockResource(T resource) {
- omLockDetails.get().setLockAcquired(true);
- return omLockDetails.get();
- }
- }
-
- private static final class DAGResourceLockManager extends
ResourceLockManager<DAGLeveledResource> {
-
- private final EnumMap<DAGLeveledResource, ThreadLocal<Boolean>>
acquiredLocksMap =
- new EnumMap<>(DAGLeveledResource.class);
- private final Map<DAGLeveledResource, Set<DAGLeveledResource>>
lockDependentAdjacencySet =
- new EnumMap<>(DAGLeveledResource.class);
-
- /**
- * Performs a depth-first traversal (DFS) on the directed acyclic graph
(DAG)
- * of {@link DAGLeveledResource} objects, processing dependencies and
updating
- * the lock-dependent adjacency set with resource relationships.
- *
- * @param resource the DAGLeveledResource instance to start the traversal
from
- * @param stack a stack to manage the DFS traversal state
- * @param visited a set to keep track of resources that have already been
visited
- * during the traversal
- */
- private void dfs(DAGLeveledResource resource, Stack<DAGLeveledResource>
stack, Set<DAGLeveledResource> visited) {
- if (visited.contains(resource)) {
- return;
- }
- if (!stack.isEmpty()) {
- throw new IllegalStateException("Stack is not empty while beginning to
traverse the DAG for resource :"
- + resource);
- }
- stack.push(resource);
- while (!stack.isEmpty()) {
- DAGLeveledResource current = stack.peek();
- if (!visited.contains(current)) {
- visited.add(current);
- for (DAGLeveledResource child : current.getChildren()) {
- stack.push(child);
- }
- } else {
- if (!lockDependentAdjacencySet.containsKey(current)) {
- Set<DAGLeveledResource> adjacentResources = null;
- for (DAGLeveledResource child : current.getChildren()) {
- if (adjacentResources == null) {
- adjacentResources = new HashSet<>();
- }
- adjacentResources.add(child);
- adjacentResources.addAll(lockDependentAdjacencySet.get(child));
- }
- lockDependentAdjacencySet.put(current,
- adjacentResources == null ? Collections.emptySet() :
adjacentResources);
- }
- stack.pop();
- }
- }
- }
-
- private void populateLockDependentAdjacencySet() {
- Set<DAGLeveledResource> visited = new HashSet<>();
- Stack<DAGLeveledResource> stack = new Stack<>();
- for (DAGLeveledResource resource : DAGLeveledResource.values()) {
- dfs(resource, stack, visited);
- }
- }
-
- private DAGResourceLockManager() {
- populateLockDependentAdjacencySet();
- for (DAGLeveledResource dagLeveledResource :
DAGLeveledResource.values()) {
- acquiredLocksMap.put(dagLeveledResource, ThreadLocal.withInitial(() ->
Boolean.FALSE));
- }
- }
-
- @Override
- OMLockDetails lockResource(DAGLeveledResource resource) {
- acquiredLocksMap.get(resource).set(Boolean.TRUE);
- return super.lockResource(resource);
- }
-
- @Override
- OMLockDetails unlockResource(DAGLeveledResource resource) {
- acquiredLocksMap.get(resource).set(Boolean.FALSE);
- return super.unlockResource(resource);
- }
-
- @Override
- public boolean canLockResource(DAGLeveledResource resource) {
- for (DAGLeveledResource child : lockDependentAdjacencySet.get(resource))
{
- if (acquiredLocksMap.get(child).get()) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- Stream<DAGLeveledResource> getCurrentLockedResources() {
- return acquiredLocksMap.keySet().stream().filter(dagLeveled ->
acquiredLocksMap.get(dagLeveled).get());
- }
- }
-
- private static final class LeveledResourceLockManager extends
ResourceLockManager<LeveledResource> {
- private final ThreadLocal<Short> lockSet = ThreadLocal.withInitial(() ->
Short.valueOf((short)0));
-
- @Override
- public boolean canLockResource(LeveledResource resource) {
- return resource.canLock(lockSet.get());
- }
-
- @Override
- Stream<LeveledResource> getCurrentLockedResources() {
- short lockSetVal = lockSet.get();
- return Arrays.stream(LeveledResource.values())
- .filter(leveledResource ->
leveledResource.isLevelLocked(lockSetVal));
- }
-
- @Override
- public OMLockDetails unlockResource(LeveledResource resource) {
- lockSet.set(resource.clearLock(lockSet.get()));
- return super.unlockResource(resource);
- }
-
- @Override
- public OMLockDetails lockResource(LeveledResource resource) {
- lockSet.set(resource.setLock(lockSet.get()));
- return super.lockResource(resource);
- }
- }
-
/**
* Leveled Resource defined in Ozone.
* Enforces lock acquisition ordering based on the resource level. A
resource at lower level cannot be acquired
@@ -849,7 +700,7 @@ short getMask() {
* @param type IPC Timing types
* @param deltaNanos consumed time
*/
- private void updateProcessingDetails(ResourceLockManager<? extends Resource>
resourceLockManager, Timing type,
+ private void updateProcessingDetails(ResourceLockTracker<? extends Resource>
resourceLockTracker, Timing type,
long deltaNanos) {
Server.Call call = Server.getCurCall().get();
if (call != null) {
@@ -857,13 +708,13 @@ private void
updateProcessingDetails(ResourceLockManager<? extends Resource> res
} else {
switch (type) {
case LOCKWAIT:
- resourceLockManager.omLockDetails.get().add(deltaNanos,
OMLockDetails.LockOpType.WAIT);
+ resourceLockTracker.getOmLockDetails().add(deltaNanos,
OMLockDetails.LockOpType.WAIT);
break;
case LOCKSHARED:
- resourceLockManager.omLockDetails.get().add(deltaNanos,
OMLockDetails.LockOpType.READ);
+ resourceLockTracker.getOmLockDetails().add(deltaNanos,
OMLockDetails.LockOpType.READ);
break;
case LOCKEXCLUSIVE:
- resourceLockManager.omLockDetails.get().add(deltaNanos,
OMLockDetails.LockOpType.WRITE);
+ resourceLockTracker.getOmLockDetails().add(deltaNanos,
OMLockDetails.LockOpType.WRITE);
break;
default:
LOG.error("Unsupported Timing type {}", type);
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java
index ac94c57c31b..aa28a9d4df4 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java
@@ -32,12 +32,16 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A lock manager implementation that manages hierarchical resource locks
@@ -47,7 +51,11 @@
* DAGs (e.g., File System trees or snapshot chains).
*/
public class PoolBasedHierarchicalResourceLockManager implements
HierarchicalResourceLockManager {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PoolBasedHierarchicalResourceLockManager.class);
+
private final GenericObjectPool<ReadWriteLock> lockPool;
+ private final ResourceLockTracker<DAGLeveledResource> resourceLockTracker;
private final Map<DAGLeveledResource, Map<String, LockReferenceCountPair>>
lockMap;
public PoolBasedHierarchicalResourceLockManager(OzoneConfiguration conf) {
@@ -61,6 +69,7 @@ public
PoolBasedHierarchicalResourceLockManager(OzoneConfiguration conf) {
config.setBlockWhenExhausted(true);
this.lockPool = new GenericObjectPool<>(new ReadWriteLockFactory(),
config);
this.lockMap = new ConcurrentHashMap<>();
+ this.resourceLockTracker = DAGResourceLockTracker.get();
}
private ReadWriteLock operateOnLock(DAGLeveledResource resource, String key,
@@ -102,8 +111,25 @@ public HierarchicalResourceLock
acquireWriteLock(DAGLeveledResource resource, St
return acquireLock(resource, key, false);
}
+ private String getErrorMessage(IOzoneManagerLock.Resource resource) {
+ return "Thread '" + Thread.currentThread().getName() + "' cannot " +
+ "acquire " + resource.getName() + " lock while holding " +
+
resourceLockTracker.getCurrentLockedResources().map(IOzoneManagerLock.Resource::getName)
+ .collect(Collectors.toList()) + " lock(s).";
+ }
+
+ @Override
+ public Stream<DAGLeveledResource> getCurrentLockedResources() {
+ return resourceLockTracker.getCurrentLockedResources();
+ }
+
private HierarchicalResourceLock acquireLock(DAGLeveledResource resource,
String key, boolean isReadLock)
throws IOException {
+ if (!resourceLockTracker.canLockResource(resource)) {
+ String errorMessage = getErrorMessage(resource);
+ LOG.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
ReadWriteLock readWriteLock = operateOnLock(resource, key,
LockReferenceCountPair::increment);
if (readWriteLock == null) {
throw new IOException("Unable to acquire " + (isReadLock ? "read" :
"write") + " lock on resource "
@@ -133,19 +159,20 @@ public void close() {
* class, {@code PoolBasedHierarchicalResourceLockManager}, which oversees
* the lifecycle of multiple such locks.
*/
- public class PoolBasedHierarchicalResourceLock implements
HierarchicalResourceLock, Closeable {
+ private final class PoolBasedHierarchicalResourceLock implements
HierarchicalResourceLock, Closeable {
private boolean isLockAcquired;
private final Lock lock;
private final DAGLeveledResource resource;
private final String key;
- public PoolBasedHierarchicalResourceLock(DAGLeveledResource resource,
String key, Lock lock) {
+ private PoolBasedHierarchicalResourceLock(DAGLeveledResource resource,
String key, Lock lock) {
this.isLockAcquired = true;
this.lock = lock;
this.resource = resource;
this.key = key;
this.lock.lock();
+ resourceLockTracker.lockResource(resource);
}
@Override
@@ -157,6 +184,7 @@ public boolean isLockAcquired() {
public synchronized void close() throws IOException {
if (isLockAcquired) {
this.lock.unlock();
+ resourceLockTracker.unlockResource(resource);
operateOnLock(resource, key, (LockReferenceCountPair::decrement));
isLockAcquired = false;
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
index 9458f575503..df81a9ba4d3 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.om.lock;
import java.io.IOException;
+import java.util.stream.Stream;
/**
* A read only lock manager that does not acquire any lock.
@@ -57,6 +58,11 @@ public HierarchicalResourceLock
acquireWriteLock(DAGLeveledResource resource, St
return EMPTY_LOCK_NOT_ACQUIRED;
}
+ @Override
+ public Stream<DAGLeveledResource> getCurrentLockedResources() {
+ return Stream.empty();
+ }
+
@Override
public void close() throws Exception {
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ResourceLockTracker.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ResourceLockTracker.java
new file mode 100644
index 00000000000..8a551e5f06d
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ResourceLockTracker.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.util.stream.Stream;
+
+/**
+ * Abstract class that tracks resource locks based on a generic resource type.
+ * Provides methods to acquire, release, and inspect locks, as well as obtain
+ * details about the locks held by the current thread.
+ *
+ * @param <T> the type of resource being tracked, which must implement
+ * {@link IOzoneManagerLock.Resource}.
+ */
+abstract class ResourceLockTracker<T extends IOzoneManagerLock.Resource> {
+
+ private final ThreadLocal<OMLockDetails> omLockDetails =
ThreadLocal.withInitial(OMLockDetails::new);
+
+ abstract boolean canLockResource(T resource);
+
+ abstract Stream<T> getCurrentLockedResources();
+
+ OMLockDetails clearLockDetails() {
+ omLockDetails.get().clear();
+ return getOmLockDetails();
+ }
+
+ OMLockDetails unlockResource(T resource) {
+ return getOmLockDetails();
+ }
+
+ OMLockDetails lockResource(T resource) {
+ omLockDetails.get().setLockAcquired(true);
+ return getOmLockDetails();
+ }
+
+ public OMLockDetails getOmLockDetails() {
+ return omLockDetails.get();
+ }
+}
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
index 9ced7f3f4d9..bf2393f645b 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
@@ -19,6 +19,11 @@
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT;
+import static
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.BOOTSTRAP_LOCK;
+import static
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_CONTENT_LOCK;
+import static
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_LOCK;
+import static
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_GC_LOCK;
+import static
org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_LOCAL_DATA_LOCK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -26,9 +31,15 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -39,6 +50,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import
org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock;
import org.junit.jupiter.api.AfterEach;
@@ -46,6 +58,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
/**
@@ -251,27 +264,6 @@ public void testDoubleClose() throws Exception {
assertFalse(lock.isLockAcquired());
}
- /**
- * Test different resource types can be locked independently.
- */
- @Test
- public void testDifferentResourceTypes() throws Exception {
-
- List<HierarchicalResourceLock> locks = new ArrayList<>();
- for (DAGLeveledResource otherResource : DAGLeveledResource.values()) {
- String key = "test-key";
- locks.add(lockManager.acquireWriteLock(otherResource, key));
- }
- for (HierarchicalResourceLock lock : locks) {
- assertNotNull(lock);
- assertTrue(lock.isLockAcquired());
- }
- for (HierarchicalResourceLock lock : locks) {
- lock.close();
- }
- }
-
-
/**
* Test different keys on same resource type can be locked concurrently.
*/
@@ -580,4 +572,39 @@ public void testIOExceptionPropagation() {
assertNotNull(e.getMessage());
}
}
+
+ @ParameterizedTest
+ @EnumSource
+ public void testDAGLockOrderAcquisition(DAGLeveledResource
dagLeveledResource) throws IOException {
+ Map<DAGLeveledResource, Set<IOzoneManagerLock.Resource>>
forbiddenLockOrdering =
+ ImmutableMap.of(SNAPSHOT_DB_CONTENT_LOCK,
ImmutableSet.of(SNAPSHOT_DB_LOCK, SNAPSHOT_LOCAL_DATA_LOCK),
+ BOOTSTRAP_LOCK, ImmutableSet.of(SNAPSHOT_GC_LOCK,
SNAPSHOT_DB_LOCK, SNAPSHOT_DB_CONTENT_LOCK,
+ SNAPSHOT_LOCAL_DATA_LOCK));
+ List<DAGLeveledResource> resources =
Arrays.stream(DAGLeveledResource.values()).collect(Collectors.toList());
+ for (DAGLeveledResource otherResource : resources) {
+ String otherResourceName1 = otherResource.getName() + "key";
+ String otherResourceName2 = otherResource.getName() + "key";
+ String dagResourceName = dagLeveledResource.getName() + "key";
+ try (HierarchicalResourceLock lock1 =
lockManager.acquireWriteLock(otherResource, otherResourceName1);
+ HierarchicalResourceLock lock2 =
lockManager.acquireWriteLock(otherResource, otherResourceName2)) {
+ assertTrue(lock1.isLockAcquired());
+ assertTrue(lock2.isLockAcquired());
+ if (forbiddenLockOrdering.getOrDefault(dagLeveledResource,
Collections.emptySet()).contains(otherResource)) {
+ assertThrows(RuntimeException.class,
+ () -> lockManager.acquireWriteLock(dagLeveledResource,
dagResourceName),
+ "Lock acquisition of " + dagLeveledResource + " should fail when
" + otherResource +
+ " is already acquired");
+ lock1.close();
+ assertThrows(RuntimeException.class,
+ () -> lockManager.acquireWriteLock(dagLeveledResource,
dagResourceName),
+ "Lock acquisition of " + dagLeveledResource + " should fail when
" + otherResource +
+ " is already acquired even after first lock is released
since second lock is still held");
+ } else {
+ try (HierarchicalResourceLock lock3 =
lockManager.acquireWriteLock(dagLeveledResource, dagResourceName)) {
+ assertTrue(lock3.isLockAcquired());
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]