This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b8468ba8e HDDS-11411. Snapshot garbage collection should not run when 
the keys are moved from a deleted snapshot to the next snapshot in the chain 
(#7193)
1b8468ba8e is described below

commit 1b8468ba8e1d896994cc0772f70353d1aa439224
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Tue Sep 24 17:32:44 2024 -0700

    HDDS-11411. Snapshot garbage collection should not run when the keys are 
moved from a deleted snapshot to the next snapshot in the chain (#7193)
---
 .../common/src/main/resources/ozone-default.xml    |   9 ++
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   2 +
 .../ozone/TestDirectoryDeletingServiceWithFSO.java | 140 ++++++++++++++++++
 ...TestSnapshotDeletingServiceIntegrationTest.java |   6 +-
 .../TestSnapshotDirectoryCleaningService.java      |   2 +
 .../src/main/proto/OmClientProtocol.proto          |   8 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |   8 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  96 ++++--------
 .../key/OMDirectoriesPurgeRequestWithFSO.java      |  41 ++++--
 .../ozone/om/request/key/OMKeyPurgeRequest.java    |  56 ++++---
 .../snapshot/OMSnapshotMoveDeletedKeysRequest.java |   4 +-
 .../request/snapshot/OMSnapshotPurgeRequest.java   |   6 +-
 .../key/OMDirectoriesPurgeResponseWithFSO.java     |   4 +
 .../om/service/AbstractKeyDeletingService.java     |  49 +++----
 .../ozone/om/service/DirectoryDeletingService.java |  40 ++++-
 .../ozone/om/service/KeyDeletingService.java       |  41 ++++--
 .../service/SnapshotDirectoryCleaningService.java  |   9 +-
 .../hadoop/ozone/om/snapshot/SnapshotUtils.java    |  71 ++++++++-
 .../ozone/om/service/TestKeyDeletingService.java   | 162 ++++++++++++++++++---
 19 files changed, 580 insertions(+), 174 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index fd601e1a7d..9b0ff0e962 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3743,6 +3743,15 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.snapshot.deep.cleaning.enabled</name>
+    <value>false</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>
+      Flag to enable/disable snapshot deep cleaning.
+    </description>
+  </property>
+
   <property>
     <name>ozone.scm.event.ContainerReport.thread.pool.size</name>
     <value>10</value>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 46becc9e64..a77bc4f530 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -403,6 +403,8 @@ public final class OMConfigKeys {
   /**
    * Configuration properties for Snapshot Directory Service.
    */
+  public static final String OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED = 
"ozone.snapshot.deep.cleaning.enabled";
+  public static final boolean OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT = 
false;
   public static final String OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL =
       "ozone.snapshot.directory.service.interval";
   public static final String OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL_DEFAULT
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
index 0abfb13365..8d161dedeb 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -32,10 +36,16 @@ import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
@@ -48,12 +58,16 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +78,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -72,6 +88,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
 
 /**
  * Directory deletion service test cases.
@@ -97,6 +119,7 @@ public class TestDirectoryDeletingServiceWithFSO {
     conf.setInt(OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK, 5);
     conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1000, 
TimeUnit.MILLISECONDS);
     conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, omRatisEnabled);
     conf.setBoolean(OZONE_ACL_ENABLED, true);
     cluster = MiniOzoneCluster.newBuilder(conf)
@@ -460,6 +483,123 @@ public class TestDirectoryDeletingServiceWithFSO {
     assertEquals(prevDeletedKeyCount + 5, currentDeletedKeyCount);
   }
 
+  private void createFileKey(OzoneBucket bucket, String key)
+      throws Exception {
+    byte[] value = RandomStringUtils.randomAscii(10240).getBytes(UTF_8);
+    OzoneOutputStream fileKey = bucket.createKey(key, value.length);
+    fileKey.write(value);
+    fileKey.close();
+  }
+
+  /*
+   * Create key dir1/key1
+   * Create snap1
+   * Rename dir1 to dir2
+   * Delete dir2
+   * Wait for DirectoryDeletingService to start processing deleted directory dir2
+   * Create snap2 by making the DirectoryDeletingService thread wait till snap2 is 
flushed
+   * Resume DirectoryDeletingService thread.
+   * Read dir1 from snap1.
+   */
+  @Test
+  public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
+      throws Exception {
+    OMMetadataManager omMetadataManager = 
cluster.getOzoneManager().getMetadataManager();
+    Table<String, SnapshotInfo> snapshotInfoTable = 
omMetadataManager.getSnapshotInfoTable();
+    Table<String, OmKeyInfo> deletedDirTable = 
omMetadataManager.getDeletedDirTable();
+    Table<String, String> renameTable = 
omMetadataManager.getSnapshotRenamedTable();
+    
cluster.getOzoneManager().getKeyManager().getSnapshotDeletingService().shutdown();
+    DirectoryDeletingService dirDeletingService = 
cluster.getOzoneManager().getKeyManager().getDirDeletingService();
+    // Suspend DirectoryDeletingService
+    dirDeletingService.suspend();
+    GenericTestUtils.waitFor(() -> !dirDeletingService.isRunningOnAOS(), 1000, 
10000);
+    Random random = new Random();
+    final String testVolumeName = "volume" + random.nextInt();
+    final String testBucketName = "bucket" + random.nextInt();
+    // Create Volume and Buckets
+    ObjectStore store = client.getObjectStore();
+    store.createVolume(testVolumeName);
+    OzoneVolume volume = store.getVolume(testVolumeName);
+    volume.createBucket(testBucketName,
+        
BucketArgs.newBuilder().setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED).build());
+    OzoneBucket bucket = volume.getBucket(testBucketName);
+
+    OzoneManager ozoneManager = Mockito.spy(cluster.getOzoneManager());
+    OmSnapshotManager omSnapshotManager = 
Mockito.spy(ozoneManager.getOmSnapshotManager());
+    when(ozoneManager.getOmSnapshotManager()).thenAnswer(i -> 
omSnapshotManager);
+    DirectoryDeletingService service = Mockito.spy(new 
DirectoryDeletingService(1000, TimeUnit.MILLISECONDS, 1000,
+        ozoneManager,
+        cluster.getConf()));
+    service.shutdown();
+    final int initialSnapshotCount =
+        (int) 
cluster.getOzoneManager().getMetadataManager().countRowsInTable(snapshotInfoTable);
+    final int initialDeletedCount = (int) 
omMetadataManager.countRowsInTable(deletedDirTable);
+    final int initialRenameCount = (int) 
omMetadataManager.countRowsInTable(renameTable);
+    String snap1 = "snap1";
+    String snap2 = "snap2";
+    createFileKey(bucket, "dir1/key1");
+    store.createSnapshot(testVolumeName, testBucketName, "snap1");
+    bucket.renameKey("dir1", "dir2");
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(testVolumeName)
+        .setBucketName(testBucketName)
+        .setKeyName("dir2").build();
+    long objectId = 
store.getClientProxy().getOzoneManagerClient().getKeyInfo(omKeyArgs, false)
+        .getKeyInfo().getObjectID();
+    long volumeId = omMetadataManager.getVolumeId(testVolumeName);
+    long bucketId = omMetadataManager.getBucketId(testVolumeName, 
testBucketName);
+    String deletePathKey = omMetadataManager.getOzoneDeletePathKey(objectId,
+        omMetadataManager.getOzonePathKey(volumeId,
+        bucketId, bucketId, "dir2"));
+    bucket.deleteDirectory("dir2", true);
+
+
+    assertTableRowCount(deletedDirTable, initialDeletedCount + 1);
+    assertTableRowCount(renameTable, initialRenameCount + 1);
+    Mockito.doAnswer(i -> {
+      List<OzoneManagerProtocolProtos.PurgePathRequest> purgePathRequestList = 
i.getArgument(5);
+      for (OzoneManagerProtocolProtos.PurgePathRequest purgeRequest : 
purgePathRequestList) {
+        Assertions.assertNotEquals(deletePathKey, 
purgeRequest.getDeletedDir());
+      }
+      return i.callRealMethod();
+    }).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(), 
anyLong(),
+        anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), 
Mockito.any(), any());
+
+    Mockito.doAnswer(i -> {
+      store.createSnapshot(testVolumeName, testBucketName, snap2);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          SnapshotInfo snapshotInfo = 
store.getClientProxy().getOzoneManagerClient()
+              .getSnapshotInfo(testVolumeName, testBucketName, snap2);
+
+          return 
OmSnapshotManager.areSnapshotChangesFlushedToDB(cluster.getOzoneManager().getMetadataManager(),
+              snapshotInfo);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }, 1000, 100000);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return 
renameTable.get(omMetadataManager.getRenameKey(testVolumeName, testBucketName, 
objectId)) == null;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }, 1000, 10000);
+      return i.callRealMethod();
+    
}).when(omSnapshotManager).getSnapshot(ArgumentMatchers.eq(testVolumeName), 
ArgumentMatchers.eq(testBucketName),
+        ArgumentMatchers.eq(snap1));
+    assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1);
+    service.runPeriodicalTaskNow();
+    service.runPeriodicalTaskNow();
+    assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2);
+    store.deleteSnapshot(testVolumeName, testBucketName, snap2);
+    service.runPeriodicalTaskNow();
+    store.deleteSnapshot(testVolumeName, testBucketName, snap1);
+    cluster.restartOzoneManager();
+    
assertTableRowCount(cluster.getOzoneManager().getMetadataManager().getSnapshotInfoTable(),
 initialSnapshotCount);
+    dirDeletingService.resume();
+  }
+
   @Test
   public void testDirDeletedTableCleanUpForSnapshot() throws Exception {
     Table<String, OmKeyInfo> deletedDirTable =
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
index d77f9bf9d8..254de072e0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
@@ -87,6 +87,7 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -130,6 +131,7 @@ public class TestSnapshotDeletingServiceIntegrationTest {
         1, StorageUnit.MB);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
         500, TimeUnit.MILLISECONDS);
+    conf.setBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, true);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT,
         10000, TimeUnit.MILLISECONDS);
     conf.setInt(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 500);
@@ -447,7 +449,7 @@ public class TestSnapshotDeletingServiceIntegrationTest {
       while (iterator.hasNext()) {
         Table.KeyValue<String, RepeatedOmKeyInfo> next = iterator.next();
         String activeDBDeletedKey = next.getKey();
-        if (activeDBDeletedKey.matches(".*/key1.*")) {
+        if (activeDBDeletedKey.matches(".*/key1/.*")) {
           RepeatedOmKeyInfo activeDBDeleted = next.getValue();
           OMMetadataManager metadataManager =
               cluster.getOzoneManager().getMetadataManager();
@@ -508,7 +510,7 @@ public class TestSnapshotDeletingServiceIntegrationTest {
     when(ozoneManager.getKeyManager()).thenReturn(keyManager);
     KeyDeletingService keyDeletingService = Mockito.spy(new 
KeyDeletingService(ozoneManager,
         ozoneManager.getScmClient().getBlockClient(), keyManager, 10000,
-        100000, cluster.getConf()));
+        100000, cluster.getConf(), false));
     keyDeletingService.shutdown();
     GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0, 
1000,
         100000);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
index 03df331087..3be0725a00 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -79,6 +80,7 @@ public class TestSnapshotDirectoryCleaningService {
   public static void init() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL, 2500);
+    conf.setBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, true);
     conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 2500,
         TimeUnit.MILLISECONDS);
     conf.setBoolean(OZONE_ACL_ENABLED, true);
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 126adbdc51..dd54b7205e 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1381,6 +1381,8 @@ message PurgeKeysRequest {
     // if set, will purge keys in a snapshot DB instead of active DB
     optional string snapshotTableKey = 2;
     repeated SnapshotMoveKeyInfos keysToUpdate = 3;
+    // The previous snapshot ID can also be null; this field would be absent in 
older requests.
+    optional NullableUUID expectedPreviousSnapshotID = 4;
 }
 
 message PurgeKeysResponse {
@@ -1403,6 +1405,12 @@ message PurgePathsResponse {
 message PurgeDirectoriesRequest {
   repeated PurgePathRequest deletedPath = 1;
   optional string snapshotTableKey = 2;
+  // The previous snapshot ID can also be null; this field would be absent in 
older requests.
+  optional NullableUUID expectedPreviousSnapshotID = 3;
+}
+
+message NullableUUID {
+  optional hadoop.hdds.UUID uuid = 1;
 }
 
 message PurgeDirectoriesResponse {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index e3f56f2dea..9bdbc70fb9 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -143,6 +143,8 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT;
@@ -230,6 +232,8 @@ public class KeyManagerImpl implements KeyManager {
 
   @Override
   public void start(OzoneConfiguration configuration) {
+    boolean isSnapshotDeepCleaningEnabled = 
configuration.getBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED,
+        OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT);
     if (keyDeletingService == null) {
       long blockDeleteInterval = configuration.getTimeDuration(
           OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
@@ -241,7 +245,7 @@ public class KeyManagerImpl implements KeyManager {
           TimeUnit.MILLISECONDS);
       keyDeletingService = new KeyDeletingService(ozoneManager,
           scmClient.getBlockClient(), this, blockDeleteInterval,
-          serviceTimeout, configuration);
+          serviceTimeout, configuration, isSnapshotDeepCleaningEnabled);
       keyDeletingService.start();
     }
 
@@ -314,7 +318,7 @@ public class KeyManagerImpl implements KeyManager {
       }
     }
 
-    if (snapshotDirectoryCleaningService == null &&
+    if (isSnapshotDeepCleaningEnabled && snapshotDirectoryCleaningService == 
null &&
         ozoneManager.isFilesystemSnapshotEnabled()) {
       long dirDeleteInterval = configuration.getTimeDuration(
           OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 2e85fef162..de201fd5d4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -34,7 +34,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -1611,11 +1610,22 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
           String[] keySplit = kv.getKey().split(OM_KEY_PREFIX);
           String bucketKey = getBucketKey(keySplit[1], keySplit[2]);
           OmBucketInfo bucketInfo = getBucketTable().get(bucketKey);
-
+          // If the bucket has been deleted, bucketInfo would be null, making the previous 
snapshot null as well.
+          SnapshotInfo previousSnapshotInfo = bucketInfo == null ? null :
+              SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(),
+              bucketInfo.getBucketName(), ozoneManager, snapshotChainManager);
+          // If the previous snapshot is not active or has not been flushed to 
disk, don't process the key in this
+          // iteration.
+          if (previousSnapshotInfo != null &&
+              (previousSnapshotInfo.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE ||
+              
!OmSnapshotManager.areSnapshotChangesFlushedToDB(ozoneManager.getMetadataManager(),
+                  previousSnapshotInfo))) {
+            continue;
+          }
           // Get the latest snapshot in snapshot path.
-          try (ReferenceCounted<OmSnapshot>
-              rcLatestSnapshot = getLatestActiveSnapshot(
-                  keySplit[1], keySplit[2], omSnapshotManager)) {
+          try (ReferenceCounted<OmSnapshot> rcLatestSnapshot = 
previousSnapshotInfo == null ? null :
+              
omSnapshotManager.getSnapshot(previousSnapshotInfo.getVolumeName(),
+                  previousSnapshotInfo.getBucketName(), 
previousSnapshotInfo.getName())) {
 
             // Multiple keys with the same path can be queued in one DB entry
             RepeatedOmKeyInfo infoList = kv.getValue();
@@ -1692,17 +1702,24 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
 
             List<OmKeyInfo> notReclaimableKeyInfoList =
                 notReclaimableKeyInfo.getOmKeyInfoList();
+            // If the bucket has been deleted, bucketInfo would be null, making the 
previous snapshot null as well.
+            SnapshotInfo newPreviousSnapshotInfo = bucketInfo == null ? null :
+                SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(),
+                bucketInfo.getBucketName(), ozoneManager, 
snapshotChainManager);
+            // Check if the previous snapshot in the chain hasn't changed.
+            if 
(Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo).map(SnapshotInfo::getSnapshotId),
+                
Optional.ofNullable(previousSnapshotInfo).map(SnapshotInfo::getSnapshotId))) {
+              // If all the versions are not reclaimable, then do nothing.
+              if (notReclaimableKeyInfoList.size() > 0 &&
+                  notReclaimableKeyInfoList.size() !=
+                      infoList.getOmKeyInfoList().size()) {
+                keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
+              }
 
-            // If all the versions are not reclaimable, then do nothing.
-            if (notReclaimableKeyInfoList.size() > 0 &&
-                notReclaimableKeyInfoList.size() !=
-                    infoList.getOmKeyInfoList().size()) {
-              keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
-            }
-
-            if (notReclaimableKeyInfoList.size() !=
-                infoList.getOmKeyInfoList().size()) {
-              keyBlocksList.addAll(blockGroupList);
+              if (notReclaimableKeyInfoList.size() !=
+                  infoList.getOmKeyInfoList().size()) {
+                keyBlocksList.addAll(blockGroupList);
+              }
             }
           }
         }
@@ -1719,55 +1736,6 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
         delOmKeyInfo != null;
   }
 
-  /**
-   * Get the latest OmSnapshot for a snapshot path.
-   */
-  public ReferenceCounted<OmSnapshot> getLatestActiveSnapshot(
-          String volumeName, String bucketName,
-          OmSnapshotManager snapshotManager)
-      throws IOException {
-
-    String snapshotPath = volumeName + OM_KEY_PREFIX + bucketName;
-    Optional<UUID> latestPathSnapshot = Optional.ofNullable(
-        snapshotChainManager.getLatestPathSnapshotId(snapshotPath));
-
-    Optional<SnapshotInfo> snapshotInfo = Optional.empty();
-
-    while (latestPathSnapshot.isPresent()) {
-      Optional<String> snapTableKey = latestPathSnapshot
-          .map(uuid -> snapshotChainManager.getTableKey(uuid));
-
-      snapshotInfo = snapTableKey.isPresent() ?
-          Optional.ofNullable(getSnapshotInfoTable().get(snapTableKey.get())) :
-          Optional.empty();
-
-      if (snapshotInfo.isPresent() && snapshotInfo.get().getSnapshotStatus() ==
-          SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
-        break;
-      }
-
-      // Update latestPathSnapshot if current snapshot is deleted.
-      if (snapshotChainManager.hasPreviousPathSnapshot(snapshotPath,
-          latestPathSnapshot.get())) {
-        latestPathSnapshot = Optional.ofNullable(snapshotChainManager
-            .previousPathSnapshot(snapshotPath, latestPathSnapshot.get()));
-      } else {
-        latestPathSnapshot = Optional.empty();
-      }
-    }
-
-    Optional<ReferenceCounted<OmSnapshot>> rcOmSnapshot =
-        snapshotInfo.isPresent() ?
-            Optional.ofNullable(
-                snapshotManager.getSnapshot(volumeName,
-                    bucketName,
-                    snapshotInfo.get().getName())
-            ) :
-            Optional.empty();
-
-    return rcOmSnapshot.orElse(null);
-  }
-
   /**
    * Decide whether the open key is a multipart upload related key.
    * @param openKeyInfo open key related to multipart upload
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index dd08ff1716..29ed5d9fc7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -24,15 +24,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -45,8 +47,10 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
+import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
 import static org.apache.hadoop.ozone.OzoneConsts.DELETED_HSYNC_KEY;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.validatePreviousSnapshotId;
 
 /**
  * Handles purging of keys from OM DB.
@@ -66,19 +70,34 @@ public class OMDirectoriesPurgeRequestWithFSO extends 
OMKeyRequest {
 
     List<OzoneManagerProtocolProtos.PurgePathRequest> purgeRequests =
             purgeDirsRequest.getDeletedPathList();
-
-    SnapshotInfo fromSnapshotInfo = null;
     Set<Pair<String, String>> lockSet = new HashSet<>();
     Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap = new HashMap<>();
-    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) 
ozoneManager.getMetadataManager();
     Map<String, OmKeyInfo> openKeyInfoMap = new HashMap<>();
-
     OMMetrics omMetrics = ozoneManager.getMetrics();
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+        getOmRequest());
+    final SnapshotInfo fromSnapshotInfo;
     try {
-      if (fromSnapshot != null) {
-        fromSnapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
fromSnapshot);
+      fromSnapshotInfo = fromSnapshot != null ? 
SnapshotUtils.getSnapshotInfo(ozoneManager,
+          fromSnapshot) : null;
+      // Checking if this request is an old request or new one.
+      if (purgeDirsRequest.hasExpectedPreviousSnapshotID()) {
+        // Validating previous snapshot since while purging deletes, a 
snapshot create request could make this purge
+        // directory request invalid on AOS since the deletedDirectory would 
be in the newly created snapshot. Adding
+        // subdirectories could lead to not being able to reclaim sub-files 
and subdirectories since the
+        // file/directory would be present in the newly created snapshot.
+        // Validating previous snapshot can ensure the chain hasn't changed.
+        UUID expectedPreviousSnapshotId = 
purgeDirsRequest.getExpectedPreviousSnapshotID().hasUuid()
+            ? 
fromProtobuf(purgeDirsRequest.getExpectedPreviousSnapshotID().getUuid()) : null;
+        validatePreviousSnapshotId(fromSnapshotInfo, 
omMetadataManager.getSnapshotChainManager(),
+            expectedPreviousSnapshotId);
       }
-
+    } catch (IOException e) {
+      LOG.error("Error occurred while performing OMDirectoriesPurge. ", e);
+      return new 
OMDirectoriesPurgeResponseWithFSO(createErrorOMResponse(omResponse, e));
+    }
+    try {
       for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
         for (OzoneManagerProtocolProtos.KeyInfo key :
             path.getMarkDeletedSubDirsList()) {
@@ -170,12 +189,8 @@ public class OMDirectoriesPurgeRequestWithFSO extends 
OMKeyRequest {
       }
     }
 
-    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
-        getOmRequest());
-    OMClientResponse omClientResponse = new OMDirectoriesPurgeResponseWithFSO(
+    return new OMDirectoriesPurgeResponseWithFSO(
         omResponse.build(), purgeRequests, ozoneManager.isRatisEnabled(),
             getBucketLayout(), volBucketInfoMap, fromSnapshotInfo, 
openKeyInfoMap);
-
-    return omClientResponse;
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index 14c80bb7a9..a5e8cb1452 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -42,6 +43,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
+import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.validatePreviousSnapshotId;
 
 /**
  * Handles purging of keys from OM DB.
@@ -58,30 +63,44 @@ public class OMKeyPurgeRequest extends OMKeyRequest {
   @Override
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, 
TermIndex termIndex) {
     PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest();
-    List<DeletedKeys> bucketDeletedKeysList = purgeKeysRequest
-        .getDeletedKeysList();
-    List<SnapshotMoveKeyInfos> keysToUpdateList = purgeKeysRequest
-        .getKeysToUpdateList();
-    String fromSnapshot = purgeKeysRequest.hasSnapshotTableKey() ?
-        purgeKeysRequest.getSnapshotTableKey() : null;
-    List<String> keysToBePurgedList = new ArrayList<>();
+    List<DeletedKeys> bucketDeletedKeysList = 
purgeKeysRequest.getDeletedKeysList();
+    List<SnapshotMoveKeyInfos> keysToUpdateList = 
purgeKeysRequest.getKeysToUpdateList();
+    String fromSnapshot = purgeKeysRequest.hasSnapshotTableKey() ? 
purgeKeysRequest.getSnapshotTableKey() : null;
     OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) 
ozoneManager.getMetadataManager();
 
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
-    OMClientResponse omClientResponse = null;
 
-    for (DeletedKeys bucketWithDeleteKeys : bucketDeletedKeysList) {
-      for (String deletedKey : bucketWithDeleteKeys.getKeysList()) {
-        keysToBePurgedList.add(deletedKey);
+
+    final SnapshotInfo fromSnapshotInfo;
+    try {
+      fromSnapshotInfo = fromSnapshot != null ? 
SnapshotUtils.getSnapshotInfo(ozoneManager,
+          fromSnapshot) : null;
+      // Checking if this request is an old request or new one.
+      if (purgeKeysRequest.hasExpectedPreviousSnapshotID()) {
+        // Validating previous snapshot since while purging deletes, a 
snapshot create request could make this purge
+        // key request invalid on AOS since the deletedKey would be in the 
newly created snapshot. This would add a
+        // redundant tombstone entry in the deletedTable. It is better to skip 
the transaction.
+        UUID expectedPreviousSnapshotId = 
purgeKeysRequest.getExpectedPreviousSnapshotID().hasUuid()
+            ? 
fromProtobuf(purgeKeysRequest.getExpectedPreviousSnapshotID().getUuid()) : null;
+        validatePreviousSnapshotId(fromSnapshotInfo, 
omMetadataManager.getSnapshotChainManager(),
+            expectedPreviousSnapshotId);
       }
+    } catch (IOException e) {
+      LOG.error("Error occurred while performing OmKeyPurge. ", e);
+      return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, e));
     }
-    final SnapshotInfo fromSnapshotInfo;
 
-    try {
-      fromSnapshotInfo = fromSnapshot == null ? null : 
SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot);
-    } catch (IOException ex) {
-      return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, ex));
+    List<String> keysToBePurgedList = new ArrayList<>();
+
+    for (DeletedKeys bucketWithDeleteKeys : bucketDeletedKeysList) {
+      keysToBePurgedList.addAll(bucketWithDeleteKeys.getKeysList());
+    }
+
+    if (keysToBePurgedList.isEmpty()) {
+      return new OMKeyPurgeResponse(createErrorOMResponse(omResponse,
+          new OMException("None of the keys can be purged be purged since a 
new snapshot was created for all the " +
+              "buckets, making this request invalid", 
OMException.ResultCodes.KEY_DELETION_ERROR)));
     }
 
     // Setting transaction info for snapshot, this is to prevent duplicate 
purge requests to OM from background
@@ -95,10 +114,9 @@ public class OMKeyPurgeRequest extends OMKeyRequest {
     } catch (IOException e) {
       return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, e));
     }
-    omClientResponse = new OMKeyPurgeResponse(omResponse.build(), 
keysToBePurgedList, fromSnapshotInfo,
-        keysToUpdateList);
 
-    return omClientResponse;
+    return new OMKeyPurgeResponse(omResponse.build(),
+        keysToBePurgedList, fromSnapshotInfo, keysToUpdateList);
   }
 
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
index 2ddf308bb5..18055bdda4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
@@ -80,9 +80,9 @@ public class OMSnapshotMoveDeletedKeysRequest extends 
OMClientRequest {
         OmResponseUtil.getOMResponseBuilder(getOmRequest());
     try {
       // Check the snapshot exists.
-      SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot.getTableKey());
+      SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
fromSnapshot.getTableKey());
 
-      nextSnapshot = SnapshotUtils.getNextSnapshot(ozoneManager, 
snapshotChainManager, fromSnapshot);
+      nextSnapshot = SnapshotUtils.getNextSnapshot(ozoneManager, 
snapshotChainManager, snapshotInfo);
 
       // Get next non-deleted snapshot.
       List<SnapshotMoveKeyInfos> nextDBKeysList = 
moveDeletedKeysRequest.getNextDBKeysList();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
index ca29d4e112..38c51d4de5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
@@ -103,9 +103,7 @@ public class OMSnapshotPurgeRequest extends OMClientRequest 
{
               "Snapshot purge request.", snapTableKey);
           continue;
         }
-
-        SnapshotInfo nextSnapshot =
-            SnapshotUtils.getNextSnapshot(ozoneManager, snapshotChainManager, 
fromSnapshot);
+        SnapshotInfo nextSnapshot = 
SnapshotUtils.getNextSnapshot(ozoneManager, snapshotChainManager, fromSnapshot);
 
         // Step 1: Update the deep clean flag for the next active snapshot
         updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, 
trxnLogIndex);
@@ -116,7 +114,7 @@ public class OMSnapshotPurgeRequest extends OMClientRequest 
{
             .addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), 
CacheValue.get(trxnLogIndex));
         updatedSnapshotInfos.remove(fromSnapshot.getTableKey());
       }
-
+      // Update the snapshotInfo lastTransactionInfo.
       for (SnapshotInfo snapshotInfo : updatedSnapshotInfos.values()) {
         
snapshotInfo.setLastTransactionInfo(TransactionInfo.valueOf(termIndex).toByteString());
         omMetadataManager.getSnapshotInfoTable().addCacheEntry(new 
CacheKey<>(snapshotInfo.getTableKey()),
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
index 28c3e3d758..782063d324 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
@@ -78,6 +78,10 @@ public class OMDirectoriesPurgeResponseWithFSO extends 
OmKeyResponse {
     this.openKeyInfoMap = openKeyInfoMap;
   }
 
+  public OMDirectoriesPurgeResponseWithFSO(OMResponse omResponse) {
+    super(omResponse);
+  }
+
   @Override
   public void addToDBBatch(OMMetadataManager metadataManager,
       BatchOperation batchOp) throws IOException {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 2c2d16bf14..0d3a05c9e4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om.service;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -32,13 +33,11 @@ import 
org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -97,7 +96,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
   protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
       KeyManager manager,
       HashMap<String, RepeatedOmKeyInfo> keysToModify,
-      String snapTableKey) throws IOException {
+      String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException 
{
 
     long startTime = Time.monotonicNow();
     int delCount = 0;
@@ -120,7 +119,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
       startTime = Time.monotonicNow();
       if (isRatisEnabled()) {
         delCount = submitPurgeKeysRequest(blockDeletionResults,
-            keysToModify, snapTableKey);
+            keysToModify, snapTableKey, expectedPreviousSnapshotId);
       } else {
         // TODO: Once HA and non-HA paths are merged, we should have
         //  only one code path here. Purge keys should go through an
@@ -172,7 +171,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
    * @param keysToModify Updated list of RepeatedOmKeyInfo
    */
   private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
-      HashMap<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey) {
+      HashMap<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey, 
UUID expectedPreviousSnapshotId) {
     Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
         new HashMap<>();
 
@@ -203,6 +202,12 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
     if (snapTableKey != null) {
       purgeKeysRequest.setSnapshotTableKey(snapTableKey);
     }
+    OzoneManagerProtocolProtos.NullableUUID.Builder 
expectedPreviousSnapshotNullableUUID =
+        OzoneManagerProtocolProtos.NullableUUID.newBuilder();
+    if (expectedPreviousSnapshotId != null) {
+      
expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId));
+    }
+    
purgeKeysRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
 
     // Add keys to PurgeKeysRequest bucket wise.
     for (Map.Entry<Pair<String, String>, List<String>> entry :
@@ -274,13 +279,21 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
   }
 
   protected void submitPurgePaths(List<PurgePathRequest> requests,
-                                  String snapTableKey) {
+                                  String snapTableKey,
+                                  UUID expectedPreviousSnapshotId) {
     OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest 
=
         OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
 
     if (snapTableKey != null) {
       purgeDirRequest.setSnapshotTableKey(snapTableKey);
     }
+    OzoneManagerProtocolProtos.NullableUUID.Builder 
expectedPreviousSnapshotNullableUUID =
+        OzoneManagerProtocolProtos.NullableUUID.newBuilder();
+    if (expectedPreviousSnapshotId != null) {
+      
expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId));
+    }
+    
purgeDirRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
+
     purgeDirRequest.addAllDeletedPath(requests);
 
     OzoneManagerProtocolProtos.OMRequest omRequest =
@@ -386,7 +399,8 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
       List<Pair<String, OmKeyInfo>> allSubDirList,
       List<PurgePathRequest> purgePathRequestList,
       String snapTableKey, long startTime,
-      int remainingBufLimit, KeyManager keyManager) {
+      int remainingBufLimit, KeyManager keyManager,
+      UUID expectedPreviousSnapshotId) {
 
     // Optimization to handle delete sub-dir and keys to remove quickly
     // This case will be useful to handle when depth of directory is high
@@ -426,7 +440,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
     }
 
     if (!purgePathRequestList.isEmpty()) {
-      submitPurgePaths(purgePathRequestList, snapTableKey);
+      submitPurgePaths(purgePathRequestList, snapTableKey, 
expectedPreviousSnapshotId);
     }
 
     if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -549,25 +563,6 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
     return cLimit + increment >= maxLimit;
   }
 
-  protected SnapshotInfo getPreviousActiveSnapshot(SnapshotInfo snapInfo, 
SnapshotChainManager chainManager)
-      throws IOException {
-    SnapshotInfo currSnapInfo = snapInfo;
-    while (chainManager.hasPreviousPathSnapshot(
-        currSnapInfo.getSnapshotPath(), currSnapInfo.getSnapshotId())) {
-
-      UUID prevPathSnapshot = chainManager.previousPathSnapshot(
-          currSnapInfo.getSnapshotPath(), currSnapInfo.getSnapshotId());
-      String tableKey = chainManager.getTableKey(prevPathSnapshot);
-      SnapshotInfo prevSnapInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
tableKey);
-      if (prevSnapInfo.getSnapshotStatus() ==
-          SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
-        return prevSnapInfo;
-      }
-      currSnapInfo = prevSnapInfo;
-    }
-    return null;
-  }
-
   protected boolean isKeyReclaimable(
       Table<String, OmKeyInfo> previousKeyTable,
       Table<String, String> renamedTable,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index ad16c49d5e..e638ce45a4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
@@ -43,6 +45,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -167,9 +172,15 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
             = new ArrayList<>((int) remainNum);
 
         Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
+
         try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
                  deleteTableIterator = getOzoneManager().getMetadataManager().
             getDeletedDirTable().iterator()) {
+          // This is to avoid race condition b/w purge request and snapshot 
chain update. For AOS taking the global
+          // snapshotId since AOS could process multiple buckets in one 
iteration.
+          UUID expectedPreviousSnapshotId =
+              
((OmMetadataManagerImpl)getOzoneManager().getMetadataManager()).getSnapshotChainManager()
+                  .getLatestGlobalSnapshotId();
 
           long startTime = Time.monotonicNow();
           while (remainNum > 0 && deleteTableIterator.hasNext()) {
@@ -216,7 +227,7 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
               remainNum, dirNum, subDirNum, subFileNum,
               allSubDirList, purgePathRequestList, null, startTime,
               ratisByteLimit - consumedSize,
-              getOzoneManager().getKeyManager());
+              getOzoneManager().getKeyManager(), expectedPreviousSnapshotId);
 
         } catch (IOException e) {
           LOG.error("Error while running delete directories and files " +
@@ -239,12 +250,23 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
           getOzoneManager().getOmSnapshotManager();
       OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
           getOzoneManager().getMetadataManager();
-
+      SnapshotInfo previousSnapshotInfo = 
SnapshotUtils.getLatestSnapshotInfo(deletedDirInfo.getVolumeName(),
+          deletedDirInfo.getBucketName(), getOzoneManager(), 
metadataManager.getSnapshotChainManager());
+      if (previousSnapshotInfo == null) {
+        return false;
+      }
+      // If the previous snapshot is not active or has not been flushed to disk, 
don't process the key in this
+      // iteration.
+      if (previousSnapshotInfo.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE ||
+              
!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
+                  previousSnapshotInfo)) {
+        return true;
+      }
       try (ReferenceCounted<OmSnapshot> rcLatestSnapshot =
-          metadataManager.getLatestActiveSnapshot(
+          omSnapshotManager.getSnapshot(
               deletedDirInfo.getVolumeName(),
               deletedDirInfo.getBucketName(),
-              omSnapshotManager)) {
+              previousSnapshotInfo.getName())) {
 
         if (rcLatestSnapshot != null) {
           String dbRenameKey = metadataManager
@@ -265,8 +287,14 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
           String prevDbKey = prevDirTableDBKey == null ?
               metadataManager.getOzoneDeletePathDirKey(key) : 
prevDirTableDBKey;
           OmDirectoryInfo prevDirInfo = prevDirTable.get(prevDbKey);
-          return prevDirInfo != null &&
-              prevDirInfo.getObjectID() == deletedDirInfo.getObjectID();
+          // Checking if the previous snapshot in the chain hasn't changed 
while checking if the deleted directory is
+          // present in the previous snapshot. If the chain has changed, the 
deleted directory could have been moved
+          // to the newly created snapshot.
+          SnapshotInfo newPreviousSnapshotInfo = 
SnapshotUtils.getLatestSnapshotInfo(deletedDirInfo.getVolumeName(),
+              deletedDirInfo.getBucketName(), getOzoneManager(), 
metadataManager.getSnapshotChainManager());
+          return 
(!Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo).map(SnapshotInfo::getSnapshotId),
+              
Optional.ofNullable(previousSnapshotInfo).map(SnapshotInfo::getSnapshotId))) || 
(prevDirInfo != null &&
+              prevDirInfo.getObjectID() == deletedDirInfo.getObjectID());
         }
       }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index e7553004ed..9a4f74eba5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -23,7 +23,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -44,6 +46,7 @@ import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
@@ -93,11 +96,14 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
   private final Set<String> completedExclusiveSizeSet;
   private final Map<String, String> snapshotSeekMap;
   private AtomicBoolean isRunningOnAOS;
+  private final boolean deepCleanSnapshots;
+  private final SnapshotChainManager snapshotChainManager;
 
   public KeyDeletingService(OzoneManager ozoneManager,
       ScmBlockLocationProtocol scmClient,
       KeyManager manager, long serviceInterval,
-      long serviceTimeout, ConfigurationSource conf) {
+      long serviceTimeout, ConfigurationSource conf,
+      boolean deepCleanSnapshots) {
     super(KeyDeletingService.class.getSimpleName(), serviceInterval,
         TimeUnit.MILLISECONDS, KEY_DELETING_CORE_POOL_SIZE,
         serviceTimeout, ozoneManager, scmClient);
@@ -113,6 +119,8 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
     this.completedExclusiveSizeSet = new HashSet<>();
     this.snapshotSeekMap = new HashMap<>();
     this.isRunningOnAOS = new AtomicBoolean(false);
+    this.deepCleanSnapshots = deepCleanSnapshots;
+    this.snapshotChainManager = 
((OmMetadataManagerImpl)manager.getMetadataManager()).getSnapshotChainManager();
   }
 
   /**
@@ -202,7 +210,9 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
           //  doesn't have enough entries left.
           //  OM would have to keep track of which snapshot the key is coming
           //  from if the above would be done inside getPendingDeletionKeys().
-
+          // This is to avoid race condition b/w purge request and snapshot 
chain update. For AOS taking the global
+          // snapshotId since AOS could process multiple buckets in one 
iteration.
+          UUID expectedPreviousSnapshotId = 
snapshotChainManager.getLatestGlobalSnapshotId();
           PendingKeysDeletion pendingKeysDeletion = manager
               .getPendingDeletionKeys(getKeyLimitPerTask());
           List<BlockGroup> keyBlocksList = pendingKeysDeletion
@@ -210,7 +220,7 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
           if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
             delCount = processKeyDeletes(keyBlocksList,
                 getOzoneManager().getKeyManager(),
-                pendingKeysDeletion.getKeysToModify(), null);
+                pendingKeysDeletion.getKeysToModify(), null, 
expectedPreviousSnapshotId);
             deletedKeyCount.addAndGet(delCount);
           }
         } catch (IOException e) {
@@ -219,7 +229,7 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
         }
 
         try {
-          if (delCount < keyLimitPerTask) {
+          if (deepCleanSnapshots && delCount < keyLimitPerTask) {
             processSnapshotDeepClean(delCount);
           }
         } catch (Exception e) {
@@ -255,15 +265,23 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
         while (delCount < keyLimitPerTask && iterator.hasNext()) {
           List<BlockGroup> keysToPurge = new ArrayList<>();
           HashMap<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
-          SnapshotInfo currSnapInfo = iterator.next().getValue();
-
+          SnapshotInfo currSnapInfo = 
snapshotInfoTable.get(iterator.next().getKey());
           // Deep clean only on active snapshot. Deleted Snapshots will be
           // cleaned up by SnapshotDeletingService.
-          if (currSnapInfo.getSnapshotStatus() != SNAPSHOT_ACTIVE ||
+          if (currSnapInfo == null || currSnapInfo.getSnapshotStatus() != 
SNAPSHOT_ACTIVE ||
               currSnapInfo.getDeepClean()) {
             continue;
           }
 
+          SnapshotInfo prevSnapInfo = 
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
+              currSnapInfo);
+          if (prevSnapInfo != null &&
+              (prevSnapInfo.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE ||
+                  
!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
+                      prevSnapInfo))) {
+            continue;
+          }
+
           try (ReferenceCounted<OmSnapshot>
               rcCurrOmSnapshot = omSnapshotManager.getSnapshot(
                   currSnapInfo.getVolumeName(),
@@ -292,11 +310,13 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
             }
 
             String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
-            SnapshotInfo previousSnapshot = 
getPreviousActiveSnapshot(currSnapInfo, snapChainManager);
+            SnapshotInfo previousSnapshot = 
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
+                currSnapInfo);
             SnapshotInfo previousToPrevSnapshot = null;
 
             if (previousSnapshot != null) {
-              previousToPrevSnapshot = 
getPreviousActiveSnapshot(previousSnapshot, snapChainManager);
+              previousToPrevSnapshot = 
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
+                  previousSnapshot);
             }
 
             Table<String, OmKeyInfo> previousKeyTable = null;
@@ -425,7 +445,8 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
 
               if (!keysToPurge.isEmpty()) {
                 processKeyDeletes(keysToPurge, currOmSnapshot.getKeyManager(),
-                    keysToModify, currSnapInfo.getTableKey());
+                    keysToModify, currSnapInfo.getTableKey(),
+                    
Optional.ofNullable(previousSnapshot).map(SnapshotInfo::getSnapshotId).orElse(null));
               }
             } finally {
               IOUtils.closeQuietly(rcPrevOmSnapshot, rcPrevToPrevOmSnapshot);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
index 26d5d24a8a..e7133e6258 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
@@ -61,6 +61,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.getDirectoryInfo;
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getOzonePathKeyForFso;
+import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getPreviousSnapshot;
 
 /**
  * Snapshot BG Service for deleted directory deep clean and exclusive size
@@ -143,11 +144,11 @@ public class SnapshotDirectoryCleaningService
           <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
 
         while (iterator.hasNext()) {
-          SnapshotInfo currSnapInfo = iterator.next().getValue();
+          SnapshotInfo currSnapInfo = 
snapshotInfoTable.get(iterator.next().getKey());
 
           // Expand deleted dirs only on active snapshot. Deleted Snapshots
           // will be cleaned up by SnapshotDeletingService.
-          if (currSnapInfo.getSnapshotStatus() != SNAPSHOT_ACTIVE ||
+          if (currSnapInfo == null || currSnapInfo.getSnapshotStatus() != 
SNAPSHOT_ACTIVE ||
               currSnapInfo.getDeepCleanedDeletedDir()) {
             continue;
           }
@@ -173,7 +174,7 @@ public class SnapshotDirectoryCleaningService
                   "unexpected state.");
             }
 
-            SnapshotInfo previousSnapshot = 
getPreviousActiveSnapshot(currSnapInfo, snapChainManager);
+            SnapshotInfo previousSnapshot = 
getPreviousSnapshot(getOzoneManager(), snapChainManager, currSnapInfo);
             SnapshotInfo previousToPrevSnapshot = null;
 
             Table<String, OmKeyInfo> previousKeyTable = null;
@@ -190,7 +191,7 @@ public class SnapshotDirectoryCleaningService
                   .getKeyTable(bucketInfo.getBucketLayout());
               prevRenamedTable = omPreviousSnapshot
                   .getMetadataManager().getSnapshotRenamedTable();
-              previousToPrevSnapshot = 
getPreviousActiveSnapshot(previousSnapshot, snapChainManager);
+              previousToPrevSnapshot = getPreviousSnapshot(getOzoneManager(), 
snapChainManager, previousSnapshot);
             }
 
             Table<String, OmKeyInfo> previousToPrevKeyTable = null;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
index 7af6d08513..87983b0726 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
@@ -41,6 +41,8 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
@@ -150,7 +152,6 @@ public final class SnapshotUtils {
     }
   }
 
-
   /**
    * Get the next snapshot in the snapshot chain.
    */
@@ -162,7 +163,8 @@ public final class SnapshotUtils {
     // SnapshotChainManager might throw NoSuchElementException as the snapshot
     // is removed in-memory but OMDoubleBuffer has not flushed yet.
     if (snapInfo == null) {
-      throw new OMException("Snapshot Info is null. Cannot get the next 
snapshot", INVALID_SNAPSHOT_ERROR);
+      throw new OMException("Provided Snapshot Info argument is null. Cannot 
get the next snapshot for a null value",
+          INVALID_SNAPSHOT_ERROR);
     }
     try {
       if (chainManager.hasNextPathSnapshot(snapInfo.getSnapshotPath(),
@@ -178,6 +180,41 @@ public final class SnapshotUtils {
     return null;
   }
 
+  /**
+   * Get the previous snapshot in the path-level snapshot chain.
+   *
+   * Looks up the ID of the snapshot preceding {@code snapInfo} via
+   * {@code getPreviousSnapshotId} and, when one is found, resolves it to a
+   * SnapshotInfo through {@code getSnapshotInfo}.
+   *
+   * @param ozoneManager OzoneManager handed to getSnapshotInfo.
+   * @param chainManager snapshot chain manager used to find the previous ID.
+   * @param snapInfo snapshot whose predecessor is requested; must not be null.
+   * @return the previous snapshot's info, or null when no previous snapshot
+   *         ID is found in the chain.
+   * @throws IOException an OMException (INVALID_SNAPSHOT_ERROR) if snapInfo
+   *         is null, or whatever the chain / snapshot info lookups raise.
+   */
+  public static SnapshotInfo getPreviousSnapshot(OzoneManager ozoneManager,
+                                                 SnapshotChainManager 
chainManager,
+                                                 SnapshotInfo snapInfo)
+      throws IOException {
+    UUID previousSnapshotId = getPreviousSnapshotId(snapInfo, chainManager);
+    return previousSnapshotId == null ? null : getSnapshotInfo(ozoneManager, 
chainManager, previousSnapshotId);
+  }
+
+  /**
+   * Get the ID of the previous snapshot in the path-level snapshot chain.
+   *
+   * @param snapInfo snapshot whose predecessor ID is requested.
+   * @param chainManager snapshot chain manager that is queried with the
+   *        snapshot's path and ID.
+   * @return the previous snapshot's ID, or null when the chain reports no
+   *         previous path snapshot or throws NoSuchElementException.
+   * @throws IOException an OMException (INVALID_SNAPSHOT_ERROR) if snapInfo
+   *         is null, or whatever the chain manager calls raise.
+   */
+  private static UUID getPreviousSnapshotId(SnapshotInfo snapInfo, 
SnapshotChainManager chainManager)
+      throws IOException {
+    // If the snapshot is deleted in the previous run, then the in-memory
+    // SnapshotChainManager might throw NoSuchElementException as the snapshot
+    // is removed in-memory but OMDoubleBuffer has not flushed yet.
+    if (snapInfo == null) {
+      throw new OMException("Provided Snapshot Info argument is null. Cannot 
get the previous snapshot for a null " +
+          "value", INVALID_SNAPSHOT_ERROR);
+    }
+    try {
+      if (chainManager.hasPreviousPathSnapshot(snapInfo.getSnapshotPath(),
+          snapInfo.getSnapshotId())) {
+        return chainManager.previousPathSnapshot(snapInfo.getSnapshotPath(),
+            snapInfo.getSnapshotId());
+      }
+    } catch (NoSuchElementException ignored) {
+      // Deliberately ignored (see the note at the top of this method); fall through and return null.
+    }
+    return null;
+  }
+
   /**
    * Return a map column family to prefix for the keys in the table for
    * the given volume and bucket.
@@ -282,4 +319,34 @@ public final class SnapshotUtils {
     }
     return false;
   }
+  /**
+   * Get the info of the latest snapshot on the snapshot path of the given
+   * volume and bucket.
+   *
+   * @param volumeName volume part of the snapshot path.
+   * @param bucketName bucket part of the snapshot path.
+   * @param ozoneManager OzoneManager handed to getSnapshotInfo.
+   * @param snapshotChainManager chain manager queried for the latest ID.
+   * @return the info resolved for the ID returned by
+   *         {@code getLatestPathSnapshotId}, or null when that ID is null.
+   * @throws IOException if the chain or snapshot info lookup fails.
+   */
+  public static SnapshotInfo getLatestSnapshotInfo(String volumeName, String 
bucketName,
+                                                   OzoneManager ozoneManager,
+                                                   SnapshotChainManager 
snapshotChainManager) throws IOException {
+    Optional<UUID> latestPathSnapshot = Optional.ofNullable(
+        getLatestPathSnapshotId(volumeName, bucketName, snapshotChainManager));
+    return latestPathSnapshot.isPresent() ?
+        getSnapshotInfo(ozoneManager, snapshotChainManager, 
latestPathSnapshot.get()) : null;
+  }
+  /**
+   * Get the ID of the latest snapshot on a bucket's snapshot path.
+   *
+   * The path is built as volumeName + OM_KEY_PREFIX + bucketName and passed
+   * to {@code SnapshotChainManager.getLatestPathSnapshotId}.
+   *
+   * @param volumeName volume part of the snapshot path.
+   * @param bucketName bucket part of the snapshot path.
+   * @param snapshotChainManager chain manager that is queried.
+   * @return whatever the chain manager returns for that path; callers in this
+   *         class treat a null result as "no snapshot".
+   * @throws IOException if the chain manager lookup fails.
+   */
+  public static UUID getLatestPathSnapshotId(String volumeName, String 
bucketName,
+                                             SnapshotChainManager 
snapshotChainManager) throws IOException {
+    String snapshotPath = volumeName + OM_KEY_PREFIX + bucketName;
+    return snapshotChainManager.getLatestPathSnapshotId(snapshotPath);
+  }
+
+  // Validates the previous path snapshotId for a given snapshotInfo. In case 
snapshotInfo is
+  // null, the snapshotInfo is treated as AOS and the previous snapshot 
becomes the latest snapshot in the global
+  // snapshot chain. Throws OMException (INVALID_REQUEST) if validation fails; otherwise the 
function would pass.
+  public static void validatePreviousSnapshotId(SnapshotInfo snapshotInfo,
+                                                SnapshotChainManager 
snapshotChainManager,
+                                                UUID 
expectedPreviousSnapshotId) throws IOException {
+    UUID previousSnapshotId = snapshotInfo == null ? 
snapshotChainManager.getLatestGlobalSnapshotId() :
+        SnapshotUtils.getPreviousSnapshotId(snapshotInfo, 
snapshotChainManager);
+    if (!Objects.equals(expectedPreviousSnapshotId, previousSnapshotId)) {
+      throw new OMException("Snapshot validation failed. Expected previous 
snapshotId : " +
+          expectedPreviousSnapshotId + " but was " + previousSnapshotId,
+          OMException.ResultCodes.INVALID_REQUEST);
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 8163592cfc..ff6506da03 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -39,13 +39,17 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.KeyManagerImpl;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OmTestManagers;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.PendingKeysDeletion;
 import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -57,10 +61,13 @@ import org.apache.ozone.test.OzoneTestBase;
 import org.apache.ratis.util.ExitUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,12 +88,16 @@ import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERV
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -132,6 +143,7 @@ class TestKeyDeletingService extends OzoneTestBase {
         1, TimeUnit.SECONDS);
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL,
         200, TimeUnit.MILLISECONDS);
+    conf.setBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, true);
     conf.setQuietMode(false);
   }
 
@@ -285,6 +297,115 @@ class TestKeyDeletingService extends OzoneTestBase {
       assertEquals(0, rangeKVs.size());
     }
 
+    /*
+     * Checks that a key deleted from the AOS is not handed out for purge when
+     * a snapshot is created while KeyDeletingService is processing it.
+     *
+     * Create key k1
+     * Create snap1
+     * Rename k1 to k2
+     * Delete k2
+     * Run a KeyDeletingService built on spied managers; when it fetches snap1,
+     * create snap2 and hold the service thread till snap2 is flushed and the
+     * rename entry of the key is gone from the AOS rename table
+     * Resume KeyDeletingService thread.
+     * Assert that k2's deleted-table key never shows up in the block groups
+     * returned by getPendingDeletionKeys.
+     * Create k3, rename it to k4, delete k4, delete snap2 and run the service
+     * again; then delete snap1 and wait for the snapshot table to shrink back
+     * to its initial row count.
+     */
+    @Test
+    public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
+        throws Exception {
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          om.getMetadataManager().getSnapshotInfoTable();
+      Table<String, RepeatedOmKeyInfo> deletedTable =
+          om.getMetadataManager().getDeletedTable();
+      Table<String, String> renameTable = 
om.getMetadataManager().getSnapshotRenamedTable();
+
+      // Suspend KeyDeletingService
+      keyDeletingService.suspend();
+      SnapshotDeletingService snapshotDeletingService = 
om.getKeyManager().getSnapshotDeletingService();
+      snapshotDeletingService.suspend();
+      GenericTestUtils.waitFor(() -> !keyDeletingService.isRunningOnAOS(), 
1000, 10000);
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      OzoneManager ozoneManager = Mockito.spy(om);
+      OmSnapshotManager omSnapshotManager = 
Mockito.spy(om.getOmSnapshotManager());
+      KeyManager km = Mockito.spy(new KeyManagerImpl(ozoneManager, 
ozoneManager.getScmClient(), conf,
+          om.getPerfMetrics()));
+      when(ozoneManager.getOmSnapshotManager()).thenAnswer(i -> {
+        return omSnapshotManager;
+      });
+      KeyDeletingService service = new KeyDeletingService(ozoneManager, 
scmBlockTestingClient, km, 10000,
+          100000, conf, false);
+      service.shutdown();
+      final long initialSnapshotCount = 
metadataManager.countRowsInTable(snapshotInfoTable);
+      final long initialDeletedCount = 
metadataManager.countRowsInTable(deletedTable);
+      final long initialRenameCount = 
metadataManager.countRowsInTable(renameTable);
+      // Create Volume and Buckets
+      createVolumeAndBucket(volumeName, bucketName, false);
+      OmKeyArgs args = createAndCommitKey(volumeName, bucketName,
+          "key1", 3);
+      String snap1 = uniqueObjectName("snap");
+      String snap2 = uniqueObjectName("snap");
+      writeClient.createSnapshot(volumeName, bucketName, snap1);
+      KeyInfoWithVolumeContext keyInfo = writeClient.getKeyInfo(args, false);
+      AtomicLong objectId = new AtomicLong(keyInfo.getKeyInfo().getObjectID());
+      renameKey(volumeName, bucketName, "key1", "key2");
+      deleteKey(volumeName, bucketName, "key2");
+      assertTableRowCount(deletedTable, initialDeletedCount + 1, 
metadataManager);
+      assertTableRowCount(renameTable, initialRenameCount + 1, 
metadataManager);
+
+      String[] deletePathKey = 
{metadataManager.getOzoneDeletePathKey(objectId.get(),
+          metadataManager.getOzoneKey(volumeName,
+          bucketName, "key2"))};
+      assertNotNull(deletedTable.get(deletePathKey[0]));
+      Mockito.doAnswer(i -> {
+        writeClient.createSnapshot(volumeName, bucketName, snap2);
+        GenericTestUtils.waitFor(() -> {
+          try {
+            SnapshotInfo snapshotInfo = 
writeClient.getSnapshotInfo(volumeName, bucketName, snap2);
+            return 
OmSnapshotManager.areSnapshotChangesFlushedToDB(metadataManager, snapshotInfo);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }, 1000, 100000);
+        GenericTestUtils.waitFor(() -> {
+          try {
+            return renameTable.get(metadataManager.getRenameKey(volumeName, 
bucketName, objectId.get())) == null;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }, 1000, 10000);
+        return i.callRealMethod();
+      }).when(omSnapshotManager).getSnapshot(ArgumentMatchers.eq(volumeName), 
ArgumentMatchers.eq(bucketName),
+          ArgumentMatchers.eq(snap1));
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, 
metadataManager);
+      doAnswer(i -> {
+        PendingKeysDeletion pendingKeysDeletion = (PendingKeysDeletion) 
i.callRealMethod();
+        for (BlockGroup group : pendingKeysDeletion.getKeyBlocksList()) {
+          Assertions.assertNotEquals(deletePathKey[0], group.getGroupID());
+        }
+        return pendingKeysDeletion;
+      }).when(km).getPendingDeletionKeys(anyInt());
+      service.runPeriodicalTaskNow();
+      service.runPeriodicalTaskNow();
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, 
metadataManager);
+      // Create Key3
+      OmKeyArgs args2 = createAndCommitKey(volumeName, bucketName,
+          "key3", 3);
+      keyInfo = writeClient.getKeyInfo(args2, false);
+      objectId.set(keyInfo.getKeyInfo().getObjectID());
+      // Rename Key3 to key4
+      renameKey(volumeName, bucketName, "key3", "key4");
+      // Delete Key4
+      deleteKey(volumeName, bucketName, "key4");
+      deletePathKey[0] = metadataManager.getOzoneDeletePathKey(objectId.get(), 
metadataManager.getOzoneKey(volumeName,
+          bucketName, "key4"));
+      // Delete snapshot
+      writeClient.deleteSnapshot(volumeName, bucketName, snap2);
+      // Run KDS and ensure key4 doesn't get purged since snap2 has not been 
deleted.
+      service.runPeriodicalTaskNow();
+      writeClient.deleteSnapshot(volumeName, bucketName, snap1);
+      snapshotDeletingService.resume();
+      assertTableRowCount(snapshotInfoTable, initialSnapshotCount, 
metadataManager);
+      keyDeletingService.resume();
+    }
+
     /*
      * Create Snap1
      * Create 10 keys
@@ -396,68 +517,68 @@ class TestKeyDeletingService extends OzoneTestBase {
       final long initialDeletedCount = 
metadataManager.countRowsInTable(deletedTable);
       final long initialRenamedCount = 
metadataManager.countRowsInTable(renamedTable);
 
-      final String volumeName = getTestName();
-      final String bucketName = uniqueObjectName("bucket");
+      final String testVolumeName = getTestName();
+      final String testBucketName = uniqueObjectName("bucket");
       final String keyName = uniqueObjectName("key");
 
       // Create Volume and Buckets
-      createVolumeAndBucket(volumeName, bucketName, false);
+      createVolumeAndBucket(testVolumeName, testBucketName, false);
 
       // Create 3 keys
       for (int i = 1; i <= 3; i++) {
-        createAndCommitKey(volumeName, bucketName, keyName + i, 3);
+        createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
       }
       assertTableRowCount(keyTable, initialKeyCount + 3, metadataManager);
 
       // Create Snapshot1
       String snap1 = uniqueObjectName("snap");
-      writeClient.createSnapshot(volumeName, bucketName, snap1);
+      writeClient.createSnapshot(testVolumeName, testBucketName, snap1);
       assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, 
metadataManager);
       assertTableRowCount(deletedTable, initialDeletedCount, metadataManager);
 
       // Create 2 keys
       for (int i = 4; i <= 5; i++) {
-        createAndCommitKey(volumeName, bucketName, keyName + i, 3);
+        createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
       }
       // Delete a key, rename 2 keys. We will be using this to test
       // how we handle renamed key for exclusive size calculation.
-      renameKey(volumeName, bucketName, keyName + 1, "renamedKey1");
-      renameKey(volumeName, bucketName, keyName + 2, "renamedKey2");
-      deleteKey(volumeName, bucketName, keyName + 3);
+      renameKey(testVolumeName, testBucketName, keyName + 1, "renamedKey1");
+      renameKey(testVolumeName, testBucketName, keyName + 2, "renamedKey2");
+      deleteKey(testVolumeName, testBucketName, keyName + 3);
       assertTableRowCount(deletedTable, initialDeletedCount + 1, 
metadataManager);
       assertTableRowCount(renamedTable, initialRenamedCount + 2, 
metadataManager);
 
       // Create Snapshot2
       String snap2 = uniqueObjectName("snap");
-      writeClient.createSnapshot(volumeName, bucketName, snap2);
+      writeClient.createSnapshot(testVolumeName, testBucketName, snap2);
       assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, 
metadataManager);
       assertTableRowCount(deletedTable, initialDeletedCount, metadataManager);
 
       // Create 2 keys
       for (int i = 6; i <= 7; i++) {
-        createAndCommitKey(volumeName, bucketName, keyName + i, 3);
+        createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
       }
 
-      deleteKey(volumeName, bucketName, "renamedKey1");
-      deleteKey(volumeName, bucketName, keyName + 4);
+      deleteKey(testVolumeName, testBucketName, "renamedKey1");
+      deleteKey(testVolumeName, testBucketName, keyName + 4);
       // Do a second rename of already renamedKey2
-      renameKey(volumeName, bucketName, "renamedKey2", "renamedKey22");
+      renameKey(testVolumeName, testBucketName, "renamedKey2", "renamedKey22");
       assertTableRowCount(deletedTable, initialDeletedCount + 2, 
metadataManager);
       assertTableRowCount(renamedTable, initialRenamedCount + 1, 
metadataManager);
 
       // Create Snapshot3
       String snap3 = uniqueObjectName("snap");
-      writeClient.createSnapshot(volumeName, bucketName, snap3);
+      writeClient.createSnapshot(testVolumeName, testBucketName, snap3);
       // Delete 4 keys
-      deleteKey(volumeName, bucketName, "renamedKey22");
+      deleteKey(testVolumeName, testBucketName, "renamedKey22");
       for (int i = 5; i <= 7; i++) {
-        deleteKey(volumeName, bucketName, keyName + i);
+        deleteKey(testVolumeName, testBucketName, keyName + i);
       }
 
       // Create Snapshot4
       String snap4 = uniqueObjectName("snap");
-      writeClient.createSnapshot(volumeName, bucketName, snap4);
-      createAndCommitKey(volumeName, bucketName, uniqueObjectName("key"), 3);
+      writeClient.createSnapshot(testVolumeName, testBucketName, snap4);
+      createAndCommitKey(testVolumeName, testBucketName, 
uniqueObjectName("key"), 3);
 
       long prevKdsRunCount = getRunCount();
       keyDeletingService.resume();
@@ -468,6 +589,7 @@ class TestKeyDeletingService extends OzoneTestBase {
           .put(snap3, 2000L)
           .put(snap4, 0L)
           .build();
+      System.out.println(expectedSize);
 
       // Let KeyDeletingService to run for some iterations
       GenericTestUtils.waitFor(
@@ -480,8 +602,10 @@ class TestKeyDeletingService extends OzoneTestBase {
         while (iterator.hasNext()) {
           Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
           String snapshotName = snapshotEntry.getValue().getName();
+
           Long expected = expectedSize.getOrDefault(snapshotName, 0L);
           assertNotNull(expected);
+          System.out.println(snapshotName);
           assertEquals(expected, snapshotEntry.getValue().getExclusiveSize());
           // Since for the test we are using RATIS/THREE
           assertEquals(expected * 3, 
snapshotEntry.getValue().getExclusiveReplicatedSize());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to