This is an automated email from the ASF dual-hosted git repository.

sshenoy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a8206e027a6 HDDS-13768. OM should acquire snapshot cache lock before taking checkpoint. (#9129)
a8206e027a6 is described below

commit a8206e027a6c8d523e9a48a16267801e57098a89
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Thu Nov 6 20:09:05 2025 +0530

    HDDS-13768. OM should acquire snapshot cache lock before taking checkpoint. (#9129)
---
 .../TestOMDbCheckpointServletInodeBasedXfer.java   | 183 +++++++++++++++++++--
 .../om/OMDBCheckpointServletInodeBasedXfer.java    |  88 +++++-----
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |   8 +
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |  29 ++++
 .../hadoop/ozone/om/snapshot/SnapshotCache.java    |   1 -
 5 files changed, 256 insertions(+), 53 deletions(-)
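
A quick sketch of the core pattern before the diffs: when snapshot data is
included in the tarball, the checkpoint is now taken while holding the
snapshot cache lock, so a concurrent snapshot purge cannot delete snapshot
directories mid-transfer. The names below are taken from the
OMDBCheckpointServletInodeBasedXfer diff that follows; the surrounding servlet
plumbing is elided, so treat this as an illustration rather than the full
method:

    // Java permits a null resource in try-with-resources: close() is simply
    // skipped, so no lock is taken or released when snapshot data is excluded.
    SnapshotCache snapshotCache = om.getOmSnapshotManager().getSnapshotCache();
    try (UncheckedAutoCloseableSupplier<OMLockDetails> lock =
             includeSnapshotData ? snapshotCache.lock() : null) {
      // The checkpoint is created under the lock (if any), so the snapshot
      // directories it references stay on disk until the transfer completes.
      DBCheckpoint checkpoint = createAndPrepareCheckpoint(true);
      // ... write active om.db and snapshot DB contents to the archive ...
    }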

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
index a6ae3eaab21..b936d7ab518 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
@@ -65,8 +65,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -102,12 +104,16 @@
 import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
 import org.apache.hadoop.ozone.om.service.KeyDeletingService;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -395,7 +401,6 @@ public void testSnapshotDBConsistency() throws Exception {
     }
     Path snapshotDbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR,
         OM_DB_NAME + "-" + snapshotToModify.getSnapshotId());
-    deleteWalFiles(snapshotDbDir);
     assertTrue(Files.exists(snapshotDbDir));
     String value = getValueFromSnapshotDeleteTable(dummyKey, snapshotDbDir.toString());
     assertNotNull(value);
@@ -456,6 +461,172 @@ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception {
     }
   }
 
+  /**
+   * Verifies that the snapshot cache lock coordinates checkpoint and purge operations,
+   * preventing a race on a follower OM where a snapshot directory could be deleted
+   * while a checkpoint is reading snapshot data.
+   *
+   * Test steps:
+   * 1. Create keys
+   * 2. Create snapshot 1
+   * 3. Create snapshot 2
+   * 4. Delete snapshot 2 (marks it as DELETED)
+   * 5. Suspend SnapshotDeletingService to prevent automatic purge
+   * 6. Invoke checkpoint servlet (acquires bootstrap lock and snapshot cache lock)
+   * 7. Submit purge request for snapshot 2 during checkpoint processing (simulates Ratis transaction on follower)
+   * 8. Verify purge waits for snapshot cache lock (blocked while checkpoint holds it)
+   * 9. Verify checkpoint completes first and tarball includes snapshot 2 data
+   * 10. Verify purge completes after checkpoint releases snapshot cache lock
+   *
+   * @throws Exception if test setup or execution fails
+   */
+  @Test
+  public void testBootstrapOnFollowerConsistency() throws Exception {
+    String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5);
+    String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5);
+    setupCluster();
+    om.getKeyManager().getSnapshotSstFilteringService().pause();
+    om.getKeyManager().getSnapshotDeletingService().suspend();
+    // Create test data and snapshots
+    OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client, volumeName, bucketName);
+    // Create key before first snapshot
+    TestDataUtil.createKey(bucket, "key1",
+        ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE),
+        "data1".getBytes(StandardCharsets.UTF_8));
+    client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot1");
+    client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2");
+    List<OzoneSnapshot> snapshots = new ArrayList<>();
+    client.getObjectStore().listSnapshot(volumeName, bucketName, "", null)
+        .forEachRemaining(snapshots::add);
+    assertEquals(2, snapshots.size(), "Should have 2 snapshots initially");
+    OzoneSnapshot snapshot1 = snapshots.stream()
+        .filter(snap -> snap.getName().equals("snapshot1"))
+        .findFirst().get();
+    OzoneSnapshot snapshot2 = snapshots.stream()
+        .filter(snap -> snap.getName().equals("snapshot2")).findFirst().get();
+    waitTillSnapshotInDeletedState(volumeName, bucketName, snapshot2);
+    // Setup servlet mocks for checkpoint processing
+    setupMocks();
+    when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true");
+    CountDownLatch purgeSubmitted = new CountDownLatch(1);
+    AtomicLong checkpointEndTime = new AtomicLong(0);
+    AtomicLong purgeEndTime = new AtomicLong(0);
+
+    DBStore dbStore = om.getMetadataManager().getStore();
+    DBStore spyDbStore = spy(dbStore);
+    AtomicReference<DBCheckpoint> capturedCheckpoint = new AtomicReference<>();
+    when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> {
+      // Submit purge request in background thread (simulating Ratis transaction on follower)
+      Thread purgeThread = new Thread(() -> {
+        try {
+          String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot2.getName());
+          // Construct SnapshotPurge request
+          OzoneManagerProtocolProtos.SnapshotPurgeRequest snapshotPurgeRequest =
+              OzoneManagerProtocolProtos.SnapshotPurgeRequest.newBuilder()
+                  .addSnapshotDBKeys(snapshotTableKey)
+                  .build();
+
+          OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
+              .setCmdType(OzoneManagerProtocolProtos.Type.SnapshotPurge)
+              .setSnapshotPurgeRequest(snapshotPurgeRequest)
+              .setClientId(UUID.randomUUID().toString())
+              .build();
+
+          purgeSubmitted.countDown();
+          long purgeStartTime = System.currentTimeMillis();
+          // Submit via Ratis (simulating follower receiving transaction)
+          // This will trigger OMSnapshotPurgeResponse which needs SNAPSHOT_DB_LOCK
+          ClientId clientId = ClientId.randomId();
+          long callId = 1;
+          OzoneManagerProtocolProtos.OMResponse
+              response = om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
+
+          if (response.getSuccess()) {
+            // Wait for purge to complete (snapshot removed from table)
+            GenericTestUtils.waitFor(() -> {
+              try {
+                boolean purged = om.getMetadataManager().getSnapshotInfoTable().get(snapshotTableKey) == null;
+                if (purged) {
+                  purgeEndTime.set(System.currentTimeMillis());
+                  long duration = purgeEndTime.get() - purgeStartTime;
+                  LOG.info("Purge completed in {} ms", duration);
+                }
+                return purged;
+              } catch (Exception ex) {
+                return false;
+              }
+            }, 100, 40_000);
+          }
+        } catch (Exception e) {
+          LOG.error("Purge submission failed", e);
+        }
+      });
+      purgeThread.start();
+
+      // Wait for purge request to be submitted
+      assertTrue(purgeSubmitted.await(2, TimeUnit.SECONDS), "Purge should be submitted");
+      // Small delay to ensure purge request reaches state machine
+      Thread.sleep(200);
+      DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
+      doNothing().when(checkpoint).cleanupCheckpoint(); // Don't cleanup for verification
+      capturedCheckpoint.set(checkpoint);
+      return checkpoint;
+    });
+    // Initialize servlet
+    doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(),
+        eq(false), any(), any(), eq(false));
+    omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(),
+        false, om.getOmAdminUsernames(), om.getOmAdminGroups(), false);
+    when(responseMock.getOutputStream()).thenReturn(servletOutputStream);
+    // Process checkpoint servlet
+    omDbCheckpointServletMock.doGet(requestMock, responseMock);
+    String testDirName = folder.resolve("testDir").toString();
+    String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
+    File newDbDir = new File(newDbDirName);
+    assertTrue(newDbDir.mkdirs());
+    FileUtil.unTar(tempFile, newDbDir);
+    OmSnapshotUtils.createHardLinks(newDbDir.toPath(), true);
+    Path snapshot1DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR,
+        OM_DB_NAME + "-" + snapshot1.getSnapshotId());
+    Path snapshot2DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR,
+        OM_DB_NAME + "-" + snapshot2.getSnapshotId());
+    assertTrue(purgeEndTime.get() >= checkpointEndTime.get(),
+        "Purge should complete after checkpoint releases snapshot cache lock");
+
+    // Verify snapshot is purged
+    List<OzoneSnapshot> snapshotsAfter = new ArrayList<>();
+    client.getObjectStore().listSnapshot(volumeName, bucketName, "", null)
+        .forEachRemaining(snapshotsAfter::add);
+    assertEquals(1, snapshotsAfter.size(), "Snapshot2 should be purged");
+    boolean snapshot1IncludedInCheckpoint = Files.exists(snapshot1DbDir);
+    boolean snapshot2IncludedInCheckpoint = Files.exists(snapshot2DbDir);
+    assertTrue(snapshot1IncludedInCheckpoint && snapshot2IncludedInCheckpoint,
+        "Checkpoint should include both snapshot1 and snapshot2 data");
+    // Cleanup
+    if (capturedCheckpoint.get() != null) {
+      capturedCheckpoint.get().cleanupCheckpoint();
+    }
+  }
+
+  private void waitTillSnapshotInDeletedState(String volumeName, String bucketName, OzoneSnapshot snapshot)
+      throws IOException, InterruptedException, TimeoutException {
+    String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot.getName());
+    // delete the snapshot and wait for it to be marked SNAPSHOT_DELETED
+    client.getObjectStore().deleteSnapshot(volumeName, bucketName, snapshot.getName());
+    GenericTestUtils.waitFor(() -> {
+      try {
+        SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable().get(snapshotTableKey);
+        return snapshotInfo != null &&
+            snapshotInfo.getSnapshotStatus().name().equals(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED.name());
+      } catch (Exception ex) {
+        LOG.error("Exception while querying snapshot info for key {}", snapshotTableKey, ex);
+        return false;
+      }
+    }, 100, 30_000);
+    om.awaitDoubleBufferFlush();
+  }
+
   @Test
   public void testBootstrapLockCoordination() throws Exception {
     // Create mocks for all background services
@@ -686,16 +857,6 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception {
     }
   }
 
-  private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
-    try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
-      List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log"))
-          .collect(Collectors.toList());
-      for (Path p : files) {
-        Files.delete(p);
-      }
-    }
-  }
-
   private static Set<Path> getAllPathsInTarball(File newDbDir) throws IOException {
     Set<Path> allPathsInTarball = new HashSet<>();
     try (Stream<Path> filesInTarball = Files.list(newDbDir.toPath())) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index 748329be83a..3291c37a0b8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -28,7 +28,6 @@
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
-import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK;
 import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.includeSnapshotData;
 import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.logEstimatedTarballSize;
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX;
@@ -54,7 +53,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 import javax.servlet.ServletException;
@@ -72,12 +70,15 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.compaction.log.CompactionLogEntry;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -256,29 +257,44 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
       if (shouldContinue) {
         // we finished transferring files from snapshot DB's by now and
         // this is the last step where we transfer the active om.db contents
-        // get the list of sst files of the checkpoint.
-        checkpoint = createAndPrepareCheckpoint(true);
-        List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
-        // unlimited files as we want the Active DB contents to be transferred in a single batch
-        maxTotalSstSize.set(Long.MAX_VALUE);
-        Path checkpointDir = checkpoint.getCheckpointLocation();
         Map<String, String> hardLinkFileMap = new HashMap<>();
-        writeDBToArchive(sstFilesToExclude, checkpointDir,
-            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
-        if (includeSnapshotData) {
-          // get the list of snapshots from the checkpoint
-          try (OmMetadataManagerImpl checkpointMetadataManager = OmMetadataManagerImpl
-              .createCheckpointMetadataManager(om.getConfiguration(), checkpoint)) {
-            snapshotPaths = getSnapshotDirsFromDB(omMetadataManager, checkpointMetadataManager,
-                snapshotLocalDataManager);
-          }
-          writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
+        SnapshotCache snapshotCache = om.getOmSnapshotManager().getSnapshotCache();
+        /*
+         * Acquire snapshot cache lock when includeSnapshotData is true to prevent race conditions
+         * between checkpoint operations and snapshot purge operations. Without this lock, a purge
+         * operation (e.g., from a Ratis transaction on follower OM) could delete snapshot directories
+         * while checkpoint is reading snapshot data, leading to FileNotFoundException or corrupted
+         * checkpoint data. The lock ensures checkpoint completes reading snapshot data before purge
+         * can delete the snapshot directory.
+         *
+         * When includeSnapshotData is false, lock is set to null and no locking is performed.
+         * In this case, the try-with-resources block does not call close() on any resource,
+         * which is intentional because snapshot consistency is not required.
+         */
+        try (UncheckedAutoCloseableSupplier<OMLockDetails> lock = includeSnapshotData ? snapshotCache.lock() : null) {
+          // get the list of sst files of the checkpoint.
+          checkpoint = createAndPrepareCheckpoint(true);
+          // unlimited files as we want the Active DB contents to be transferred in a single batch
+          maxTotalSstSize.set(Long.MAX_VALUE);
+          Path checkpointDir = checkpoint.getCheckpointLocation();
+          writeDBToArchive(sstFilesToExclude, checkpointDir, maxTotalSstSize, archiveOutputStream, tmpdir,
               hardLinkFileMap, false);
-          writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(),
-              maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
-          // This is done to ensure all data to be copied correctly is flushed in the snapshot DB
-          transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize,
-              archiveOutputStream, hardLinkFileMap);
+          if (includeSnapshotData) {
+            List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
+            // get the list of snapshots from the checkpoint
+            try (OmMetadataManagerImpl checkpointMetadataManager = OmMetadataManagerImpl
+                    .createCheckpointMetadataManager(om.getConfiguration(), checkpoint)) {
+              snapshotPaths = getSnapshotDirsFromDB(omMetadataManager, checkpointMetadataManager,
+                  snapshotLocalDataManager);
+            }
+            writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
+                hardLinkFileMap, false);
+            writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(), maxTotalSstSize, archiveOutputStream, tmpdir,
+                hardLinkFileMap, false);
+            // This is done to ensure all data to be copied correctly is flushed in the snapshot DB
+            transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize, archiveOutputStream,
+                hardLinkFileMap);
+          }
         }
         writeHardlinkFile(getConf(), hardLinkFileMap, archiveOutputStream);
         includeRatisSnapshotCompleteFlag(archiveOutputStream);
@@ -307,25 +323,15 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
   void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Set<Path> snapshotPaths,
       AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
       Map<String, String> hardLinkFileMap) throws IOException {
-    OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
-    OMMetadataManager omMetadataManager = om.getMetadataManager();
     for (Path snapshotDir : snapshotPaths) {
-      String snapshotId = OmSnapshotManager.extractSnapshotIDFromCheckpointDirName(snapshotDir.toString());
-      omMetadataManager.getLock().acquireReadLock(SNAPSHOT_DB_LOCK, snapshotId);
-      try {
-        // invalidate closes the snapshot DB
-        om.getOmSnapshotManager().invalidateCacheEntry(UUID.fromString(snapshotId));
-        writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-            hardLinkFileMap, false);
-        Path snapshotLocalPropertyYaml = Paths.get(
-            OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotDir));
-        if (Files.exists(snapshotLocalPropertyYaml)) {
-          File yamlFile = snapshotLocalPropertyYaml.toFile();
-          hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
-          linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir);
-        }
-      } finally {
-        omMetadataManager.getLock().releaseReadLock(SNAPSHOT_DB_LOCK, snapshotId);
+      writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap,
+          false);
+      Path snapshotLocalPropertyYaml = Paths.get(
+          OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotDir));
+      if (Files.exists(snapshotLocalPropertyYaml)) {
+        File yamlFile = snapshotLocalPropertyYaml.toFile();
+        hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
+        linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir);
       }
     }
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 4fcb8ed22e8..06215c8df76 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -485,6 +485,14 @@ public int getSnapshotCacheSize() {
     return snapshotCache == null ? 0 : snapshotCache.size();
   }
 
+  /**
+   * Get snapshot cache instance.
+   * @return snapshotCache.
+   */
+  public SnapshotCache getSnapshotCache() {
+    return snapshotCache;
+  }
+
   /**
    * Immediately invalidate all entries and close their DB instances in cache.
    */
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 04515dcd728..1f77f6f5b49 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -83,6 +83,7 @@ public final class OzoneManagerDoubleBuffer {
   private final Daemon daemon;
   /** Is the {@link #daemon} running? */
   private final AtomicBoolean isRunning = new AtomicBoolean(false);
+  private final AtomicBoolean isPaused = new AtomicBoolean(false);
   /** Notify flush operations are completed by the {@link #daemon}. */
   private final FlushNotifier flushNotifier;
 
@@ -211,6 +212,22 @@ public OzoneManagerDoubleBuffer start() {
     return this;
   }
 
+  @VisibleForTesting
+  public void pause() {
+    synchronized (this) {
+      isPaused.set(true);
+      this.notifyAll();
+    }
+  }
+
+  @VisibleForTesting
+  public void unpause() {
+    synchronized (this) {
+      isPaused.set(false);
+      this.notifyAll();
+    }
+  }
+
   /**
    * Acquires the given number of permits from unFlushedTransactions,
    * blocking until all are available, or the thread is interrupted.
@@ -277,6 +294,18 @@ private void addToBatchTransactionInfoWithTrace(String parentName,
   @VisibleForTesting
   public void flushTransactions() {
     while (isRunning.get() && canFlush()) {
+      // Check if paused
+      synchronized (this) {
+        while (isPaused.get() && isRunning.get()) {
+          try {
+            this.wait();
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+
       flushCurrentBuffer();
     }
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index ce79c32fc4e..ff0b04b0541 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -151,7 +151,6 @@ public void invalidate(UUID key) {
         LOG.debug("SnapshotId: '{}' does not exist in snapshot cache.", k);
       } else {
         try {
-          v.get().getMetadataManager().getStore().flushDB();
           v.get().close();
         } catch (IOException e) {
           throw new IllegalStateException("Failed to close snapshotId: " + key, e);

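A note on the OzoneManagerDoubleBuffer change above: pause() and unpause() are
@VisibleForTesting hooks that park the flush daemon inside flushTransactions()
until it is woken again, giving tests a deterministic point at which no
buffered transactions reach RocksDB. A hypothetical helper showing the
intended usage (how a test obtains the double buffer instance is test wiring,
not part of this commit):

    // Hypothetical test helper built on the new pause()/unpause() hook.
    static void withFlushPaused(OzoneManagerDoubleBuffer buffer, Runnable work) {
      buffer.pause();      // flush daemon parks in wait() before the next flush
      try {
        work.run();        // submitted OM requests accumulate unflushed
      } finally {
        buffer.unpause();  // notifyAll() wakes the daemon; flushing resumes
      }
    }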
