This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e5b4bc33d HDDS-8207. [Snapshot] Fix bugs and add tests for 
SnapshotDeletingService. (#4571)
8e5b4bc33d is described below

commit 8e5b4bc33dc6f7cea1b9cceff77a2c479a0ecab0
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Thu May 25 14:29:40 2023 -0700

    HDDS-8207. [Snapshot] Fix bugs and add tests for SnapshotDeletingService. 
(#4571)
---
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |   6 +
 .../fs/ozone/TestSnapshotDeletingService.java      | 474 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/om/TestOmSnapshot.java |   4 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  81 +++-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |  19 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   8 +-
 .../apache/hadoop/ozone/om/SnapshotChainInfo.java  |  20 +
 .../hadoop/ozone/om/SnapshotChainManager.java      |   5 +
 .../key/OMDirectoriesPurgeRequestWithFSO.java      |   2 +-
 .../ozone/om/request/key/OMKeyPurgeRequest.java    |   2 +-
 .../snapshot/OMSnapshotMoveDeletedKeysRequest.java |   6 +-
 .../OMSnapshotMoveDeletedKeysResponse.java         |  59 ++-
 .../response/snapshot/OMSnapshotPurgeResponse.java |  52 ++-
 .../om/service/AbstractKeyDeletingService.java     |  60 +--
 .../ozone/om/service/DirectoryDeletingService.java |  42 +-
 .../ozone/om/service/SnapshotDeletingService.java  | 112 +++--
 .../ozone/om/snapshot/SnapshotDiffManager.java     |   4 +-
 .../hadoop/ozone/om/snapshot/SnapshotUtils.java    |  55 +--
 .../hadoop/ozone/om/TestOmSnapshotManager.java     |   4 +-
 .../key/TestOMKeyPurgeRequestAndResponse.java      |   2 +-
 .../om/service/TestSnapshotDeletingService.java    | 321 --------------
 21 files changed, 827 insertions(+), 511 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
index ea93231631..9e77870125 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
@@ -471,8 +471,14 @@ public class TestDirectoryDeletingServiceWithFSO {
     // clean up because the paths are part of a snapshot.
     // As a result on 1 deleted dir and 3 deleted files will
     // remain in dirTable and keyTable respectively.
+    long prevDDSRunCount = dirDeletingService.getRunCount().get();
+    long prevKDSRunCount = keyDeletingService.getRunCount().get();
     assertTableRowCount(deletedDirTable, 1);
     assertTableRowCount(deletedKeyTable, 3);
+    GenericTestUtils.waitFor(() -> dirDeletingService.getRunCount().get() >
+        prevDDSRunCount, 100, 10000);
+    GenericTestUtils.waitFor(() -> keyDeletingService.getRunCount().get() >
+        prevKDSRunCount, 100, 10000);
 
     assertSubPathsCount(dirDeletingService::getMovedFilesCount, 0);
     assertSubPathsCount(dirDeletingService::getMovedDirsCount, 0);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSnapshotDeletingService.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSnapshotDeletingService.java
new file mode 100644
index 0000000000..1f2a1b657e
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSnapshotDeletingService.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Test Snapshot Deleting Service.
+ */
+public class TestSnapshotDeletingService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSnapshotDeletingService.class);
+  private static boolean omRatisEnabled = true;
+  private static final String CONTENT = "testContent";
+
+  private MiniOzoneCluster cluster;
+  private OzoneManager om;
+  private OzoneBucket bucket1;
+  private OzoneClient client;
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME_ONE = "bucket1";
+  private static final String BUCKET_NAME_TWO = "bucket2";
+
+  @BeforeEach
+  public void setup() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
+        200, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT,
+        10000, TimeUnit.MILLISECONDS);
+    conf.setInt(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 100);
+    conf.setInt(OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK, 5);
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
+    conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, omRatisEnabled);
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    // Enable filesystem snapshot feature for the test regardless of the 
default
+    conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+    client = cluster.newClient();
+    om = cluster.getOzoneManager();
+    bucket1 = TestDataUtil.createVolumeAndBucket(
+        client, VOLUME_NAME, BUCKET_NAME_ONE, BucketLayout.DEFAULT);
+  }
+
+  @AfterEach
+  public void teardown() {
+    IOUtils.closeQuietly(client);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testSnapshotSplitAndMove() throws Exception {
+    SnapshotDeletingService snapshotDeletingService = (SnapshotDeletingService)
+        om.getKeyManager().getSnapshotDeletingService();
+    Table<String, SnapshotInfo> snapshotInfoTable =
+        om.getMetadataManager().getSnapshotInfoTable();
+
+    createSnapshotDataForBucket1();
+
+    assertTableRowCount(snapshotInfoTable, 2);
+    GenericTestUtils.waitFor(() -> snapshotDeletingService
+            .getSuccessfulRunCount() >= 1, 1000, 10000);
+
+    OmSnapshot bucket1snap3 = (OmSnapshot) om.getOmSnapshotManager()
+        .checkForSnapshot(VOLUME_NAME, BUCKET_NAME_ONE,
+            getSnapshotPrefix("bucket1snap3"), true);
+
+    // Check bucket1key1 added to next non deleted snapshot db.
+    List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> omKeyInfos =
+        bucket1snap3.getMetadataManager()
+            .getDeletedTable().getRangeKVs(null, 100,
+                "/vol1/bucket1/bucket1key1");
+    Assertions.assertEquals(1, omKeyInfos.size());
+  }
+
+  @Test
+  public void testMultipleSnapshotKeyReclaim() throws Exception {
+
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        om.getMetadataManager().getDeletedTable();
+    Table<String, SnapshotInfo> snapshotInfoTable =
+        om.getMetadataManager().getSnapshotInfoTable();
+
+    createSnapshotDataForBucket1();
+
+    BucketArgs bucketArgs = new BucketArgs.Builder()
+        .setBucketLayout(BucketLayout.LEGACY)
+        .build();
+
+    OzoneBucket bucket2 = TestDataUtil.createBucket(
+        client, VOLUME_NAME, bucketArgs, BUCKET_NAME_TWO);
+    // Create key1 and key2
+    TestDataUtil.createKey(bucket2, "bucket2key1", ReplicationFactor.THREE,
+        ReplicationType.RATIS, CONTENT);
+    TestDataUtil.createKey(bucket2, "bucket2key2", ReplicationFactor.THREE,
+        ReplicationType.RATIS, CONTENT);
+
+    // Create Snapshot
+    client.getObjectStore().createSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+        "bucket2snap1");
+    assertTableRowCount(snapshotInfoTable, 3);
+
+    // Both key 1 and key 2 can be reclaimed when Snapshot 1 is deleted.
+    client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
+        "bucket2key1", false);
+    client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
+        "bucket2key2", false);
+    assertTableRowCount(deletedTable, 2);
+    SnapshotInfo delSnapInfo = snapshotInfoTable
+        .get("/vol1/bucket2/bucket2snap1");
+    client.getObjectStore().deleteSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+        "bucket2snap1");
+    assertTableRowCount(snapshotInfoTable, 2);
+    // KeyDeletingService will clean up.
+    assertTableRowCount(deletedTable, 0);
+
+    verifySnapshotChain(delSnapInfo, null);
+  }
+
+  @SuppressWarnings("checkstyle:MethodLength")
+  @Test
+  public void testSnapshotWithFSO() throws Exception {
+    Table<String, OmDirectoryInfo> dirTable =
+        om.getMetadataManager().getDirectoryTable();
+    Table<String, SnapshotInfo> snapshotInfoTable =
+        om.getMetadataManager().getSnapshotInfoTable();
+    Table<String, OmKeyInfo> keyTable =
+        om.getMetadataManager().getFileTable();
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        om.getMetadataManager().getDeletedTable();
+    Table<String, OmKeyInfo>  deletedDirTable =
+        om.getMetadataManager().getDeletedDirTable();
+    Table<String, String> renamedTable =
+        om.getMetadataManager().getSnapshotRenamedTable();
+
+    BucketArgs bucketArgs = new BucketArgs.Builder()
+        .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+        .build();
+
+    OzoneBucket bucket2 = TestDataUtil.createBucket(
+        client, VOLUME_NAME, bucketArgs, BUCKET_NAME_TWO);
+
+    // Create 10 keys
+    for (int i = 1; i <= 10; i++) {
+      TestDataUtil.createKey(bucket2, "key" + i, ReplicationFactor.THREE,
+          ReplicationType.RATIS, CONTENT);
+    }
+
+    // Create Directory and Sub
+    for (int i = 1; i <= 3; i++) {
+      String parent = "parent" + i;
+      client.getProxy().createDirectory(VOLUME_NAME,
+          BUCKET_NAME_TWO, parent);
+      for (int j = 1; j <= 3; j++) {
+        String childFile = "/childFile" + j;
+        String childDir = "/childDir" + j;
+        client.getProxy().createDirectory(VOLUME_NAME,
+            BUCKET_NAME_TWO, parent + childDir);
+        TestDataUtil.createKey(bucket2, parent + childFile,
+            ReplicationFactor.THREE, ReplicationType.RATIS, CONTENT);
+      }
+    }
+
+    // Total 12 dirs, 19 keys.
+    assertTableRowCount(dirTable, 12);
+    assertTableRowCount(keyTable, 19);
+    assertTableRowCount(deletedDirTable, 0);
+
+    // Create Snapshot1
+    client.getObjectStore().createSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+        "snap1");
+    assertTableRowCount(snapshotInfoTable, 1);
+
+    // Delete 5 Keys
+    for (int i = 1; i <= 5; i++) {
+      client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
+          "key" + i, false);
+    }
+    // Rename Keys 3 keys
+    for (int i = 6; i <= 8; i++) {
+      client.getProxy().renameKey(VOLUME_NAME, BUCKET_NAME_TWO, "key" + i,
+          "renamedKey" + i);
+    }
+
+    // Rename 1 Dir
+    for (int i = 1; i <= 1; i++) {
+      client.getProxy().renameKey(VOLUME_NAME, BUCKET_NAME_TWO, "/parent" + i,
+          "/renamedParent" + i);
+    }
+
+    // Delete 2 Dirs
+    for (int i = 2; i <= 3; i++) {
+      client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO, "/parent" + i,
+          true);
+    }
+
+    assertTableRowCount(renamedTable, 4);
+    // Delete Renamed Keys
+    for (int i = 6; i <= 8; i++) {
+      client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
+          "renamedKey" + i, false);
+    }
+
+    // Delete Renamed Dir
+    for (int i = 1; i <= 1; i++) {
+      client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
+          "/renamedParent" + i, true);
+    }
+
+    assertTableRowCount(deletedTable, 8);
+    assertTableRowCount(deletedDirTable, 3);
+    assertTableRowCount(dirTable, 9);
+    assertTableRowCount(renamedTable, 4);
+
+    // Create Snapshot2
+    client.getObjectStore().createSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+        "snap2");
+
+    assertTableRowCount(snapshotInfoTable, 2);
+    // Once snapshot is taken renamedTable, deletedTable, deletedDirTable
+    // should be cleaned
+    assertTableRowCount(renamedTable, 0);
+    assertTableRowCount(deletedTable, 0);
+    assertTableRowCount(deletedDirTable, 0);
+
+    // Delete 2 more keys
+    for (int i = 9; i <= 10; i++) {
+      client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
+          "key" + i, false);
+    }
+
+    assertTableRowCount(deletedTable, 2);
+
+    // Create Snapshot3
+    client.getObjectStore().createSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+        "snap3");
+    assertTableRowCount(snapshotInfoTable, 3);
+
+    assertTableRowCount(renamedTable, 0);
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(deletedTable, 0);
+    assertTableRowCount(keyTable, 9);
+    SnapshotInfo deletedSnap = om.getMetadataManager()
+        .getSnapshotInfoTable().get("/vol1/bucket2/snap2");
+
+    client.getObjectStore().deleteSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+        "snap2");
+    assertTableRowCount(snapshotInfoTable, 2);
+
+    // Once all the tables are moved, the snapshot is deleted
+    assertTableRowCount(om.getMetadataManager().getSnapshotInfoTable(), 2);
+
+    verifySnapshotChain(deletedSnap, "/vol1/bucket2/snap3");
+    OmSnapshot snap3 = (OmSnapshot) om.getOmSnapshotManager()
+        .checkForSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+            getSnapshotPrefix("snap3"), true);
+
+    Table<String, OmKeyInfo> snapDeletedDirTable =
+        snap3.getMetadataManager().getDeletedDirTable();
+    Table<String, String> snapRenamedTable =
+        snap3.getMetadataManager().getSnapshotRenamedTable();
+    Table<String, RepeatedOmKeyInfo> snapDeletedTable =
+        snap3.getMetadataManager().getDeletedTable();
+
+    assertTableRowCount(snapRenamedTable, 4);
+    assertTableRowCount(snapDeletedDirTable, 3);
+    // All the keys deleted before snapshot2 is moved to snap3
+    assertTableRowCount(snapDeletedTable, 10);
+
+    // Before deleting the last snapshot
+    assertTableRowCount(renamedTable, 0);
+    assertTableRowCount(deletedDirTable, 0);
+    assertTableRowCount(deletedTable, 0);
+    // Delete Snapshot3 and check entries moved to active DB
+    client.getObjectStore().deleteSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+        "snap3");
+
+    // Check entries moved to active DB
+    assertTableRowCount(snapshotInfoTable, 1);
+    assertTableRowCount(renamedTable, 4);
+    assertTableRowCount(deletedDirTable, 3);
+    assertTableRowCount(deletedTable, 10);
+  }
+
+  /*
+      Flow
+      ----
+      create key1
+      create snapshot1
+      create key2
+      delete key1
+      delete key2
+      create snapshot2
+      create key3
+      create key4
+      delete key4
+      create snapshot3
+      delete snapshot2
+   */
+  private void createSnapshotDataForBucket1() throws Exception {
+    Table<String, SnapshotInfo> snapshotInfoTable =
+        om.getMetadataManager().getSnapshotInfoTable();
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        om.getMetadataManager().getDeletedTable();
+    Table<String, OmKeyInfo> keyTable =
+        om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT);
+    OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
+        om.getMetadataManager();
+
+    TestDataUtil.createKey(bucket1, "bucket1key1", ReplicationFactor.THREE,
+        ReplicationType.RATIS, CONTENT);
+    assertTableRowCount(keyTable, 1);
+
+    // Create Snapshot 1.
+    client.getProxy().createSnapshot(VOLUME_NAME, BUCKET_NAME_ONE,
+        "bucket1snap1");
+    assertTableRowCount(snapshotInfoTable, 1);
+    TestDataUtil.createKey(bucket1, "bucket1key2", ReplicationFactor.THREE,
+        ReplicationType.RATIS, CONTENT);
+
+    // Key 1 cannot be reclaimed as it is still referenced by Snapshot 1.
+    client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_ONE,
+        "bucket1key1", false);
+    // Key 2 is deleted here, which will be reclaimed here as
+    // it is not being referenced by previous snapshot.
+    client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_ONE,
+        "bucket1key2", false);
+    assertTableRowCount(deletedTable, 1);
+
+    // Create Snapshot 2.
+    client.getProxy().createSnapshot(VOLUME_NAME, BUCKET_NAME_ONE,
+        "bucket1snap2");
+    assertTableRowCount(snapshotInfoTable, 2);
+    // Key 2 is removed from the active Db's
+    // deletedTable when Snapshot 2 is taken.
+    assertTableRowCount(deletedTable, 0);
+
+    TestDataUtil.createKey(bucket1, "bucket1key3", ReplicationFactor.THREE,
+        ReplicationType.RATIS, CONTENT);
+    TestDataUtil.createKey(bucket1, "bucket1key4", ReplicationFactor.THREE,
+        ReplicationType.RATIS, CONTENT);
+    client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_ONE,
+        "bucket1key4", false);
+    assertTableRowCount(keyTable, 1);
+    assertTableRowCount(deletedTable, 0);
+
+    // Create Snapshot 3.
+    client.getProxy().createSnapshot(VOLUME_NAME, BUCKET_NAME_ONE,
+        "bucket1snap3");
+    assertTableRowCount(snapshotInfoTable, 3);
+
+    SnapshotInfo snapshotInfo = metadataManager.getSnapshotInfoTable()
+        .get("/vol1/bucket1/bucket1snap2");
+
+    // Delete Snapshot 2.
+    client.getProxy().deleteSnapshot(VOLUME_NAME, BUCKET_NAME_ONE,
+        "bucket1snap2");
+    assertTableRowCount(snapshotInfoTable, 2);
+    verifySnapshotChain(snapshotInfo, "/vol1/bucket1/bucket1snap3");
+  }
+
+  private void verifySnapshotChain(SnapshotInfo deletedSnapshot,
+                                   String nextSnapshot)
+      throws Exception {
+    OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
+        om.getMetadataManager();
+    String pathPreviousSnapshotID = 
deletedSnapshot.getPathPreviousSnapshotID();
+    String globalPreviousSnapshotID = deletedSnapshot
+        .getGlobalPreviousSnapshotID();
+    GenericTestUtils.waitFor(() -> {
+      try {
+        SnapshotInfo snapshotInfo = metadataManager.getSnapshotInfoTable()
+            .get(deletedSnapshot.getTableKey());
+        return snapshotInfo == null;
+      } catch (IOException e) {
+        LOG.error("Error getting snapInfo.");
+      }
+      return false;
+    }, 100, 10000);
+
+    if (nextSnapshot != null) {
+      SnapshotInfo nextSnapshotInfo = metadataManager
+          .getSnapshotInfoTable().get(nextSnapshot);
+      GenericTestUtils.waitFor(() -> Objects.equals(
+          nextSnapshotInfo.getPathPreviousSnapshotID(), pathPreviousSnapshotID)
+          && Objects.equals(nextSnapshotInfo.getGlobalPreviousSnapshotID(),
+          globalPreviousSnapshotID), 100, 10000);
+    }
+  }
+
+  private void assertTableRowCount(Table<String, ?> table, int count)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> assertTableRowCount(count, table), 1000,
+        120000); // 2 minutes
+  }
+
+  private boolean assertTableRowCount(int expectedCount,
+                                      Table<String, ?> table) {
+    long count = 0L;
+    try {
+      count = cluster.getOzoneManager().getMetadataManager()
+          .countRowsInTable(table);
+      LOG.info("{} actual row count={}, expectedCount={}", table.getName(),
+          count, expectedCount);
+    } catch (IOException ex) {
+      fail("testDoubleBuffer failed with: " + ex);
+    }
+    return count == expectedCount;
+  }
+}
+
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
index a61b00be51..256e903f14 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
@@ -878,7 +878,7 @@ public class TestOmSnapshot {
     String snapPrefix = createSnapshot(volumeName, bucketName);
     RDBStore snapshotDBStore = (RDBStore)
         ((OmSnapshot)cluster.getOzoneManager().getOmSnapshotManager()
-            .checkForSnapshot(volumeName, bucketName, snapPrefix))
+            .checkForSnapshot(volumeName, bucketName, snapPrefix, false))
             .getMetadataManager().getStore();
 
     for (String table : snapshotDBStore.getTableNames().values()) {
@@ -1002,7 +1002,7 @@ public class TestOmSnapshot {
 
     OmSnapshot omSnapshot = (OmSnapshot) cluster.getOzoneManager()
         .getOmSnapshotManager()
-        .checkForSnapshot(volumeName, bucketName, snapshotName);
+        .checkForSnapshot(volumeName, bucketName, snapshotName, false);
 
     RDBStore snapshotDbStore =
         (RDBStore) omSnapshot.getMetadataManager().getStore();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 8f6a2525e6..81b6292bd6 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -98,6 +98,7 @@ import static 
org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
+import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotDirExist;
 
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.util.ExitUtils;
@@ -372,6 +373,8 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
         File checkpoint =
             Paths.get(metaDir.toPath().toString(), dbName).toFile();
         RDBCheckpointUtils.waitForCheckpointDirectoryExist(checkpoint);
+        // Check if the snapshot directory exists.
+        checkSnapshotDirExist(checkpoint);
       }
       setStore(loadDB(conf, metaDir, dbName, false,
           java.util.Optional.of(Boolean.TRUE), false, false));
@@ -1477,7 +1480,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
           // Get volume name and bucket name
           String[] keySplit = kv.getKey().split(OM_KEY_PREFIX);
           // Get the latest snapshot in snapshot path.
-          OmSnapshot latestSnapshot = getLatestSnapshot(keySplit[1],
+          OmSnapshot latestSnapshot = getLatestActiveSnapshot(keySplit[1],
               keySplit[2], omSnapshotManager);
           String bucketKey = getBucketKey(keySplit[1], keySplit[2]);
           OmBucketInfo bucketInfo = getBucketTable().get(bucketKey);
@@ -1500,26 +1503,48 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
             //  4. Further optimization: Skip all snapshotted keys altogether
             //  e.g. by prefixing all unreclaimable keys, then calling seek
 
+            // If the last snapshot is deleted and the keys renamed in between
+            // the snapshots will be cleaned up by KDS. So we need to check
+            // in the renamedTable as well.
+            String dbRenameKey = getRenameKey(info.getVolumeName(),
+                info.getBucketName(), info.getObjectID());
+
             if (latestSnapshot != null) {
               Table<String, OmKeyInfo> prevKeyTable =
                   latestSnapshot.getMetadataManager().getKeyTable(
                       bucketInfo.getBucketLayout());
-              String prevDbKey;
-              if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+
+              Table<String, RepeatedOmKeyInfo> prevDeletedTable =
+                  latestSnapshot.getMetadataManager().getDeletedTable();
+              String prevKeyTableDBKey = getSnapshotRenamedTable()
+                  .get(dbRenameKey);
+              String prevDelTableDBKey = getOzoneKey(info.getVolumeName(),
+                  info.getBucketName(), info.getKeyName());
+              // format: /volName/bucketName/keyName/objId
+              prevDelTableDBKey = getOzoneDeletePathKey(info.getObjectID(),
+                  prevDelTableDBKey);
+
+              if (prevKeyTableDBKey == null &&
+                  bucketInfo.getBucketLayout().isFileSystemOptimized()) {
                 long volumeId = getVolumeId(info.getVolumeName());
-                prevDbKey = getOzonePathKey(volumeId,
+                prevKeyTableDBKey = getOzonePathKey(volumeId,
                     bucketInfo.getObjectID(),
                     info.getParentObjectID(),
                     info.getKeyName());
-              } else {
-                prevDbKey = getOzoneKey(info.getVolumeName(),
+              } else if (prevKeyTableDBKey == null) {
+                prevKeyTableDBKey = getOzoneKey(info.getVolumeName(),
                     info.getBucketName(),
                     info.getKeyName());
               }
 
-              OmKeyInfo omKeyInfo = prevKeyTable.get(prevDbKey);
-              if (omKeyInfo != null &&
-                  info.getObjectID() == omKeyInfo.getObjectID()) {
+              OmKeyInfo omKeyInfo = prevKeyTable.get(prevKeyTableDBKey);
+              // When key is deleted it is no longer in keyTable, we also
+              // have to check deletedTable of previous snapshot
+              RepeatedOmKeyInfo delOmKeyInfo =
+                  prevDeletedTable.get(prevDelTableDBKey);
+              if ((omKeyInfo != null &&
+                  info.getObjectID() == omKeyInfo.getObjectID()) ||
+                  delOmKeyInfo != null) {
                 // TODO: [SNAPSHOT] For now, we are not cleaning up a key in
                 //  active DB's deletedTable if any one of the keys in
                 //  RepeatedOmKeyInfo exists in last snapshot's key/fileTable.
@@ -1555,22 +1580,40 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
   /**
    * Get the latest OmSnapshot for a snapshot path.
    */
-  public OmSnapshot getLatestSnapshot(String volumeName, String bucketName,
-                                      OmSnapshotManager snapshotManager)
+  public OmSnapshot getLatestActiveSnapshot(String volumeName,
+                                            String bucketName,
+                                            OmSnapshotManager snapshotManager)
       throws IOException {
 
-    String latestPathSnapshot =
-        snapshotChainManager.getLatestPathSnapshot(volumeName
-            + OM_KEY_PREFIX + bucketName);
-    String snapTableKey = latestPathSnapshot != null ?
-        snapshotChainManager.getTableKey(latestPathSnapshot) : null;
-    SnapshotInfo snapInfo = snapTableKey != null ?
-        getSnapshotInfoTable().get(snapTableKey) : null;
+    String snapshotPath = volumeName + OM_KEY_PREFIX + bucketName;
+    String latestPathSnapshot = snapshotChainManager
+        .getLatestPathSnapshot(snapshotPath);
+
+    SnapshotInfo snapInfo = null;
+    while (latestPathSnapshot != null) {
+      String snapTableKey = snapshotChainManager
+          .getTableKey(latestPathSnapshot);
+      snapInfo = getSnapshotInfoTable().get(snapTableKey);
+
+      if (snapInfo != null && snapInfo.getSnapshotStatus() ==
+          SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
+        break;
+      }
+
+      // Update latestPathSnapshot if current snapshot is deleted.
+      if (snapshotChainManager.hasPreviousPathSnapshot(
+          snapshotPath, latestPathSnapshot)) {
+        latestPathSnapshot = snapshotChainManager
+            .previousPathSnapshot(snapshotPath, latestPathSnapshot);
+      } else {
+        latestPathSnapshot = null;
+      }
+    }
 
     OmSnapshot omSnapshot = null;
     if (snapInfo != null) {
       omSnapshot = (OmSnapshot) snapshotManager.checkForSnapshot(volumeName,
-              bucketName, getSnapshotPrefix(snapInfo.getName()));
+              bucketName, getSnapshotPrefix(snapInfo.getName()), true);
     }
     return omSnapshot;
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index d6f4dd79cf..81aa443587 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -297,9 +297,9 @@ public final class OmSnapshotManager implements 
AutoCloseable {
         // see if the snapshot exists
         SnapshotInfo snapshotInfo = getSnapshotInfo(snapshotTableKey);
 
-        // Block snapshot from loading when it is no longer active
-        // e.g. DELETED, unless this is called from SnapshotDeletingService.
-        checkSnapshotActive(snapshotInfo);
+        // Block snapshot from loading when it is no longer active e.g. 
DELETED,
+        // unless this is called from SnapshotDeletingService.
+        checkSnapshotActive(snapshotInfo, true);
 
         CacheValue<SnapshotInfo> cacheValue = ozoneManager.getMetadataManager()
             .getSnapshotInfoTable()
@@ -589,7 +589,9 @@ public final class OmSnapshotManager implements 
AutoCloseable {
 
   // Get OmSnapshot if the keyname has ".snapshot" key indicator
   public IOmMetadataReader checkForSnapshot(String volumeName,
-                                            String bucketName, String keyname)
+                                            String bucketName,
+                                            String keyname,
+                                            boolean skipActiveCheck)
       throws IOException {
     if (keyname == null || !ozoneManager.isFilesystemSnapshotEnabled()) {
       return ozoneManager.getOmMetadataReader();
@@ -607,7 +609,9 @@ public final class OmSnapshotManager implements 
AutoCloseable {
           bucketName, snapshotName);
 
       // Block FS API reads when snapshot is not active.
-      checkSnapshotActive(ozoneManager, snapshotTableKey);
+      if (!skipActiveCheck) {
+        checkSnapshotActive(ozoneManager, snapshotTableKey);
+      }
 
       // Warn if actual cache size exceeds the soft limit already.
       if (snapshotCache.size() > softCacheSize) {
@@ -689,9 +693,8 @@ public final class OmSnapshotManager implements 
AutoCloseable {
         volumeName, bucketName, toSnapshotName);
 
     // Block SnapDiff if either of the snapshots is not active.
-    checkSnapshotActive(fromSnapInfo);
-    checkSnapshotActive(toSnapInfo);
-
+    checkSnapshotActive(fromSnapInfo, false);
+    checkSnapshotActive(toSnapInfo, false);
     // Check snapshot creation time
     if (fromSnapInfo.getCreationTime() > toSnapInfo.getCreationTime()) {
       throw new IOException("fromSnapshot:" + fromSnapInfo.getName() +
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 0bdcc10cf9..d45b5d6e04 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4576,18 +4576,20 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
   private IOmMetadataReader getReader(OmKeyArgs keyArgs) throws IOException {
     return omSnapshotManager.checkForSnapshot(
-        keyArgs.getVolumeName(), keyArgs.getBucketName(), 
keyArgs.getKeyName());
+        keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName(),
+        false);
   }
 
   private IOmMetadataReader getReader(String volumeName, String bucketName,
       String key) throws IOException {
-    return omSnapshotManager.checkForSnapshot(volumeName, bucketName, key);
+    return omSnapshotManager.checkForSnapshot(volumeName, bucketName, key,
+        false);
   }
 
   private IOmMetadataReader getReader(OzoneObj ozoneObj) throws IOException {
     return omSnapshotManager.checkForSnapshot(
         ozoneObj.getVolumeName(), ozoneObj.getBucketName(),
-        ozoneObj.getKeyName());
+        ozoneObj.getKeyName(), false);
   }
 
   public SnapshotDiffResponse snapshotDiff(String volume,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainInfo.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainInfo.java
index 0d6db3b945..7d4d5165db 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainInfo.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainInfo.java
@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.ozone.om;
 
+import java.util.Objects;
+
 /**
  * SnapshotChain supporting SnapshotInfo class.
  *
@@ -54,4 +56,22 @@ public class SnapshotChainInfo {
     return previousSnapshotID;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SnapshotChainInfo that = (SnapshotChainInfo) o;
+    return Objects.equals(snapshotID, that.snapshotID) &&
+        Objects.equals(previousSnapshotID, that.previousSnapshotID) &&
+        Objects.equals(nextSnapshotID, that.nextSnapshotID);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(snapshotID, previousSnapshotID, nextSnapshotID);
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java
index c3ec6d1d61..dad450328f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java
@@ -503,4 +503,9 @@ public class SnapshotChainManager {
     loadFromSnapshotInfoTable(metadataManager);
   }
 
+  @VisibleForTesting
+  public LinkedHashMap<String, SnapshotChainInfo> getSnapshotChainPath(
+      String path) {
+    return snapshotChainPath.get(path);
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index 6926de1fed..a63ae3265c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -79,7 +79,7 @@ public class OMDirectoriesPurgeRequestWithFSO extends 
OMKeyRequest {
         omFromSnapshot = (OmSnapshot) omSnapshotManager
             .checkForSnapshot(snapshotInfo.getVolumeName(),
                 snapshotInfo.getBucketName(),
-                getSnapshotPrefix(snapshotInfo.getName()));
+                getSnapshotPrefix(snapshotInfo.getName()), true);
       }
 
       for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index 1f49a731d5..286f89d69e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -84,7 +84,7 @@ public class OMKeyPurgeRequest extends OMKeyRequest {
         omFromSnapshot = (OmSnapshot) omSnapshotManager
             .checkForSnapshot(snapshotInfo.getVolumeName(),
                 snapshotInfo.getBucketName(),
-                getSnapshotPrefix(snapshotInfo.getName()));
+                getSnapshotPrefix(snapshotInfo.getName()), true);
       }
 
       omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
index 10a5b19e1c..d930e82149 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
@@ -80,7 +80,7 @@ public class OMSnapshotMoveDeletedKeysRequest extends 
OMClientRequest {
       OmSnapshot omFromSnapshot = (OmSnapshot) omSnapshotManager
           .checkForSnapshot(fromSnapshot.getVolumeName(),
               fromSnapshot.getBucketName(),
-              getSnapshotPrefix(fromSnapshot.getName()));
+              getSnapshotPrefix(fromSnapshot.getName()), true);
 
       nextSnapshot = getNextActiveSnapshot(fromSnapshot,
           snapshotChainManager, omSnapshotManager);
@@ -101,7 +101,7 @@ public class OMSnapshotMoveDeletedKeysRequest extends 
OMClientRequest {
         omNextSnapshot = (OmSnapshot) omSnapshotManager
             .checkForSnapshot(nextSnapshot.getVolumeName(),
                 nextSnapshot.getBucketName(),
-                getSnapshotPrefix(nextSnapshot.getName()));
+                getSnapshotPrefix(nextSnapshot.getName()), true);
       }
 
       omClientResponse = new OMSnapshotMoveDeletedKeysResponse(
@@ -140,6 +140,8 @@ public class OMSnapshotMoveDeletedKeysRequest extends 
OMClientRequest {
           SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE)) {
         return nextSnapshotInfo;
       }
+
+      snapInfo = nextSnapshotInfo;
     }
     return null;
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
index fc2170e6a1..ab3350fd47 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
@@ -83,8 +83,7 @@ public class OMSnapshotMoveDeletedKeysResponse extends 
OMClientResponse {
           (RDBStore) nextSnapshot.getMetadataManager().getStore();
       // Init Batch Operation for snapshot db.
       try (BatchOperation writeBatch = nextSnapshotStore.initBatchOperation()) 
{
-        processKeys(writeBatch, nextSnapshot.getMetadataManager(),
-            nextDBKeysList, true);
+        processKeys(writeBatch, nextSnapshot.getMetadataManager());
         processDirs(writeBatch, nextSnapshot.getMetadataManager());
         nextSnapshotStore.commitBatchOperation(writeBatch);
         nextSnapshotStore.getDb().flushWal(true);
@@ -92,7 +91,7 @@ public class OMSnapshotMoveDeletedKeysResponse extends 
OMClientResponse {
       }
     } else {
       // Handle the case where there is no next Snapshot.
-      processKeys(batchOperation, omMetadataManager, nextDBKeysList, true);
+      processKeys(batchOperation, omMetadataManager);
       processDirs(batchOperation, omMetadataManager);
     }
 
@@ -101,43 +100,69 @@ public class OMSnapshotMoveDeletedKeysResponse extends 
OMClientResponse {
         (RDBStore) fromSnapshot.getMetadataManager().getStore();
     try (BatchOperation fromSnapshotBatchOp =
              fromSnapshotStore.initBatchOperation()) {
-      processKeys(fromSnapshotBatchOp, fromSnapshot.getMetadataManager(),
-          reclaimKeysList, false);
+      processReclaimKeys(fromSnapshotBatchOp,
+          fromSnapshot.getMetadataManager());
+      deleteDirsFromSnapshot(fromSnapshotBatchOp);
       fromSnapshotStore.commitBatchOperation(fromSnapshotBatchOp);
       fromSnapshotStore.getDb().flushWal(true);
       fromSnapshotStore.getDb().flush();
     }
   }
 
+  private void deleteDirsFromSnapshot(BatchOperation batchOp)
+      throws IOException {
+    for (String movedDirsKey : movedDirs) {
+      // Delete dirs from current snapshot that are moved to next snapshot.
+      fromSnapshot.getMetadataManager().getDeletedDirTable()
+          .deleteWithBatch(batchOp, movedDirsKey);
+    }
+  }
+
+  private void processReclaimKeys(BatchOperation batchOp,
+                                  OMMetadataManager metadataManager)
+      throws IOException {
+    for (SnapshotMoveKeyInfos dBKey : reclaimKeysList) {
+      RepeatedOmKeyInfo omKeyInfos =
+          createRepeatedOmKeyInfo(dBKey.getKeyInfosList());
+      // omKeyInfos can be null, because everything from RepeatedOmKeyInfo
+      // is moved to next snapshot which means this key can be deleted in
+      // the current snapshot processed by SDS. The reclaim key here indicates
+      // the key can be removed from the deleted current snapshot
+      if (omKeyInfos == null) {
+        metadataManager.getDeletedTable().deleteWithBatch(batchOp,
+            dBKey.getKey());
+        continue;
+      }
+      metadataManager.getDeletedTable().putWithBatch(batchOp,
+          dBKey.getKey(), omKeyInfos);
+    }
+  }
+
   private void processDirs(BatchOperation batchOp,
                            OMMetadataManager omMetadataManager)
       throws IOException {
     for (String movedDirsKey : movedDirs) {
       OmKeyInfo keyInfo = 
fromSnapshot.getMetadataManager().getDeletedDirTable()
           .get(movedDirsKey);
+      if (keyInfo == null) {
+        continue;
+      }
       // Move deleted dirs to next snapshot or active DB
       omMetadataManager.getDeletedDirTable().putWithBatch(
           batchOp, movedDirsKey, keyInfo);
-      // Delete dirs from current snapshot that are moved to next snapshot.
-      fromSnapshot.getMetadataManager().getDeletedDirTable()
-          .deleteWithBatch(batchOp, movedDirsKey);
     }
   }
 
   private void processKeys(BatchOperation batchOp,
-      OMMetadataManager metadataManager,
-      List<SnapshotMoveKeyInfos> keyList,
-      boolean isNextDB) throws IOException {
+      OMMetadataManager metadataManager) throws IOException {
 
     // Move renamed keys to only the next snapshot or active DB.
-    if (isNextDB) {
-      for (HddsProtos.KeyValue renamedKey: renamedKeysList) {
-        metadataManager.getSnapshotRenamedTable()
-            .putWithBatch(batchOp, renamedKey.getKey(), renamedKey.getValue());
-      }
+    for (HddsProtos.KeyValue renamedKey: renamedKeysList) {
+      metadataManager.getSnapshotRenamedTable()
+          .putWithBatch(batchOp, renamedKey.getKey(), renamedKey.getValue());
     }
 
-    for (SnapshotMoveKeyInfos dBKey : keyList) {
+    for (SnapshotMoveKeyInfos dBKey : nextDBKeysList) {
       RepeatedOmKeyInfo omKeyInfos =
           createRepeatedOmKeyInfo(dBKey.getKeyInfosList());
       if (omKeyInfos == null) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
index 9d625ea1dc..941813afcb 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
 
@@ -63,6 +64,13 @@ public class OMSnapshotPurgeResponse extends 
OMClientResponse {
     for (String dbKey: snapshotDbKeys) {
       SnapshotInfo snapshotInfo = omMetadataManager
           .getSnapshotInfoTable().get(dbKey);
+      // Even though snapshot existed when SnapshotDeletingService
+      // was running. It might be deleted in the previous run and
+      // the DB might not have been updated yet. So snapshotInfo
+      // can be null.
+      if (snapshotInfo == null) {
+        continue;
+      }
       cleanupSnapshotChain(metadataManager, snapshotInfo, batchOperation);
       // Delete Snapshot checkpoint directory.
       deleteCheckpointDirectory(omMetadataManager, snapshotInfo);
@@ -82,18 +90,35 @@ public class OMSnapshotPurgeResponse extends 
OMClientResponse {
       SnapshotInfo snapInfo, BatchOperation batchOperation) throws IOException 
{
     SnapshotChainManager snapshotChainManager = metadataManager
         .getSnapshotChainManager();
+    SnapshotInfo nextPathSnapInfo = null;
+    SnapshotInfo nextGlobalSnapInfo = null;
+
+    // If the snapshot is deleted in the previous run, then the in-memory
+    // SnapshotChainManager might throw NoSuchElementException as the snapshot
+    // is removed in-memory but OMDoubleBuffer has not flushed yet.
+    boolean hasNextPathSnapshot = false;
+    boolean hasNextGlobalSnapshot = false;
+    try {
+      hasNextPathSnapshot = snapshotChainManager.hasNextPathSnapshot(
+          snapInfo.getSnapshotPath(), snapInfo.getSnapshotID());
+      hasNextGlobalSnapshot = snapshotChainManager.hasNextGlobalSnapshot(
+          snapInfo.getSnapshotID());
+    } catch (NoSuchElementException ex) {
+      LOG.warn("The Snapshot {} could have been deleted in the previous run.",
+          snapInfo.getSnapshotID(), ex);
+      return;
+    }
 
     // Updates next path snapshot's previous snapshot ID
-    if (snapshotChainManager.hasNextPathSnapshot(
-        snapInfo.getSnapshotPath(), snapInfo.getSnapshotID())) {
+    if (hasNextPathSnapshot) {
       String nextPathSnapshotId =
           snapshotChainManager.nextPathSnapshot(
               snapInfo.getSnapshotPath(), snapInfo.getSnapshotID());
 
       String snapshotTableKey = snapshotChainManager
           .getTableKey(nextPathSnapshotId);
-      SnapshotInfo nextPathSnapInfo =
-          metadataManager.getSnapshotInfoTable().get(snapshotTableKey);
+      nextPathSnapInfo = metadataManager
+          .getSnapshotInfoTable().get(snapshotTableKey);
       if (nextPathSnapInfo != null) {
         nextPathSnapInfo.setPathPreviousSnapshotID(
             snapInfo.getPathPreviousSnapshotID());
@@ -103,16 +128,25 @@ public class OMSnapshotPurgeResponse extends 
OMClientResponse {
     }
 
     // Updates next global snapshot's previous snapshot ID
-    if (snapshotChainManager.hasNextGlobalSnapshot(
-        snapInfo.getSnapshotID())) {
+    if (hasNextGlobalSnapshot) {
       String nextGlobalSnapshotId =
           snapshotChainManager.nextGlobalSnapshot(snapInfo.getSnapshotID());
 
       String snapshotTableKey = snapshotChainManager
           .getTableKey(nextGlobalSnapshotId);
-      SnapshotInfo nextGlobalSnapInfo =
-          metadataManager.getSnapshotInfoTable().get(snapshotTableKey);
-      if (nextGlobalSnapInfo != null) {
+      nextGlobalSnapInfo = metadataManager.getSnapshotInfoTable()
+          .get(snapshotTableKey);
+      // If both next global and path snapshot are same, it may overwrite
+      // nextPathSnapInfo.setPathPreviousSnapshotID(), adding this check
+      // will prevent it.
+      if (nextGlobalSnapInfo != null && nextPathSnapInfo != null &&
+          nextGlobalSnapInfo.getSnapshotID().equals(
+              nextPathSnapInfo.getSnapshotID())) {
+        nextPathSnapInfo.setGlobalPreviousSnapshotID(
+            snapInfo.getPathPreviousSnapshotID());
+        metadataManager.getSnapshotInfoTable().putWithBatch(batchOperation,
+            nextPathSnapInfo.getTableKey(), nextPathSnapInfo);
+      } else if (nextGlobalSnapInfo != null) {
         nextGlobalSnapInfo.setGlobalPreviousSnapshotID(
             snapInfo.getPathPreviousSnapshotID());
         metadataManager.getSnapshotInfoTable().putWithBatch(batchOperation,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 27d2ebf752..152159b0e2 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.om.service;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -35,15 +34,11 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
@@ -389,7 +384,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
     }
 
     // TODO: need to handle delete with non-ratis
-    if (isRatisEnabled()) {
+    if (isRatisEnabled() && !purgePathRequestList.isEmpty()) {
       submitPurgePaths(purgePathRequestList, snapTableKey);
     }
 
@@ -464,59 +459,6 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
     return movedFilesCount.get();
   }
 
-  protected void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
-      List<SnapshotMoveKeyInfos> toReclaimList,
-      List<SnapshotMoveKeyInfos> toNextDBList,
-      List<HddsProtos.KeyValue> renamedList,
-      List<String> dirsToMove) throws InterruptedException {
-
-    SnapshotMoveDeletedKeysRequest.Builder moveDeletedKeysBuilder =
-        SnapshotMoveDeletedKeysRequest.newBuilder()
-            .setFromSnapshot(snapInfo.getProtobuf());
-
-    SnapshotMoveDeletedKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
-        .addAllReclaimKeys(toReclaimList)
-        .addAllNextDBKeys(toNextDBList)
-        .addAllRenamedKeys(renamedList)
-        .addAllDeletedDirsToMove(dirsToMove)
-        .build();
-
-    OMRequest omRequest = OMRequest.newBuilder()
-        .setCmdType(Type.SnapshotMoveDeletedKeys)
-        .setSnapshotMoveDeletedKeysRequest(moveDeletedKeys)
-        .setClientId(clientId.toString())
-        .build();
-
-    try (BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock()) {
-      submitRequest(omRequest);
-    }
-  }
-
-  protected void submitRequest(OMRequest omRequest) {
-    try {
-      if (isRatisEnabled()) {
-        OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
-
-        RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
-            .setClientId(clientId)
-            .setServerId(server.getRaftPeerId())
-            .setGroupId(server.getRaftGroupId())
-            .setCallId(getRunCount().get())
-            .setMessage(Message.valueOf(
-                OMRatisHelper.convertRequestToByteString(omRequest)))
-            .setType(RaftClientRequest.writeRequestType())
-            .build();
-
-        server.submitRequest(omRequest, raftClientRequest);
-      } else {
-        ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
-      }
-    } catch (ServiceException e) {
-      LOG.error("Snapshot Deleting request failed. " +
-          "Will retry at next run.", e);
-    }
-  }
-
   public BootstrapStateHandler.Lock getBootstrapStateLock() {
     return lock;
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 52770f7e77..5a39afe502 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.om.service;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
@@ -40,6 +41,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
@@ -74,6 +76,7 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
 
   // Number of items(dirs/files) to be batched in an iteration.
   private final long pathLimitPerTask;
+  private final AtomicBoolean suspended;
 
   public DirectoryDeletingService(long interval, TimeUnit unit,
       long serviceTimeout, OzoneManager ozoneManager,
@@ -83,6 +86,7 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
     this.pathLimitPerTask = configuration
         .getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
             OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
+    this.suspended = new AtomicBoolean(false);
   }
 
   private boolean shouldRun() {
@@ -90,7 +94,23 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
       // OzoneManager can be null for testing
       return true;
     }
-    return getOzoneManager().isLeaderReady();
+    return getOzoneManager().isLeaderReady() && !suspended.get();
+  }
+
+  /**
+   * Suspend the service.
+   */
+  @VisibleForTesting
+  public void suspend() {
+    suspended.set(true);
+  }
+
+  /**
+   * Resume the service if suspended.
+   */
+  @VisibleForTesting
+  public void resume() {
+    suspended.set(false);
   }
 
   @Override
@@ -186,16 +206,28 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
       OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
           getOzoneManager().getMetadataManager();
 
-      OmSnapshot latestSnapshot =
-          metadataManager.getLatestSnapshot(deletedDirInfo.getVolumeName(),
-              deletedDirInfo.getBucketName(), omSnapshotManager);
+      OmSnapshot latestSnapshot = metadataManager.getLatestActiveSnapshot(
+              deletedDirInfo.getVolumeName(), deletedDirInfo.getBucketName(),
+              omSnapshotManager);
 
       if (latestSnapshot != null) {
+        String dbRenameKey = metadataManager
+            .getRenameKey(deletedDirInfo.getVolumeName(),
+                deletedDirInfo.getBucketName(), deletedDirInfo.getObjectID());
         Table<String, OmDirectoryInfo> prevDirTable =
             latestSnapshot.getMetadataManager().getDirectoryTable();
+        Table<String, OmKeyInfo> prevDeletedDirTable =
+            latestSnapshot.getMetadataManager().getDeletedDirTable();
+        OmKeyInfo prevDeletedDirInfo = prevDeletedDirTable.get(key);
+        if (prevDeletedDirInfo != null) {
+          return true;
+        }
+        String prevDirTableDBKey = metadataManager.getSnapshotRenamedTable()
+            .get(dbRenameKey);
         // In OMKeyDeleteResponseWithFSO OzonePathKey is converted to
         // OzoneDeletePathKey. Changing it back to check the previous DirTable.
-        String prevDbKey = metadataManager.getOzoneDeletePathDirKey(key);
+        String prevDbKey = prevDirTableDBKey == null ?
+            metadataManager.getOzoneDeletePathDirKey(key) : prevDirTableDBKey;
         OmDirectoryInfo prevDirInfo = prevDirTable.get(prevDbKey);
         return prevDirInfo != null &&
             prevDirInfo.getObjectID() == deletedDirInfo.getObjectID();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index 69394d347b..bca9218ffd 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.service;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -31,23 +32,29 @@ import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,11 +130,11 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
 
       Table<String, SnapshotInfo> snapshotInfoTable =
           ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      List<String> purgeSnapshotKeys = new ArrayList<>();
       try (TableIterator<String, ? extends Table.KeyValue
           <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
 
         long snapshotLimit = snapshotDeletionPerTask;
-        List<String> purgeSnapshotKeys = new ArrayList<>();
 
         while (iterator.hasNext() && snapshotLimit > 0) {
           SnapshotInfo snapInfo = iterator.next().getValue();
@@ -143,7 +150,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
           OmSnapshot omSnapshot = (OmSnapshot) omSnapshotManager
               .checkForSnapshot(snapInfo.getVolumeName(),
                   snapInfo.getBucketName(),
-                  getSnapshotPrefix(snapInfo.getName()));
+                  getSnapshotPrefix(snapInfo.getName()), true);
 
           Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
               omSnapshot.getMetadataManager().getDeletedTable();
@@ -173,7 +180,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
               .getBucketKey(Long.toString(volumeId),
                   Long.toString(bucketInfo.getObjectID())) + OM_KEY_PREFIX;
 
-          if (checkSnapshotReclaimable(snapshotDeletedTable,
+          if (isSnapshotReclaimable(snapshotDeletedTable,
               snapshotDeletedDirTable, snapshotBucketKey, dbBucketKeyForDir)) {
             purgeSnapshotKeys.add(snapInfo.getTableKey());
             continue;
@@ -191,7 +198,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
             omPreviousSnapshot = (OmSnapshot) omSnapshotManager
                 .checkForSnapshot(previousSnapshot.getVolumeName(),
                     previousSnapshot.getBucketName(),
-                    getSnapshotPrefix(previousSnapshot.getName()));
+                    getSnapshotPrefix(previousSnapshot.getName()), true);
 
             previousKeyTable = omPreviousSnapshot
                 
.getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
@@ -287,15 +294,15 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
           submitSnapshotMoveDeletedKeys(snapInfo, toReclaimList,
               toNextDBList, renamedList, dirsToMove);
         }
-        submitSnapshotPurgeRequest(purgeSnapshotKeys);
       } catch (IOException e) {
         LOG.error("Error while running Snapshot Deleting Service", e);
       }
+      submitSnapshotPurgeRequest(purgeSnapshotKeys);
 
       return BackgroundTaskResult.EmptyTaskResult.newResult();
     }
 
-    private boolean checkSnapshotReclaimable(
+    private boolean isSnapshotReclaimable(
         Table<String, RepeatedOmKeyInfo> snapshotDeletedTable,
         Table<String, OmKeyInfo> snapshotDeletedDirTable,
         String snapshotBucketKey, String dbBucketKeyForDir) throws IOException 
{
@@ -305,14 +312,18 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
       try (TableIterator<String, ? extends Table.KeyValue<String,
           RepeatedOmKeyInfo>> iterator = snapshotDeletedTable.iterator();) {
         iterator.seek(snapshotBucketKey);
-        isKeyTableCleanedUp = iterator.hasNext() && iterator.next().getKey()
+        // If the next entry doesn't start with snapshotBucketKey then
+        // deletedKeyTable is already cleaned up.
+        isKeyTableCleanedUp = !iterator.hasNext() || !iterator.next().getKey()
             .startsWith(snapshotBucketKey);
       }
 
       try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
                iterator = snapshotDeletedDirTable.iterator()) {
         iterator.seek(dbBucketKeyForDir);
-        isDirTableCleanedUp = iterator.hasNext() && iterator.next().getKey()
+        // If the next entry doesn't start with dbBucketKeyForDir then
+        // deletedDirTable is already cleaned up.
+        isDirTableCleanedUp = !iterator.hasNext() || !iterator.next().getKey()
             .startsWith(dbBucketKeyForDir);
       }
 
@@ -347,7 +358,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
           Table.KeyValue<String, OmKeyInfo> deletedDir =
               deletedDirIterator.next();
 
-          if (checkDirReclaimable(deletedDir, previousDirTable,
+          if (isDirReclaimable(deletedDir, previousDirTable,
               renamedTable, renamedList)) {
             // Reclaim here
             PurgePathRequest request = prepareDeleteDirRequest(
@@ -393,9 +404,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
             .setClientId(clientId.toString())
             .build();
 
-        // TODO: [SNAPSHOT] Submit request once KeyDeletingService,
-        //  DirectoryDeletingService for snapshots are modified.
-        // submitRequest(omRequest);
+        submitRequest(omRequest);
       }
     }
 
@@ -407,19 +416,19 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
         Table<String, String> renamedTable,
         OmBucketInfo bucketInfo, long volumeId) throws IOException {
 
-      if (checkKeyReclaimable(previousKeyTable, renamedTable,
+      if (isKeyReclaimable(previousKeyTable, renamedTable,
           keyInfo, bucketInfo, volumeId, renamedKey)) {
-        // Move to next non deleted snapshot's deleted table
-        toNextDb.addKeyInfos(keyInfo.getProtobuf(
-            ClientVersion.CURRENT_VERSION));
-      } else {
         // Update in current db's deletedKeyTable
         toReclaim.addKeyInfos(keyInfo
             .getProtobuf(ClientVersion.CURRENT_VERSION));
+      } else {
+        // Move to next non deleted snapshot's deleted table
+        toNextDb.addKeyInfos(keyInfo.getProtobuf(
+            ClientVersion.CURRENT_VERSION));
       }
     }
 
-    private boolean checkDirReclaimable(
+    private boolean isDirReclaimable(
         Table.KeyValue<String, OmKeyInfo> deletedDir,
         Table<String, OmDirectoryInfo> previousDirTable,
         Table<String, String> renamedTable,
@@ -465,7 +474,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
       return prevDirectoryInfo.getObjectID() != deletedDirInfo.getObjectID();
     }
 
-    private boolean checkKeyReclaimable(
+    private boolean isKeyReclaimable(
         Table<String, OmKeyInfo> previousKeyTable,
         Table<String, String> renamedTable,
         OmKeyInfo deletedKeyInfo, OmBucketInfo bucketInfo,
@@ -475,12 +484,12 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
       String dbKey;
       // Handle case when the deleted snapshot is the first snapshot.
       if (previousKeyTable == null) {
-        return false;
+        return true;
       }
 
       // These are uncommitted blocks wrapped into a pseudo KeyInfo
       if (deletedKeyInfo.getObjectID() == OBJECT_ID_RECLAIM_BLOCKS) {
-        return false;
+        return true;
       }
 
       // Construct keyTable or fileTable DB key depending on the bucket type
@@ -522,10 +531,10 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
           .get(renamedKey) : previousKeyTable.get(dbKey);
 
       if (prevKeyInfo == null) {
-        return false;
+        return true;
       }
 
-      return prevKeyInfo.getObjectID() == deletedKeyInfo.getObjectID();
+      return prevKeyInfo.getObjectID() != deletedKeyInfo.getObjectID();
     }
 
     private SnapshotInfo getPreviousSnapshot(SnapshotInfo snapInfo)
@@ -539,6 +548,59 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
       }
       return null;
     }
+
+    public void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
+        List<SnapshotMoveKeyInfos> toReclaimList,
+        List<SnapshotMoveKeyInfos> toNextDBList,
+        List<HddsProtos.KeyValue> renamedList,
+        List<String> dirsToMove) throws InterruptedException {
+
+      SnapshotMoveDeletedKeysRequest.Builder moveDeletedKeysBuilder =
+          SnapshotMoveDeletedKeysRequest.newBuilder()
+              .setFromSnapshot(snapInfo.getProtobuf());
+
+      SnapshotMoveDeletedKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
+          .addAllReclaimKeys(toReclaimList)
+          .addAllNextDBKeys(toNextDBList)
+          .addAllRenamedKeys(renamedList)
+          .addAllDeletedDirsToMove(dirsToMove)
+          .build();
+
+      OMRequest omRequest = OMRequest.newBuilder()
+          .setCmdType(Type.SnapshotMoveDeletedKeys)
+          .setSnapshotMoveDeletedKeysRequest(moveDeletedKeys)
+          .setClientId(clientId.toString())
+          .build();
+
+      try (BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock()) 
{
+        submitRequest(omRequest);
+      }
+    }
+
+    public void submitRequest(OMRequest omRequest) {
+      try {
+        if (isRatisEnabled()) {
+          OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
+
+          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+              .setClientId(clientId)
+              .setServerId(server.getRaftPeerId())
+              .setGroupId(server.getRaftGroupId())
+              .setCallId(getRunCount().get())
+              .setMessage(Message.valueOf(
+                  OMRatisHelper.convertRequestToByteString(omRequest)))
+              .setType(RaftClientRequest.writeRequestType())
+              .build();
+
+          server.submitRequest(omRequest, raftClientRequest);
+        } else {
+          ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
+        }
+      } catch (ServiceException e) {
+        LOG.error("Snapshot Deleting request failed. " +
+            "Will retry at next run.", e);
+      }
+    }
   }
 
   @Override
@@ -556,7 +618,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
    * Suspend the service.
    */
   @VisibleForTesting
-  void suspend() {
+  public void suspend() {
     suspended.set(true);
   }
 
@@ -564,7 +626,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
    * Resume the service if suspended.
    */
   @VisibleForTesting
-  void resume() {
+  public void resume() {
     suspended.set(false);
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 5e38ad33a2..69a4adcee8 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -634,8 +634,8 @@ public class SnapshotDiffManager implements AutoCloseable {
     SnapshotInfo toSnapInfo = getSnapshotInfo(ozoneManager, volumeName,
         bucketName, toSnapshotName);
 
-    checkSnapshotActive(fromSnapInfo);
-    checkSnapshotActive(toSnapInfo);
+    checkSnapshotActive(fromSnapInfo, false);
+    checkSnapshotActive(toSnapInfo, false);
   }
 
   private void generateSnapshotDiffReport(final String jobKey,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
index 096218585d..48ba7cd87e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
@@ -23,14 +23,13 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus;
-import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
-
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 
@@ -88,6 +87,18 @@ public final class SnapshotUtils {
     }
   }
 
+  /**
+   * Throws OMException FILE_NOT_FOUND if snapshot directory does not exist.
+   * @param checkpoint Snapshot checkpoint directory
+   */
+  public static void checkSnapshotDirExist(File checkpoint)
+      throws IOException {
+    if (!checkpoint.exists()) {
+      throw new OMException("Unable to load snapshot. " +
+          "Snapshot checkpoint directory '" + checkpoint.getAbsolutePath() +
+          "' does not exists.", FILE_NOT_FOUND);
+    }
+  }
 
   /**
    * Throws OMException FILE_NOT_FOUND if snapshot is not in active status.
@@ -96,42 +107,18 @@ public final class SnapshotUtils {
   public static void checkSnapshotActive(OzoneManager ozoneManager,
                                          String snapshotTableKey)
       throws IOException {
-    checkSnapshotActive(getSnapshotInfo(ozoneManager, snapshotTableKey));
+    checkSnapshotActive(getSnapshotInfo(ozoneManager, snapshotTableKey), 
false);
   }
 
-  public static void checkSnapshotActive(SnapshotInfo snapInfo)
+  public static void checkSnapshotActive(SnapshotInfo snapInfo,
+                                         boolean skipCheck)
       throws OMException {
 
-    if (snapInfo.getSnapshotStatus() != SnapshotStatus.SNAPSHOT_ACTIVE) {
-      if (isCalledFromSnapshotDeletingService()) {
-        LOG.debug("Permitting {} to load snapshot {} even in status: {}",
-            SnapshotDeletingService.class.getSimpleName(),
-            snapInfo.getTableKey(),
-            snapInfo.getSnapshotStatus());
-      } else {
-        throw new OMException("Unable to load snapshot. " +
-            "Snapshot with table key '" + snapInfo.getTableKey() +
-            "' is no longer active", FILE_NOT_FOUND);
-      }
-    }
-  }
-
-  /**
-   * Helper method to check whether the loader is called from
-   * SnapshotDeletingTask (return true) or not (return false).
-   */
-  private static boolean isCalledFromSnapshotDeletingService() {
-
-    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
-    for (StackTraceElement elem : stackTrace) {
-      // Allow as long as loader is called from SDS. e.g. SnapshotDeletingTask
-      if (elem.getClassName().startsWith(
-          SnapshotDeletingService.class.getName())) {
-        return true;
-      }
+    if (!skipCheck &&
+        snapInfo.getSnapshotStatus() != SnapshotStatus.SNAPSHOT_ACTIVE) {
+      throw new OMException("Unable to load snapshot. " +
+          "Snapshot with table key '" + snapInfo.getTableKey() +
+          "' is no longer active", FILE_NOT_FOUND);
     }
-
-    return false;
   }
-
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index bbc2dd6b5a..ad8ce91c71 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -139,7 +139,7 @@ public class TestOmSnapshotManager {
     OmSnapshotManager omSnapshotManager = om.getOmSnapshotManager();
     OmSnapshot firstSnapshot = (OmSnapshot) omSnapshotManager
         .checkForSnapshot(first.getVolumeName(),
-        first.getBucketName(), getSnapshotPrefix(first.getName()));
+        first.getBucketName(), getSnapshotPrefix(first.getName()), false);
     DBStore firstSnapshotStore = mock(DBStore.class);
     HddsWhiteboxTestUtils.setInternalState(
         firstSnapshot.getMetadataManager(), "store", firstSnapshotStore);
@@ -154,7 +154,7 @@ public class TestOmSnapshotManager {
     // read in second snapshot to evict first
     omSnapshotManager
         .checkForSnapshot(second.getVolumeName(),
-        second.getBucketName(), getSnapshotPrefix(second.getName()));
+        second.getBucketName(), getSnapshotPrefix(second.getName()), false);
 
     // As a workaround, invalidate all cache entries in order to trigger
     // instances close in this test case, since JVM GC most likely would not
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index 19df5dee21..7035f3ddd9 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -214,7 +214,7 @@ public class TestOMKeyPurgeRequestAndResponse extends 
TestOMKeyRequest {
 
     OmSnapshot omSnapshot = (OmSnapshot) ozoneManager.getOmSnapshotManager()
         .checkForSnapshot(volumeName, bucketName,
-            getSnapshotPrefix("snap1"));
+            getSnapshotPrefix("snap1"), true);
 
     // The keys should be present in the snapshot's deletedTable
     for (String deletedKey : deletedKeyNames) {
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java
deleted file mode 100644
index 39cbdd2766..0000000000
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.ozone.om.service;
-
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.server.ServerUtils;
-import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
-import org.apache.hadoop.ozone.om.KeyManager;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmSnapshot;
-import org.apache.hadoop.ozone.om.OmTestManagers;
-import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
-import org.apache.hadoop.test.PathUtils;
-import org.apache.ozone.test.GenericTestUtils;
-import org.apache.ratis.util.ExitUtils;
-import org.junit.Rule;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
-import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
-import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
-
-/**
- * Test Snapshot Deleting Service.
- */
-public class TestSnapshotDeletingService {
-  @Rule
-  public TemporaryFolder folder = new TemporaryFolder();
-  private OzoneManagerProtocol writeClient;
-  private OzoneManager om;
-
-  private KeyManager keyManager;
-  private OMMetadataManager omMetadataManager;
-  private OzoneConfiguration conf;
-  private OmTestManagers omTestManagers;
-  private static final String VOLUME_NAME = "vol1";
-  private static final String BUCKET_NAME_ONE = "bucket1";
-  private static final String BUCKET_NAME_TWO = "bucket2";
-
-
-  @BeforeAll
-  public static void setup() {
-    ExitUtils.disableSystemExit();
-  }
-
-  @BeforeEach
-  public void createConfAndInitValues() throws Exception {
-    conf = new OzoneConfiguration();
-    File testDir = PathUtils.getTestDir(TestSnapshotDeletingService.class);
-    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
-    ServerUtils.setOzoneMetaDirPath(conf, testDir.getPath());
-    conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
-        1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT,
-        100000, TimeUnit.MILLISECONDS);
-    conf.setQuietMode(false);
-    // Enable filesystem snapshot feature for the test regardless of the 
default
-    conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
-    omTestManagers = new OmTestManagers(conf);
-    keyManager = omTestManagers.getKeyManager();
-    omMetadataManager = omTestManagers.getMetadataManager();
-    writeClient = omTestManagers.getWriteClient();
-    om = omTestManagers.getOzoneManager();
-  }
-
-  @AfterEach
-  public void cleanup() throws Exception {
-    if (om != null) {
-      om.stop();
-    }
-  }
-
-  @Test
-  @Disabled("HDDS-7974")
-  public void testSnapshotKeySpaceReclaim() throws Exception {
-    SnapshotDeletingService snapshotDeletingService = (SnapshotDeletingService)
-        keyManager.getSnapshotDeletingService();
-    KeyDeletingService deletingService = (KeyDeletingService)
-        keyManager.getDeletingService();
-
-    // Suspending SnapshotDeletingService
-    snapshotDeletingService.suspend();
-    createSnapshotDataForBucket1();
-    snapshotDeletingService.resume();
-
-    deletingService.start();
-    GenericTestUtils.waitFor(() ->
-            deletingService.getRunCount().get() >= 1,
-        1000, 10000);
-
-    GenericTestUtils.waitFor(() ->
-            snapshotDeletingService.getSuccessfulRunCount() >= 1,
-        1000, 10000);
-
-    OmSnapshot bucket1snap3 = (OmSnapshot) om.getOmSnapshotManager()
-        .checkForSnapshot(VOLUME_NAME, BUCKET_NAME_ONE,
-            getSnapshotPrefix("bucket1snap3"));
-
-    // Check bucket1key1 added to next non deleted snapshot db.
-    RepeatedOmKeyInfo omKeyInfo =
-        bucket1snap3.getMetadataManager()
-            .getDeletedTable().get("/vol1/bucket1/bucket1key1");
-    Assertions.assertNotNull(omKeyInfo);
-
-    // Check bucket1key2 not in active DB. As the key is updated
-    // in bucket1snap2
-    RepeatedOmKeyInfo omKeyInfo1 = omMetadataManager
-        .getDeletedTable().get("/vol1/bucket1/bucket1key2");
-    Assertions.assertNull(omKeyInfo1);
-    deletingService.shutdown();
-  }
-
-  @Test
-  @Disabled("HDDS-7974")
-  public void testMultipleSnapshotKeyReclaim() throws Exception {
-
-    SnapshotDeletingService snapshotDeletingService = (SnapshotDeletingService)
-        keyManager.getSnapshotDeletingService();
-    KeyDeletingService deletingService = (KeyDeletingService)
-        keyManager.getDeletingService();
-
-    // Suspending SnapshotDeletingService
-    snapshotDeletingService.suspend();
-    int snapshotCount = createSnapshotDataForBucket1();
-
-    OmKeyArgs bucket2key1 = createVolumeBucketKey(VOLUME_NAME, BUCKET_NAME_TWO,
-        BucketLayout.DEFAULT, "bucket2key1");
-
-    OmKeyArgs bucket2key2 = createKey(VOLUME_NAME, BUCKET_NAME_TWO,
-        "bucket2key2");
-
-    createSnapshot(VOLUME_NAME, BUCKET_NAME_TWO, "bucket2snap1",
-        ++snapshotCount);
-
-    // Both key 1 and key 2 can be reclaimed when Snapshot 1 is deleted.
-    writeClient.deleteKey(bucket2key1);
-    writeClient.deleteKey(bucket2key2);
-
-    createSnapshot(VOLUME_NAME, BUCKET_NAME_TWO, "bucket2snap2",
-        ++snapshotCount);
-
-    String snapshotKey2 = "/vol1/bucket2/bucket2snap1";
-    SnapshotInfo snapshotInfo = om.getMetadataManager()
-        .getSnapshotInfoTable().get(snapshotKey2);
-
-    snapshotInfo
-        .setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
-    om.getMetadataManager()
-        .getSnapshotInfoTable().put(snapshotKey2, snapshotInfo);
-    snapshotInfo = om.getMetadataManager()
-        .getSnapshotInfoTable().get(snapshotKey2);
-    Assertions.assertEquals(snapshotInfo.getSnapshotStatus(),
-        SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
-
-    snapshotDeletingService.resume();
-
-    deletingService.start();
-    GenericTestUtils.waitFor(() ->
-            deletingService.getRunCount().get() >= 1,
-        1000, 10000);
-
-    GenericTestUtils.waitFor(() ->
-            snapshotDeletingService.getSuccessfulRunCount() >= 1,
-        1000, 10000);
-
-    // Check bucket2key1 added active db as it can be reclaimed.
-    RepeatedOmKeyInfo omKeyInfo1 = omMetadataManager
-        .getDeletedTable().get("/vol1/bucket2/bucket2key1");
-
-    // Check bucket2key2 added active db as it can be reclaimed.
-    RepeatedOmKeyInfo omKeyInfo2 = omMetadataManager
-        .getDeletedTable().get("/vol1/bucket2/bucket2key2");
-
-    //TODO: [SNAPSHOT] Check this shouldn't be null when KeyDeletingService
-    // is modified for Snapshot
-    Assertions.assertNull(omKeyInfo1);
-    Assertions.assertNull(omKeyInfo2);
-    deletingService.shutdown();
-  }
-
-  private OmKeyArgs createVolumeBucketKey(String volumeName, String bucketName,
-      BucketLayout bucketLayout, String keyName) throws IOException {
-    // cheat here, just create a volume and bucket entry so that we can
-    // create the keys, we put the same data for key and value since the
-    // system does not decode the object
-    OMRequestTestUtils.addVolumeToOM(omMetadataManager,
-        OmVolumeArgs.newBuilder()
-            .setOwnerName("owner")
-            .setAdminName("admin")
-            .setVolume(volumeName)
-            .build());
-
-    OMRequestTestUtils.addBucketToOM(omMetadataManager,
-        OmBucketInfo.newBuilder().setVolumeName(volumeName)
-            .setBucketName(bucketName)
-            .setBucketLayout(bucketLayout)
-            .build());
-
-    return createKey(volumeName, bucketName, keyName);
-  }
-
-
-  private int createSnapshotDataForBucket1() throws Exception {
-    int snapshotCount = 0;
-    OmKeyArgs bucket1key1 = createVolumeBucketKey(VOLUME_NAME, BUCKET_NAME_ONE,
-        BucketLayout.DEFAULT, "bucket1key1");
-
-    createSnapshot(VOLUME_NAME, BUCKET_NAME_ONE, "bucket1snap1",
-        ++snapshotCount);
-
-    OmKeyArgs bucket1key2 = createKey(VOLUME_NAME, BUCKET_NAME_ONE,
-        "bucket1key2");
-
-    // Key 1 cannot be reclaimed as it is still referenced by Snapshot 1.
-    writeClient.deleteKey(bucket1key1);
-    // Key 2 is deleted here, which means we can reclaim
-    // it when snapshot 2 is deleted.
-    writeClient.deleteKey(bucket1key2);
-
-    createSnapshot(VOLUME_NAME, BUCKET_NAME_ONE, "bucket1snap2",
-        ++snapshotCount);
-    createKey(VOLUME_NAME, BUCKET_NAME_ONE, "bucket1key4");
-    OmKeyArgs bucket1key5 = createKey(VOLUME_NAME, BUCKET_NAME_ONE,
-        "bucket1key5");
-    writeClient.deleteKey(bucket1key5);
-
-    createSnapshot(VOLUME_NAME, BUCKET_NAME_ONE, "bucket1snap3",
-        ++snapshotCount);
-
-    String snapshotKey2 = "/vol1/bucket1/bucket1snap2";
-    SnapshotInfo snapshotInfo = om.getMetadataManager()
-        .getSnapshotInfoTable().get(snapshotKey2);
-
-    snapshotInfo
-        .setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
-    om.getMetadataManager()
-        .getSnapshotInfoTable().put(snapshotKey2, snapshotInfo);
-    snapshotInfo = om.getMetadataManager()
-        .getSnapshotInfoTable().get(snapshotKey2);
-    Assertions.assertEquals(snapshotInfo.getSnapshotStatus(),
-        SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
-    return snapshotCount;
-  }
-
-  private OmKeyArgs createKey(String volumeName, String bucketName,
-       String keyName) throws IOException {
-    OmKeyArgs keyArg =
-        new OmKeyArgs.Builder()
-            .setVolumeName(volumeName)
-            .setBucketName(bucketName)
-            .setKeyName(keyName)
-            .setAcls(Collections.emptyList())
-            .setReplicationConfig(StandaloneReplicationConfig.getInstance(
-                HddsProtos.ReplicationFactor.ONE))
-            .setLocationInfoList(new ArrayList<>())
-            .build();
-
-    // Open and write the key.
-    OpenKeySession session = writeClient.openKey(keyArg);
-    writeClient.commitKey(keyArg, session.getId());
-
-    return keyArg;
-  }
-
-  private void createSnapshot(String volName, String bucketName,
-       String snapName, int count) throws Exception {
-    writeClient.createSnapshot(volName, bucketName, snapName);
-
-    GenericTestUtils.waitFor(() -> {
-      try {
-        return omMetadataManager.countRowsInTable(
-            omMetadataManager.getSnapshotInfoTable()) >= count;
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-      return false;
-    }, 1000, 10000);
-  }
-}
-


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to