This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e936e4deb10 HDDS-12134. Implement Snapshot Cache lock for OM Bootstrap (#8474)
e936e4deb10 is described below

commit e936e4deb10c800bd53eef4a39d7634f9ab5ec9b
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Sun Jun 8 18:06:27 2025 -0400

    HDDS-12134. Implement Snapshot Cache lock for OM Bootstrap (#8474)
---
 .../hadoop/ozone/om/lock/IOzoneManagerLock.java    |   4 +
 .../hadoop/ozone/om/lock/OmReadOnlyLock.java       |  10 ++
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     |  53 ++++++++---
 .../hadoop/ozone/om/lock/TestOzoneManagerLock.java | 104 +++++++++++++++++++--
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |   2 +-
 .../hadoop/ozone/om/SstFilteringService.java       |   8 +-
 .../response/snapshot/OMSnapshotPurgeResponse.java |   8 +-
 .../hadoop/ozone/om/snapshot/SnapshotCache.java    |  70 ++++++++++++--
 .../ozone/om/snapshot/TestSnapshotCache.java       |  61 +++++++++++-
 .../ozone/om/snapshot/TestSnapshotDiffManager.java |   4 +-
 10 files changed, 278 insertions(+), 46 deletions(-)
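
For readers skimming the patch: the core addition is a resource-level ("whole stripe") write lock on the new FlatResource.SNAPSHOT_DB_LOCK, exposed through IOzoneManagerLock.acquireResourceWriteLock/releaseResourceWriteLock, which SnapshotCache uses so that callers such as OM bootstrap can quiesce all snapshot RocksDB handles at once. Below is a minimal usage sketch based only on the signatures added in this diff; the helper class and method names are illustrative and not part of the commit:

    import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;

    import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
    import org.apache.hadoop.ozone.om.lock.OMLockDetails;

    // Illustrative helper, not part of the commit.
    final class SnapshotDbExclusiveSection {
      private SnapshotDbExclusiveSection() {
      }

      // Runs an action while holding the stripe-wide SNAPSHOT_DB_LOCK write lock,
      // which excludes every per-snapshot read or write lock on the same resource.
      static void runExclusively(IOzoneManagerLock lock, Runnable action) {
        OMLockDetails details = lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK);
        if (!details.isLockAcquired()) {
          return; // e.g. OmReadOnlyLock never grants the lock
        }
        try {
          action.run();
        } finally {
          lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK);
        }
      }
    }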

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
index 808b9a4321a..7e8ed7c7817 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
@@ -36,6 +36,8 @@ OMLockDetails acquireWriteLock(Resource resource,
   OMLockDetails acquireWriteLocks(Resource resource,
                                  Collection<String[]> resources);
 
+  OMLockDetails acquireResourceWriteLock(Resource resource);
+
   boolean acquireMultiUserLock(String firstUser, String secondUser);
 
   void releaseMultiUserLock(String firstUser, String secondUser);
@@ -46,6 +48,8 @@ OMLockDetails releaseWriteLock(Resource resource,
   OMLockDetails releaseWriteLocks(Resource resource,
                                  Collection<String[]> resources);
 
+  OMLockDetails releaseResourceWriteLock(Resource resource);
+
   OMLockDetails releaseReadLock(Resource resource,
                                 String... resources);
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
index 0c289cb1889..faf5ca99b8c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
@@ -50,6 +50,11 @@ public OMLockDetails acquireWriteLocks(Resource resource, Collection<String[]> r
     return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
   }
 
+  @Override
+  public OMLockDetails acquireResourceWriteLock(Resource resource) {
+    return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
+  }
+
   @Override
   public boolean acquireMultiUserLock(String firstUser, String secondUser) {
     return false;
@@ -71,6 +76,11 @@ public OMLockDetails releaseWriteLocks(Resource resource, Collection<String[]> r
     return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
   }
 
+  @Override
+  public OMLockDetails releaseResourceWriteLock(Resource resource) {
+    return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
+  }
+
   @Override
  public OMLockDetails releaseReadLock(Resource resource, String... resources) {
     return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 2a62836bed9..6cd96f73238 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -37,7 +37,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.commons.lang3.tuple.Pair;
@@ -140,9 +142,11 @@ private Striped<ReadWriteLock> createStripeLock(Resource r,
     return SimpleStriped.readWriteLock(size, fair);
   }
 
-  private Iterable<ReadWriteLock> bulkGetLock(Map<Resource, Striped<ReadWriteLock>> lockMap, Resource resource,
-      Collection<String[]> keys) {
-    Striped<ReadWriteLock> striped = lockMap.get(resource);
+  private Iterable<ReadWriteLock> getAllLocks(Striped<ReadWriteLock> striped) {
+    return IntStream.range(0, striped.size()).mapToObj(striped::getAt).collect(Collectors.toList());
+  }
+
+  private Iterable<ReadWriteLock> bulkGetLock(Striped<ReadWriteLock> striped, Collection<String[]> keys) {
     List<Object> lockKeys = new ArrayList<>(keys.size());
     for (String[] key : keys) {
       if (Objects.nonNull(key)) {
@@ -200,7 +204,7 @@ public OMLockDetails acquireReadLock(Resource resource, String... keys) {
    */
   @Override
  public OMLockDetails acquireReadLocks(Resource resource, Collection<String[]> keys) {
-    return acquireLocks(resource, true, keys);
+    return acquireLocks(resource, true, striped -> bulkGetLock(striped, keys));
   }
 
   /**
@@ -244,7 +248,17 @@ public OMLockDetails acquireWriteLock(Resource resource, String... keys) {
    */
   @Override
  public OMLockDetails acquireWriteLocks(Resource resource, Collection<String[]> keys) {
-    return acquireLocks(resource, false, keys);
+    return acquireLocks(resource, false, striped -> bulkGetLock(striped, keys));
+  }
+
+  /**
+   * Acquires all write locks for a specified resource.
+   *
+   * @param resource The resource for which the write lock is to be acquired.
+   */
+  @Override
+  public OMLockDetails acquireResourceWriteLock(Resource resource) {
+    return acquireLocks(resource, false, this::getAllLocks);
   }
 
  private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lock,
@@ -258,7 +272,8 @@ private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lo
     }
   }
 
-  private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, Collection<String[]> keys) {
+  private OMLockDetails acquireLocks(Resource resource, boolean isReadLock,
+      Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>> lockListProvider) {
     Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> 
resourceLockPair =
         resourcelockMap.get(resource.getClass());
     ResourceLockManager<Resource> resourceLockManager = 
resourceLockPair.getRight();
@@ -271,7 +286,7 @@ private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, Collec
 
     long startWaitingTimeNanos = Time.monotonicNowNanos();
 
-    for (ReadWriteLock lock : bulkGetLock(resourceLockPair.getKey(), resource, keys)) {
+    for (ReadWriteLock lock : lockListProvider.apply(resourceLockPair.getKey().get(resource))) {
       acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);
     }
     return resourceLockManager.lockResource(resource);
@@ -342,7 +357,6 @@ private String getErrorMessage(Resource resource) {
     return "Thread '" + Thread.currentThread().getName() + "' cannot " +
         "acquire " + resource.getName() + " lock while holding " +
         getCurrentLocks().toString() + " lock(s).";
-
   }
 
   @VisibleForTesting
@@ -397,7 +411,17 @@ public OMLockDetails releaseWriteLock(Resource resource, String... keys) {
    */
   @Override
  public OMLockDetails releaseWriteLocks(Resource resource, Collection<String[]> keys) {
-    return releaseLocks(resource, false, keys);
+    return releaseLocks(resource, false, striped -> bulkGetLock(striped, keys));
+  }
+
+  /**
+   * Releases a write lock acquired on the entire Stripe for a specified resource.
+   *
+   * @param resource The resource for which the write lock is to be released.
+   */
+  @Override
+  public OMLockDetails releaseResourceWriteLock(Resource resource) {
+    return releaseLocks(resource, false, this::getAllLocks);
   }
 
   /**
@@ -423,7 +447,7 @@ public OMLockDetails releaseReadLock(Resource resource, String... keys) {
    */
   @Override
  public OMLockDetails releaseReadLocks(Resource resource, Collection<String[]> keys) {
-    return releaseLocks(resource, true, keys);
+    return releaseLocks(resource, true, striped -> bulkGetLock(striped, keys));
   }
 
   private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
@@ -445,12 +469,12 @@ private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
   }
 
   private OMLockDetails releaseLocks(Resource resource, boolean isReadLock,
-      Collection<String[]> keys) {
+      Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>> lockListProvider) {
     Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> 
resourceLockPair =
         resourcelockMap.get(resource.getClass());
     ResourceLockManager<Resource> resourceLockManager = 
resourceLockPair.getRight();
     resourceLockManager.clearLockDetails();
-    List<ReadWriteLock> locks = StreamSupport.stream(bulkGetLock(resourceLockPair.getKey(), resource, keys)
+    List<ReadWriteLock> locks = StreamSupport.stream(lockListProvider.apply(resourceLockPair.getKey().get(resource))
             .spliterator(), false).collect(Collectors.toList());
     // Release locks in reverse order.
     Collections.reverse(locks);
@@ -558,7 +582,10 @@ public OMLockMetrics getOMLockMetrics() {
   * Flat Resource defined in Ozone. Locks can be acquired on a resource independent of one another.
    */
   public enum FlatResource implements Resource {
-    SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK");
+    // Background services lock on a Snapshot.
+    SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK"),
+    // Lock acquired on a Snapshot's RocksDB Handle.
+    SNAPSHOT_DB_LOCK("SNAPSHOT_DB_LOCK");
 
     private String name;
     private ResourceManager resourceManager;
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
index 500a96e29a4..a1d853eb6b3 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
@@ -31,6 +31,7 @@
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
@@ -39,7 +40,9 @@
 import org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Class tests OzoneManagerLock.
@@ -278,34 +281,119 @@ void acquireUserLockAfterMultiUserLock() {
     lock.releaseMultiUserLock("user1", "user2");
   }
 
-  @Test
-  void testLockResourceParallel() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testLockResourceParallel(boolean fullResourceLock) throws Exception {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
 
-    for (LeveledResource resource :
-        LeveledResource.values()) {
+    for (Resource resource : Stream.of(LeveledResource.values(), FlatResource.values())
+        .flatMap(Arrays::stream).collect(Collectors.toList())) {
       final String[] resourceName = generateResourceName(resource);
-      lock.acquireWriteLock(resource, resourceName);
+      if (fullResourceLock) {
+        lock.acquireResourceWriteLock(resource);
+      } else {
+        lock.acquireWriteLock(resource, resourceName);
+      }
 
       AtomicBoolean gotLock = new AtomicBoolean(false);
       new Thread(() -> {
-        lock.acquireWriteLock(resource, resourceName);
+        if (fullResourceLock) {
+          lock.acquireResourceWriteLock(resource);
+        } else {
+          lock.acquireWriteLock(resource, resourceName);
+        }
         gotLock.set(true);
-        lock.releaseWriteLock(resource, resourceName);
+        if (fullResourceLock) {
+          lock.releaseResourceWriteLock(resource);
+        } else {
+          lock.releaseWriteLock(resource, resourceName);
+        }
+
       }).start();
       // Let's give some time for the new thread to run
       Thread.sleep(100);
       // Since the new thread is trying to get lock on same resource,
       // it will wait.
       assertFalse(gotLock.get());
-      lock.releaseWriteLock(resource, resourceName);
+      if (fullResourceLock) {
+        lock.releaseResourceWriteLock(resource);
+      } else {
+        lock.releaseWriteLock(resource, resourceName);
+      }
       // Since we have released the lock, the new thread should have the lock
       // now.
       // Let's give some time for the new thread to run
       Thread.sleep(100);
       assertTrue(gotLock.get());
     }
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = {
+      "true, true",
+      "true, false",
+      "false, true",
+      "false, false"
+  })
+  void testResourceLockFullResourceLockParallel(boolean mainThreadAcquireResourceLock, boolean acquireWriteLock)
+      throws Exception {
+    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
+
+    for (Resource resource : Stream.of(LeveledResource.values(), FlatResource.values())
+        .flatMap(Arrays::stream).collect(Collectors.toList())) {
+      final String[] resourceName = generateResourceName(resource);
+      if (mainThreadAcquireResourceLock) {
+        lock.acquireResourceWriteLock(resource);
+      } else {
+        if (acquireWriteLock) {
+          lock.acquireWriteLock(resource, resourceName);
+        } else {
+          lock.acquireReadLock(resource, resourceName);
+        }
+      }
 
+      AtomicBoolean gotLock = new AtomicBoolean(false);
+      new Thread(() -> {
+        if (!mainThreadAcquireResourceLock) {
+          lock.acquireResourceWriteLock(resource);
+        } else {
+          if (acquireWriteLock) {
+            lock.acquireWriteLock(resource, resourceName);
+          } else {
+            lock.acquireReadLock(resource, resourceName);
+          }
+        }
+        gotLock.set(true);
+        if (!mainThreadAcquireResourceLock) {
+          lock.releaseResourceWriteLock(resource);
+        } else {
+          if (acquireWriteLock) {
+            lock.releaseWriteLock(resource, resourceName);
+          } else {
+            lock.releaseReadLock(resource, resourceName);
+          }
+        }
+      }).start();
+      // Let's give some time for the new thread to run
+      Thread.sleep(100);
+      // Since the new thread is trying to get lock on same resource,
+      // it will wait.
+      assertFalse(gotLock.get());
+      if (mainThreadAcquireResourceLock) {
+        lock.releaseResourceWriteLock(resource);
+      } else {
+        if (acquireWriteLock) {
+          lock.releaseWriteLock(resource, resourceName);
+        } else {
+          lock.releaseReadLock(resource, resourceName);
+        }
+      }
+      // Since we have released the lock, the new thread should have the lock
+      // now.
+      // Let's give some time for the new thread to run
+      Thread.sleep(100);
+      assertTrue(gotLock.get());
+    }
   }
 
   @Test
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 262750e5c2f..1de509e9939 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -295,7 +295,7 @@ public OmSnapshotManager(OzoneManager ozoneManager) {
         .getBoolean(OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES,
             OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT);
     this.snapshotCache = new SnapshotCache(loader, softCacheSize, ozoneManager.getMetrics(),
-        cacheCleanupServiceInterval, compactNonSnapshotDiffTables);
+        cacheCleanupServiceInterval, compactNonSnapshotDiffTables, ozoneManager.getMetadataManager().getLock());
 
     this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
         ozoneManager, snapDiffJobCf, snapDiffReportCf,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
index ea46366d918..b94fd45bf7f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -19,7 +19,7 @@
 
 import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.SNAPSHOT_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -135,8 +135,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IO
       // in OmSnapshotPurgeResponse. Any operation apart from delete can run in parallel along with this operation.
       //TODO. Revisit other SNAPSHOT_LOCK and see if we can change write locks to read locks to further optimize it.
       OMLockDetails omLockDetails = ozoneManager.getMetadataManager().getLock()
-          .acquireReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
-              snapshotInfo.getName());
+          .acquireReadLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
       boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
       if (acquiredSnapshotLock) {
         String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo);
@@ -147,8 +146,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IO
           }
         } finally {
           ozoneManager.getMetadataManager().getLock()
-              .releaseReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
-                  snapshotInfo.getBucketName(), snapshotInfo.getName());
+              .releaseReadLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
         }
       }
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
index 5a530ee1188..2503b291c00 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.om.response.snapshot;
 
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_INFO_TABLE;
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.SNAPSHOT_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
 
 import com.google.common.annotations.VisibleForTesting;
 import jakarta.annotation.Nonnull;
@@ -121,8 +121,7 @@ private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
     // inside the snapshot directory. Any operation apart which doesn't create/delete files under this snapshot
     // directory can run in parallel along with this operation.
     OMLockDetails omLockDetails = omMetadataManager.getLock()
-        .acquireWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
-            snapshotInfo.getName());
+        .acquireWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
     boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
     if (acquiredSnapshotLock) {
       Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
@@ -132,8 +131,7 @@ private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
         LOG.error("Failed to delete snapshot directory {} for snapshot {}",
             snapshotDirPath, snapshotInfo.getTableKey(), ex);
       } finally {
-        omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
-            snapshotInfo.getBucketName(), snapshotInfo.getName());
+        omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
       }
     }
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index 4bdca5d04ff..b465956f35e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.om.snapshot;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -28,12 +29,15 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import org.apache.hadoop.hdds.utils.Scheduler;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +62,7 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable {
   private final int cacheSizeLimit;
   private final Set<UUID> pendingEvictionQueue;
   private final Scheduler scheduler;
+  private final IOzoneManagerLock lock;
   private static final String SNAPSHOT_CACHE_CLEANUP_SERVICE =
       "SnapshotCacheCleanupService";
   private final boolean compactNonSnapshotDiffTables;
@@ -86,7 +91,7 @@ private void compactSnapshotDB(OmSnapshot snapshot) throws IOException {
         try {
           metadataManager.getStore().compactTable(table.getName());
         } catch (IOException e) {
-          LOG.warn("Failed to compact table {} in snapshot {}: {}", 
+          LOG.warn("Failed to compact table {} in snapshot {}: {}",
               table.getName(), snapshot.getSnapshotID(), e.getMessage());
         }
       }
@@ -94,17 +99,18 @@ private void compactSnapshotDB(OmSnapshot snapshot) throws IOException {
   }
 
  public SnapshotCache(CacheLoader<UUID, OmSnapshot> cacheLoader, int cacheSizeLimit, OMMetrics omMetrics,
-                       long cleanupInterval, boolean compactNonSnapshotDiffTables) {
+                       long cleanupInterval, boolean compactNonSnapshotDiffTables, IOzoneManagerLock lock) {
     this.dbMap = new ConcurrentHashMap<>();
     this.cacheLoader = cacheLoader;
     this.cacheSizeLimit = cacheSizeLimit;
     this.omMetrics = omMetrics;
+    this.lock = lock;
     this.pendingEvictionQueue = ConcurrentHashMap.newKeySet();
     this.compactNonSnapshotDiffTables = compactNonSnapshotDiffTables;
     if (cleanupInterval > 0) {
       this.scheduler = new Scheduler(SNAPSHOT_CACHE_CLEANUP_SERVICE,
           true, 1);
-      this.scheduler.scheduleWithFixedDelay(this::cleanup, cleanupInterval,
+      this.scheduler.scheduleWithFixedDelay(() -> this.cleanup(false), cleanupInterval,
           cleanupInterval, TimeUnit.MILLISECONDS);
     } else {
       this.scheduler = null;
@@ -172,7 +178,8 @@ public enum Reason {
   }
 
   /**
-   * Get or load OmSnapshot. Shall be close()d after use.
+   * Get or load OmSnapshot. Shall be close()d after use. This would acquire a read lock on the Snapshot Database
+   * during the entire lifecycle of the returned OmSnapshot instance.
    * TODO: [SNAPSHOT] Can add reason enum to param list later.
    * @param key SnapshotId
    * @return an OmSnapshot instance, or null on error
@@ -183,6 +190,11 @@ public UncheckedAutoCloseableSupplier<OmSnapshot> get(UUID key) throws IOExcepti
       LOG.warn("Snapshot cache size ({}) exceeds configured soft-limit ({}).",
           size(), cacheSizeLimit);
     }
+    OMLockDetails lockDetails = lock.acquireReadLock(SNAPSHOT_DB_LOCK, key.toString());
+    if (!lockDetails.isLockAcquired()) {
+      throw new OMException("Unable to acquire readlock on snapshot db with key " + key,
+          OMException.ResultCodes.INTERNAL_ERROR);
+    }
     // Atomic operation to initialize the OmSnapshot instance (once) if the key
     // does not exist, and increment the reference count on the instance.
     ReferenceCounted<OmSnapshot> rcOmSnapshot =
@@ -214,11 +226,12 @@ public UncheckedAutoCloseableSupplier<OmSnapshot> get(UUID key) throws IOExcepti
     if (rcOmSnapshot == null) {
       // The only exception that would fall through the loader logic above
       // is OMException with FILE_NOT_FOUND.
+      lock.releaseReadLock(SNAPSHOT_DB_LOCK, key.toString());
       throw new OMException("SnapshotId: '" + key + "' not found, or the 
snapshot is no longer active.",
           OMException.ResultCodes.FILE_NOT_FOUND);
     }
     return new UncheckedAutoCloseableSupplier<OmSnapshot>() {
-      private AtomicReference<Boolean> closed = new AtomicReference<>(false);
+      private final AtomicReference<Boolean> closed = new AtomicReference<>(false);
       @Override
       public OmSnapshot get() {
         return rcOmSnapshot.get();
@@ -229,6 +242,7 @@ public void close() {
         closed.updateAndGet(alreadyClosed -> {
           if (!alreadyClosed) {
             rcOmSnapshot.decrementRefCount();
+            lock.releaseReadLock(SNAPSHOT_DB_LOCK, key.toString());
           }
           return true;
         });
@@ -249,21 +263,59 @@ public void release(UUID key) {
     val.decrementRefCount();
   }
 
+  /**
+   * Acquires a write lock on the snapshot database and returns an auto-closeable supplier
+   * for lock details. The lock ensures that the operations accessing the snapshot database
+   * are performed in a thread-safe manner. The returned supplier automatically releases the
+   * lock when closed, preventing potential resource contention or deadlocks.
+   */
+  public UncheckedAutoCloseableSupplier<OMLockDetails> lock() {
+    return lock(() -> lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK),
+        () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK));
+  }
+
+  private UncheckedAutoCloseableSupplier<OMLockDetails> lock(
+      Supplier<OMLockDetails> lockFunction, Supplier<OMLockDetails> unlockFunction) {
+    AtomicReference<OMLockDetails> lockDetails = new AtomicReference<>(lockFunction.get());
+    if (lockDetails.get().isLockAcquired()) {
+      cleanup(true);
+      if (!dbMap.isEmpty()) {
+        lockDetails.set(unlockFunction.get());
+      }
+    }
+
+    return new UncheckedAutoCloseableSupplier<OMLockDetails>() {
+
+      @Override
+      public void close() {
+        lockDetails.updateAndGet((prevLock) -> {
+          if (prevLock != null && prevLock.isLockAcquired()) {
+            return unlockFunction.get();
+          }
+          return prevLock;
+        });
+      }
+
+      @Override
+      public OMLockDetails get() {
+        return lockDetails.get();
+      }
+    };
+  }
 
   /**
    * If cache size exceeds soft limit, attempt to clean up and close the
      instances that has zero reference count.
    */
-  @VisibleForTesting
-  void cleanup() {
-    if (dbMap.size() > cacheSizeLimit) {
+  private synchronized void cleanup(boolean force) {
+    if (force || dbMap.size() > cacheSizeLimit) {
       for (UUID evictionKey : pendingEvictionQueue) {
         ReferenceCounted<OmSnapshot> snapshot = dbMap.get(evictionKey);
         if (snapshot != null && snapshot.getTotalRefCount() == 0) {
           try {
             compactSnapshotDB(snapshot.get());
           } catch (IOException e) {
-            LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}", 
+            LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}",
                 evictionKey, e.getMessage());
           }
         }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
index 947d14e1b8b..3018f396d74 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
@@ -17,15 +17,20 @@
 
 package org.apache.hadoop.ozone.om.snapshot;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -41,6 +46,10 @@
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.junit.jupiter.api.AfterEach;
@@ -50,6 +59,8 @@
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.stubbing.Answer;
 import org.slf4j.event.Level;
 
@@ -61,6 +72,7 @@ class TestSnapshotCache {
 
   private static final int CACHE_SIZE_LIMIT = 3;
   private static CacheLoader<UUID, OmSnapshot> cacheLoader;
+  private static IOzoneManagerLock lock;
   private SnapshotCache snapshotCache;
 
   private OMMetrics omMetrics;
@@ -93,20 +105,21 @@ static void beforeAll() throws Exception {
           tables.add(table2);
           tables.add(keyTable);
           when(store.listTables()).thenReturn(tables);
-          
+
           return omSnapshot;
         }
     );
 
     // Set SnapshotCache log level. Set to DEBUG for verbose output
     GenericTestUtils.setLogLevel(SnapshotCache.class, Level.DEBUG);
+    lock = spy(new OmReadOnlyLock());
   }
 
   @BeforeEach
   void setUp() {
     // Reset cache for each test case
     omMetrics = OMMetrics.create();
-    snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, true);
+    snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, true, lock);
   }
 
   @AfterEach
@@ -128,6 +141,45 @@ void testGet() throws IOException {
     assertEquals(1, omMetrics.getNumSnapshotCacheSize());
   }
 
+  @Test
+  @DisplayName("Tests get() fails on read lock failure")
+  public void testGetFailsOnReadLock() throws IOException {
+    final UUID dbKey1 = UUID.randomUUID();
+    final UUID dbKey2 = UUID.randomUUID();
+    when(lock.acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey1.toString())))
+        .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED);
+    assertThrows(OMException.class, () -> snapshotCache.get(dbKey1));
+    snapshotCache.get(dbKey2);
+    assertEquals(1, snapshotCache.size());
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {0, 1, 5, 10})
+  @DisplayName("Tests get() holds a read lock")
+  public void testGetHoldsReadLock(int numberOfLocks) throws IOException {
+    clearInvocations(lock);
+    final UUID dbKey1 = UUID.randomUUID();
+    final UUID dbKey2 = UUID.randomUUID();
+    for (int i = 0; i < numberOfLocks; i++) {
+      snapshotCache.get(dbKey1);
+      snapshotCache.get(dbKey2);
+    }
+    assertEquals(numberOfLocks > 0 ? 2 : 0, snapshotCache.size());
+    verify(lock, times(numberOfLocks)).acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey1.toString()));
+    verify(lock, times(numberOfLocks)).acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey2.toString()));
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {0, 1, 5, 10})
+  @DisplayName("Tests lock() holds a write lock")
+  public void testGetHoldsWriteLock(int numberOfLocks) {
+    clearInvocations(lock);
+    for (int i = 0; i < numberOfLocks; i++) {
+      snapshotCache.lock();
+    }
+    verify(lock, times(numberOfLocks)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK));
+  }
+
   @Test
   @DisplayName("get() same entry twice yields one cache entry only")
   void testGetTwice() throws IOException {
@@ -266,7 +318,7 @@ void testEviction1() throws IOException, InterruptedException, TimeoutException
     assertEquals(1, snapshotCache.size());
     assertEquals(1, omMetrics.getNumSnapshotCacheSize());
     assertEntryExistence(dbKey1, false);
-    
+
     // Verify compaction was called on the tables
     org.apache.hadoop.hdds.utils.db.DBStore store1 = snapshot1.get().getMetadataManager().getStore();
     verify(store1, times(1)).compactTable("table1");
@@ -371,7 +423,8 @@ void testEviction3WithClose() throws IOException, InterruptedException, TimeoutE
   @DisplayName("Snapshot operations not blocked during compaction")
  void testSnapshotOperationsNotBlockedDuringCompaction() throws IOException, InterruptedException, TimeoutException {
     omMetrics = OMMetrics.create();
-    snapshotCache = new SnapshotCache(cacheLoader, 1, omMetrics, 50, true);
+    snapshotCache = new SnapshotCache(cacheLoader, 1, omMetrics, 50, true,
+        lock);
     final UUID dbKey1 = UUID.randomUUID();
     UncheckedAutoCloseableSupplier<OmSnapshot> snapshot1 = snapshotCache.get(dbKey1);
     assertEquals(1, snapshotCache.size());
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index 57666d3665e..2ffbaa44d82 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -135,6 +135,7 @@
 import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotTestUtils.StubbedPersistentMap;
 import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse;
 import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage;
@@ -370,7 +371,8 @@ public void init() throws RocksDBException, IOException, ExecutionException {
 
     omSnapshotManager = mock(OmSnapshotManager.class);
     when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
-    SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, true);
+    SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, true,
+        new OmReadOnlyLock());
 
     when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString()))
         .thenAnswer(invocationOnMock -> {

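For context on how the two sides of the new SnapshotCache API fit together: get() now holds a per-snapshot SNAPSHOT_DB_LOCK read lock until the returned supplier is closed, while lock() takes the resource-wide write lock, evicts unreferenced handles, and reports whether exclusive access was obtained. The sketch below is a hedged illustration of a caller; the class name, method names, and the isLockAcquired() check used to detect "handles still open" are assumptions drawn from this diff, not code from the commit:

    import java.io.IOException;
    import java.util.UUID;
    import java.util.function.Consumer;

    import org.apache.hadoop.ozone.om.OmSnapshot;
    import org.apache.hadoop.ozone.om.lock.OMLockDetails;
    import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
    import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;

    // Illustrative caller, not part of the commit.
    final class SnapshotCacheLockUsage {
      private SnapshotCacheLockUsage() {
      }

      // Reader path: get() acquires the per-snapshot read lock, close() releases it.
      static void readSnapshot(SnapshotCache cache, UUID snapshotId,
          Consumer<OmSnapshot> reader) throws IOException {
        try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = cache.get(snapshotId)) {
          reader.accept(snapshot.get());
        }
      }

      // Exclusive path (e.g. OM bootstrap): lock() takes the stripe-wide write lock and
      // cleans up idle handles; open handles mean exclusive access is not granted.
      static boolean runWhileSnapshotDbQuiesced(SnapshotCache cache, Runnable action) {
        try (UncheckedAutoCloseableSupplier<OMLockDetails> guard = cache.lock()) {
          if (!guard.get().isLockAcquired()) {
            return false; // some snapshot DB handle is still in use
          }
          action.run();
          return true;
        } // close() releases the write lock if it is still held
      }
    }

Note that lock() appears to release the write lock immediately when open handles remain after cleanup, so callers are expected to check the returned lock details rather than assume exclusivity.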

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

