This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b782054e6 HDDS-7968. [Snapshot] Improve KeyDeletingService to reclaim 
eligible key blocks in snapshot's deletedTable (#4935)
9b782054e6 is described below

commit 9b782054e6d4e237c02bf3ae7ce3a4d512e2eb24
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Thu Jun 22 17:06:21 2023 -0700

    HDDS-7968. [Snapshot] Improve KeyDeletingService to reclaim eligible key 
blocks in snapshot's deletedTable (#4935)
---
 .../hadoop/ozone/om/helpers/SnapshotInfo.java      |  37 +++-
 .../ozone/om/helpers/TestOmSnapshotInfo.java       |   2 +
 .../src/main/proto/OmClientProtocol.proto          |   2 +
 .../snapshot/OMSnapshotMoveDeletedKeysRequest.java |  31 +--
 .../request/snapshot/OMSnapshotPurgeRequest.java   |  74 ++++++-
 .../ozone/om/response/key/OMKeyPurgeResponse.java  |   1 +
 .../response/snapshot/OMSnapshotPurgeResponse.java |  27 ++-
 .../om/service/AbstractKeyDeletingService.java     |  94 +++++++++
 .../ozone/om/service/KeyDeletingService.java       | 232 ++++++++++++++++++++-
 .../ozone/om/service/SnapshotDeletingService.java  |  90 +-------
 .../hadoop/ozone/om/snapshot/SnapshotUtils.java    |  31 +++
 .../TestOMSnapshotPurgeRequestAndResponse.java     |   4 +
 .../ozone/om/service/TestKeyDeletingService.java   | 141 +++++++++++++
 13 files changed, 634 insertions(+), 132 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
index 1be0ace8aa..756c397655 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
@@ -125,6 +125,7 @@ public final class SnapshotInfo implements Auditable {
    * RocksDB's transaction sequence number at the time of checkpoint creation.
    */
   private long dbTxSequenceNumber;
+  private boolean deepClean;
 
   /**
    * Private constructor, constructed via builder.
@@ -140,6 +141,8 @@ public final class SnapshotInfo implements Auditable {
    * @param globalPreviousSnapshotId - Snapshot global previous snapshot id.
    * @param snapshotPath - Snapshot path, bucket .snapshot path.
    * @param checkpointDir - Snapshot checkpoint directory.
+   * @param dbTxSequenceNumber - RDB latest transaction sequence number.
+   * @param deepCleaned - Whether the snapshot still needs a deep clean.
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
   private SnapshotInfo(UUID snapshotId,
@@ -153,7 +156,8 @@ public final class SnapshotInfo implements Auditable {
                        UUID globalPreviousSnapshotId,
                        String snapshotPath,
                        String checkpointDir,
-                       long dbTxSequenceNumber) {
+                       long dbTxSequenceNumber,
+                       boolean deepCleaned) {
     this.snapshotId = snapshotId;
     this.name = name;
     this.volumeName = volumeName;
@@ -166,6 +170,7 @@ public final class SnapshotInfo implements Auditable {
     this.snapshotPath = snapshotPath;
     this.checkpointDir = checkpointDir;
     this.dbTxSequenceNumber = dbTxSequenceNumber;
+    this.deepClean = deepCleaned;
   }
 
   public void setName(String name) {
@@ -204,6 +209,14 @@ public final class SnapshotInfo implements Auditable {
     this.checkpointDir = checkpointDir;
   }
 
+  public boolean getDeepClean() {
+    return deepClean;
+  }
+
+  public void setDeepClean(boolean deepClean) {
+    this.deepClean = deepClean;
+  }
+
   public UUID getSnapshotId() {
     return snapshotId;
   }
@@ -265,7 +278,8 @@ public final class SnapshotInfo implements Auditable {
         .setPathPreviousSnapshotId(pathPreviousSnapshotId)
         .setGlobalPreviousSnapshotId(globalPreviousSnapshotId)
         .setSnapshotPath(snapshotPath)
-        .setCheckpointDir(checkpointDir);
+        .setCheckpointDir(checkpointDir)
+        .setDeepClean(deepClean);
   }
 
   /**
@@ -284,6 +298,7 @@ public final class SnapshotInfo implements Auditable {
     private String snapshotPath;
     private String checkpointDir;
     private long dbTxSequenceNumber;
+    private boolean deepClean;
 
     public Builder() {
       // default values
@@ -350,6 +365,11 @@ public final class SnapshotInfo implements Auditable {
       return this;
     }
 
+    public Builder setDeepClean(boolean deepClean) {
+      this.deepClean = deepClean;
+      return this;
+    }
+
     public SnapshotInfo build() {
       Preconditions.checkNotNull(name);
       return new SnapshotInfo(
@@ -364,7 +384,8 @@ public final class SnapshotInfo implements Auditable {
           globalPreviousSnapshotId,
           snapshotPath,
           checkpointDir,
-          dbTxSequenceNumber
+          dbTxSequenceNumber,
+          deepClean
       );
     }
   }
@@ -393,7 +414,8 @@ public final class SnapshotInfo implements Auditable {
 
     sib.setSnapshotPath(snapshotPath)
         .setCheckpointDir(checkpointDir)
-        .setDbTxSequenceNumber(dbTxSequenceNumber);
+        .setDbTxSequenceNumber(dbTxSequenceNumber)
+        .setDeepClean(deepClean);
     return sib.build();
   }
 
@@ -425,6 +447,10 @@ public final class SnapshotInfo implements Auditable {
           fromProtobuf(snapshotInfoProto.getGlobalPreviousSnapshotID()));
     }
 
+    if (snapshotInfoProto.hasDeepClean()) {
+      osib.setDeepClean(snapshotInfoProto.getDeepClean());
+    }
+
     osib.setSnapshotPath(snapshotInfoProto.getSnapshotPath())
         .setCheckpointDir(snapshotInfoProto.getCheckpointDir())
         .setDbTxSequenceNumber(snapshotInfoProto.getDbTxSequenceNumber());
@@ -509,7 +535,8 @@ public final class SnapshotInfo implements Auditable {
         .setGlobalPreviousSnapshotId(INITIAL_SNAPSHOT_ID)
         .setSnapshotPath(volumeName + OM_KEY_PREFIX + bucketName)
         .setVolumeName(volumeName)
-        .setBucketName(bucketName);
+        .setBucketName(bucketName)
+        .setDeepClean(true);
 
     if (snapshotId != null) {
       builder.setCheckpointDir(getCheckpointDirName(snapshotId));
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
index c081c11a66..2c99c992af 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
@@ -66,6 +66,7 @@ public class TestOmSnapshotInfo {
         .setSnapshotPath(SNAPSHOT_PATH)
         .setCheckpointDir(CHECKPOINT_DIR)
         .setDbTxSequenceNumber(DB_TX_SEQUENCE_NUMBER)
+        .setDeepClean(true)
         .build();
   }
 
@@ -83,6 +84,7 @@ public class TestOmSnapshotInfo {
         .setSnapshotPath(SNAPSHOT_PATH)
         .setCheckpointDir(CHECKPOINT_DIR)
         .setDbTxSequenceNumber(DB_TX_SEQUENCE_NUMBER)
+        .setDeepClean(true)
         .build();
   }
 
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index cb9549834f..1b50691cba 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -803,6 +803,7 @@ message SnapshotInfo {
   optional string snapshotPath = 10;
   optional string checkpointDir = 11;
   optional int64 dbTxSequenceNumber = 12;
+  optional bool deepClean = 13;
  }
 
 message SnapshotDiffJobProto {
@@ -1771,6 +1772,7 @@ message SnapshotMoveKeyInfos {
 
 message SnapshotPurgeRequest {
   repeated string snapshotDBKeys = 1;
+  repeated string updatedSnapshotDBKey = 2;
 }
 
 message DeleteTenantRequest {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
index 8d7b533f11..9eee231a69 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import 
org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveDeletedKeysResponse;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import org.apache.hadoop.ozone.om.upgrade.DisallowedUntilLayoutVersion;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
@@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.FILESYSTEM_SNAPSHOT;
@@ -86,7 +86,7 @@ public class OMSnapshotMoveDeletedKeysRequest extends 
OMClientRequest {
               fromSnapshot.getBucketName(),
               getSnapshotPrefix(fromSnapshot.getName()), true);
 
-      nextSnapshot = getNextActiveSnapshot(fromSnapshot,
+      nextSnapshot = SnapshotUtils.getNextActiveSnapshot(fromSnapshot,
           snapshotChainManager, omSnapshotManager);
 
       // Get next non-deleted snapshot.
@@ -122,32 +122,5 @@ public class OMSnapshotMoveDeletedKeysRequest extends 
OMClientRequest {
 
     return omClientResponse;
   }
-
-  /**
-   * Get the next non deleted snapshot in the snapshot chain.
-   */
-  private SnapshotInfo getNextActiveSnapshot(
-      SnapshotInfo snapInfo,
-      SnapshotChainManager chainManager,
-      OmSnapshotManager omSnapshotManager
-  ) throws IOException {
-    while (chainManager.hasNextPathSnapshot(snapInfo.getSnapshotPath(),
-        snapInfo.getSnapshotId())) {
-
-      UUID nextPathSnapshot = chainManager.nextPathSnapshot(
-              snapInfo.getSnapshotPath(), snapInfo.getSnapshotId());
-
-      String tableKey = chainManager.getTableKey(nextPathSnapshot);
-      SnapshotInfo nextSnapshotInfo =
-          omSnapshotManager.getSnapshotInfo(tableKey);
-
-      if (nextSnapshotInfo.getSnapshotStatus().equals(
-          SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE)) {
-        return nextSnapshotInfo;
-      }
-      snapInfo = nextSnapshotInfo;
-    }
-    return null;
-  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
index 20ce2cc240..414ec0f67b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
@@ -19,17 +19,27 @@
 
 package org.apache.hadoop.ozone.om.request.snapshot;
 
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Handles OMSnapshotPurge Request.
@@ -44,6 +54,11 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {
   @Override
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+    OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
+        ozoneManager.getMetadataManager();
+    SnapshotChainManager snapshotChainManager =
+        omMetadataManager.getSnapshotChainManager();
 
     OMClientResponse omClientResponse = null;
 
@@ -52,14 +67,61 @@ public class OMSnapshotPurgeRequest extends OMClientRequest 
{
     SnapshotPurgeRequest snapshotPurgeRequest = getOmRequest()
         .getSnapshotPurgeRequest();
 
-    List<String> snapshotDbKeys = snapshotPurgeRequest
-        .getSnapshotDBKeysList();
+    try {
+      List<String> snapshotDbKeys = snapshotPurgeRequest
+          .getSnapshotDBKeysList();
+      List<String> snapInfosToUpdate = snapshotPurgeRequest
+          .getUpdatedSnapshotDBKeyList();
+      Map<String, SnapshotInfo> updatedSnapInfos = new HashMap<>();
 
-    omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
-        snapshotDbKeys);
-    addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
-        omDoubleBufferHelper);
+      // Snapshots that are already deepCleaned by the KeyDeletingService
+      // can be marked as deepCleaned.
+      for (String snapTableKey : snapInfosToUpdate) {
+        SnapshotInfo snapInfo = omMetadataManager.getSnapshotInfoTable()
+            .get(snapTableKey);
+
+        updateSnapshotInfoAndCache(snapInfo, omMetadataManager,
+            trxnLogIndex, updatedSnapInfos, false);
+      }
+
+      // Snapshots that are purged by the SnapshotDeletingService
+      // will update the next snapshot so that it can be deep cleaned
+      // by the KeyDeletingService in the next run.
+      for (String snapTableKey : snapshotDbKeys) {
+        SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable()
+            .get(snapTableKey);
+
+        SnapshotInfo nextSnapshot = SnapshotUtils
+            .getNextActiveSnapshot(fromSnapshot,
+                snapshotChainManager, omSnapshotManager);
+        updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager,
+            trxnLogIndex, updatedSnapInfos, true);
+      }
+
+      omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
+          snapshotDbKeys, updatedSnapInfos);
+    } catch (IOException ex) {
+      omClientResponse = new OMSnapshotPurgeResponse(
+          createErrorOMResponse(omResponse, ex));
+    } finally {
+      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+          omDoubleBufferHelper);
+    }
 
     return omClientResponse;
   }
+
+  private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
+      OmMetadataManagerImpl omMetadataManager, long trxnLogIndex,
+      Map<String, SnapshotInfo> updatedSnapInfos, boolean deepClean) {
+    if (snapInfo != null) {
+      snapInfo.setDeepClean(deepClean);
+
+      // Update table cache first
+      omMetadataManager.getSnapshotInfoTable().addCacheEntry(
+          new CacheKey<>(snapInfo.getTableKey()),
+          CacheValue.get(trxnLogIndex, snapInfo));
+      updatedSnapInfos.put(snapInfo.getTableKey(), snapInfo);
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
index d967f65732..6c89e16d6a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
@@ -73,6 +73,7 @@ public class OMKeyPurgeResponse extends OmKeyResponse {
       // Init Batch Operation for snapshot db.
       try (BatchOperation writeBatch = fromSnapshotStore.initBatchOperation()) 
{
         processKeys(writeBatch, fromSnapshot.getMetadataManager());
+        processKeysToUpdate(writeBatch, fromSnapshot.getMetadataManager());
         fromSnapshotStore.commitBatchOperation(writeBatch);
       }
     } else {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
index 6aafdd9403..be4bdacc72 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 
@@ -49,11 +50,25 @@ public class OMSnapshotPurgeResponse extends 
OMClientResponse {
   private static final Logger LOG =
       LoggerFactory.getLogger(OMSnapshotPurgeResponse.class);
   private final List<String> snapshotDbKeys;
+  private final Map<String, SnapshotInfo> updatedSnapInfos;
 
   public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse,
-      @Nonnull List<String> snapshotDbKeys) {
+      @Nonnull List<String> snapshotDbKeys,
+      Map<String, SnapshotInfo> updatedSnapInfos) {
     super(omResponse);
     this.snapshotDbKeys = snapshotDbKeys;
+    this.updatedSnapInfos = updatedSnapInfos;
+  }
+
+  /**
+   * Constructor for failed request.
+   * It should not be used for successful request.
+   */
+  public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+    this.snapshotDbKeys = null;
+    this.updatedSnapInfos = null;
   }
 
   @Override
@@ -62,6 +77,7 @@ public class OMSnapshotPurgeResponse extends OMClientResponse 
{
 
     OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
         omMetadataManager;
+    updateSnapInfo(metadataManager, batchOperation);
     for (String dbKey: snapshotDbKeys) {
       SnapshotInfo snapshotInfo = omMetadataManager
           .getSnapshotInfoTable().get(dbKey);
@@ -80,6 +96,15 @@ public class OMSnapshotPurgeResponse extends 
OMClientResponse {
     }
   }
 
+  private void updateSnapInfo(OmMetadataManagerImpl metadataManager,
+                              BatchOperation batchOp)
+      throws IOException {
+    for (Map.Entry<String, SnapshotInfo> entry : updatedSnapInfos.entrySet()) {
+      metadataManager.getSnapshotInfoTable().putWithBatch(batchOp,
+          entry.getKey(), entry.getValue());
+    }
+  }
+
   /**
    * Cleans up the snapshot chain and updates next snapshot's
    * previousPath and previousGlobal IDs.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index be950a2725..d87cb1cce0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om.service;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -30,10 +31,14 @@ import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -52,11 +57,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OBJECT_ID_RECLAIM_BLOCKS;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static 
org.apache.hadoop.ozone.om.service.SnapshotDeletingService.isBlockLocationInfoSame;
 
 /**
  * Abstracts common code from KeyDeletingService and DirectoryDeletingService
@@ -437,6 +445,92 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
     return remainNum;
   }
 
+  protected SnapshotInfo getPreviousActiveSnapshot(SnapshotInfo snapInfo,
+      SnapshotChainManager chainManager, OmSnapshotManager omSnapshotManager)
+      throws IOException {
+    SnapshotInfo currSnapInfo = snapInfo;
+    while (chainManager.hasPreviousPathSnapshot(
+        currSnapInfo.getSnapshotPath(), currSnapInfo.getSnapshotId())) {
+
+      UUID prevPathSnapshot = chainManager.previousPathSnapshot(
+          currSnapInfo.getSnapshotPath(), currSnapInfo.getSnapshotId());
+      String tableKey = chainManager.getTableKey(prevPathSnapshot);
+      SnapshotInfo prevSnapInfo = omSnapshotManager.getSnapshotInfo(tableKey);
+      if (prevSnapInfo.getSnapshotStatus() ==
+          SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
+        return prevSnapInfo;
+      }
+      currSnapInfo = prevSnapInfo;
+    }
+    return null;
+  }
+
+  protected boolean isKeyReclaimable(
+      Table<String, OmKeyInfo> previousKeyTable,
+      Table<String, String> renamedTable,
+      OmKeyInfo deletedKeyInfo, OmBucketInfo bucketInfo,
+      long volumeId, HddsProtos.KeyValue.Builder renamedKeyBuilder)
+      throws IOException {
+
+    String dbKey;
+    // Handle case when the deleted snapshot is the first snapshot.
+    if (previousKeyTable == null) {
+      return true;
+    }
+
+    // These are uncommitted blocks wrapped into a pseudo KeyInfo
+    if (deletedKeyInfo.getObjectID() == OBJECT_ID_RECLAIM_BLOCKS) {
+      return true;
+    }
+
+    // Construct keyTable or fileTable DB key depending on the bucket type
+    if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+      dbKey = ozoneManager.getMetadataManager().getOzonePathKey(
+          volumeId,
+          bucketInfo.getObjectID(),
+          deletedKeyInfo.getParentObjectID(),
+          deletedKeyInfo.getKeyName());
+    } else {
+      dbKey = ozoneManager.getMetadataManager().getOzoneKey(
+          deletedKeyInfo.getVolumeName(),
+          deletedKeyInfo.getBucketName(),
+          deletedKeyInfo.getKeyName());
+    }
+
+    /*
+     snapshotRenamedTable:
+     1) /volumeName/bucketName/objectID ->
+                 /volumeId/bucketId/parentId/fileName (FSO)
+     2) /volumeName/bucketName/objectID ->
+                /volumeName/bucketName/keyName (non-FSO)
+    */
+    String dbRenameKey = ozoneManager.getMetadataManager().getRenameKey(
+        deletedKeyInfo.getVolumeName(), deletedKeyInfo.getBucketName(),
+        deletedKeyInfo.getObjectID());
+
+    // Condition: key should not exist in snapshotRenamedTable
+    // of the current snapshot and keyTable of the previous snapshot.
+    // Check key exists in renamedTable of the Snapshot
+    String renamedKey = renamedTable.getIfExist(dbRenameKey);
+
+    if (renamedKey != null && renamedKeyBuilder != null) {
+      renamedKeyBuilder.setKey(dbRenameKey).setValue(renamedKey);
+    }
+    // previousKeyTable is fileTable if the bucket is FSO,
+    // otherwise it is the keyTable.
+    OmKeyInfo prevKeyInfo = renamedKey != null ? previousKeyTable
+        .get(renamedKey) : previousKeyTable.get(dbKey);
+
+    if (prevKeyInfo == null ||
+        prevKeyInfo.getObjectID() != deletedKeyInfo.getObjectID()) {
+      return true;
+    }
+
+    // For key overwrite the objectID will remain the same. In this
+    // case we need to check if OmKeyLocationInfo is also same.
+    return !isBlockLocationInfoSame(prevKeyInfo, deletedKeyInfo);
+  }
+
   public boolean isRatisEnabled() {
     if (ozoneManager == null) {
       return false;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index 1a570ad244..92ff886143 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -17,16 +17,30 @@
 package org.apache.hadoop.ozone.om.service;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
@@ -34,11 +48,20 @@ import 
org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static 
org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
 
 import org.apache.hadoop.ozone.om.PendingKeysDeletion;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +84,7 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
   private static ClientId clientId = ClientId.randomId();
   private final int keyLimitPerTask;
   private final AtomicLong deletedKeyCount;
+  private final AtomicBoolean suspended;
 
   public KeyDeletingService(OzoneManager ozoneManager,
       ScmBlockLocationProtocol scmClient,
@@ -73,6 +97,7 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
     this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
         OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
     this.deletedKeyCount = new AtomicLong(0);
+    this.suspended = new AtomicBoolean(false);
   }
 
   /**
@@ -97,7 +122,23 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
       // OzoneManager can be null for testing
       return true;
     }
-    return getOzoneManager().isLeaderReady();
+    return !suspended.get() && getOzoneManager().isLeaderReady();
+  }
+
+  /**
+   * Suspend the service.
+   */
+  @VisibleForTesting
+  public void suspend() {
+    suspended.set(true);
+  }
+
+  /**
+   * Resume the service if suspended.
+   */
+  @VisibleForTesting
+  public void resume() {
+    suspended.set(false);
   }
 
   /**
@@ -127,7 +168,7 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
         // that is called from OMSnapshotCreateResponse#addToDBBatch.
         manager.getMetadataManager().getTableLock(
             OmMetadataManagerImpl.DELETED_TABLE).writeLock().lock();
-
+        int delCount = 0;
         try {
           // TODO: [SNAPSHOT] HDDS-7968. Reclaim eligible key blocks in
           //  snapshot's deletedTable when active DB's deletedTable
@@ -140,7 +181,7 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
           List<BlockGroup> keyBlocksList = pendingKeysDeletion
               .getKeyBlocksList();
           if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
-            int delCount = processKeyDeletes(keyBlocksList,
+            delCount = processKeyDeletes(keyBlocksList,
                 getOzoneManager().getKeyManager(),
                 pendingKeysDeletion.getKeysToModify(), null);
             deletedKeyCount.addAndGet(delCount);
@@ -153,9 +194,194 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
           manager.getMetadataManager().getTableLock(
               OmMetadataManagerImpl.DELETED_TABLE).writeLock().unlock();
         }
+
+        try {
+          if (delCount < keyLimitPerTask) {
+            processSnapshotDeepClean(delCount);
+          }
+        } catch (Exception e) {
+          LOG.error("Error while running deep clean on snapshots. Will " +
+              "retry at next run.", e);
+        }
+
       }
       // By design, no one cares about the results of this call back.
       return EmptyTaskResult.newResult();
     }
+
+    private void processSnapshotDeepClean(int delCount)
+        throws IOException {
+      OmSnapshotManager omSnapshotManager =
+          getOzoneManager().getOmSnapshotManager();
+      OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
+          getOzoneManager().getMetadataManager();
+      SnapshotChainManager snapChainManager = metadataManager
+          .getSnapshotChainManager();
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          getOzoneManager().getMetadataManager().getSnapshotInfoTable();
+      List<String> deepCleanedSnapshots = new ArrayList<>();
+      try (TableIterator<String, ? extends Table.KeyValue
+          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
+
+        while (delCount < keyLimitPerTask && iterator.hasNext()) {
+          List<BlockGroup> keysToPurge = new ArrayList<>();
+          HashMap<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
+          SnapshotInfo currSnapInfo = iterator.next().getValue();
+
+          // Deep clean only on active snapshot. Deleted Snapshots will be
+          // cleaned up by SnapshotDeletingService.
+          if (!currSnapInfo.getSnapshotStatus().equals(SNAPSHOT_ACTIVE) ||
+              !currSnapInfo.getDeepClean()) {
+            continue;
+          }
+
+          OmSnapshot currOmSnapshot = (OmSnapshot) omSnapshotManager
+              .checkForSnapshot(currSnapInfo.getVolumeName(),
+                  currSnapInfo.getBucketName(),
+                  getSnapshotPrefix(currSnapInfo.getName()),
+                  true);
+
+          Table<String, RepeatedOmKeyInfo> snapDeletedTable =
+              currOmSnapshot.getMetadataManager().getDeletedTable();
+          Table<String, String> snapRenamedTable =
+              currOmSnapshot.getMetadataManager().getSnapshotRenamedTable();
+
+          long volumeId = metadataManager.getVolumeId(
+              currSnapInfo.getVolumeName());
+          // Get bucketInfo for the snapshot bucket to get bucket layout.
+          String dbBucketKey = metadataManager.getBucketKey(
+              currSnapInfo.getVolumeName(), currSnapInfo.getBucketName());
+          OmBucketInfo bucketInfo = metadataManager.getBucketTable()
+              .get(dbBucketKey);
+
+          if (bucketInfo == null) {
+            throw new IllegalStateException("Bucket " + "/" + currSnapInfo
+                .getVolumeName() + "/" + currSnapInfo.getBucketName() +
+                " is not found. BucketInfo should not be null for snapshotted" +
+                " bucket. The OM is in unexpected state.");
+          }
+
+          String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
+          SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(
+              currSnapInfo, snapChainManager, omSnapshotManager);
+          Table<String, OmKeyInfo> previousKeyTable = null;
+          OmSnapshot omPreviousSnapshot = null;
+
+          // Split RepeatedOmKeyInfo and update current snapshot deletedKeyTable
+          // and next snapshot deletedKeyTable.
+          if (previousSnapshot != null) {
+            omPreviousSnapshot = (OmSnapshot) omSnapshotManager
+                .checkForSnapshot(previousSnapshot.getVolumeName(),
+                    previousSnapshot.getBucketName(),
+                    getSnapshotPrefix(previousSnapshot.getName()), true);
+
+            previousKeyTable = omPreviousSnapshot
+                .getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
+          }
+
+          try (TableIterator<String, ? extends Table.KeyValue<String,
+              RepeatedOmKeyInfo>> deletedIterator = snapDeletedTable
+              .iterator()) {
+
+            deletedIterator.seek(snapshotBucketKey);
+            while (deletedIterator.hasNext() && delCount < keyLimitPerTask) {
+              Table.KeyValue<String, RepeatedOmKeyInfo>
+                  deletedKeyValue = deletedIterator.next();
+              String deletedKey = deletedKeyValue.getKey();
+
+              // Exit if it is out of the bucket scope.
+              if (!deletedKey.startsWith(snapshotBucketKey)) {
+                break;
+              }
+
+              RepeatedOmKeyInfo repeatedOmKeyInfo = deletedKeyValue.getValue();
+
+              List<BlockGroup> blockGroupList = new ArrayList<>();
+              RepeatedOmKeyInfo newRepeatedOmKeyInfo = new RepeatedOmKeyInfo();
+              for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
+                if (isKeyReclaimable(previousKeyTable, snapRenamedTable,
+                    keyInfo, bucketInfo, volumeId, null)) {
+                  List<BlockGroup> blocksForKeyDelete = currOmSnapshot
+                      .getMetadataManager()
+                      .getBlocksForKeyDelete(deletedKey);
+                  if (blocksForKeyDelete != null) {
+                    blockGroupList.addAll(blocksForKeyDelete);
+                  }
+                  delCount++;
+                } else {
+                  newRepeatedOmKeyInfo.addOmKeyInfo(keyInfo);
+                }
+              }
+
+              if (newRepeatedOmKeyInfo.getOmKeyInfoList().size() > 0 &&
+                  newRepeatedOmKeyInfo.getOmKeyInfoList().size() !=
+                  repeatedOmKeyInfo.getOmKeyInfoList().size()) {
+                keysToModify.put(deletedKey, newRepeatedOmKeyInfo);
+              }
+
+              if (newRepeatedOmKeyInfo.getOmKeyInfoList().size() !=
+                  repeatedOmKeyInfo.getOmKeyInfoList().size()) {
+                keysToPurge.addAll(blockGroupList);
+              }
+            }
+
+            if (delCount < keyLimitPerTask) {
+              // Deep clean is completed, we can update the SnapInfo.
+              deepCleanedSnapshots.add(currSnapInfo.getTableKey());
+            }
+
+            if (!keysToPurge.isEmpty()) {
+              processKeyDeletes(keysToPurge, currOmSnapshot.getKeyManager(),
+                  keysToModify, currSnapInfo.getTableKey());
+            }
+
+          }
+        }
+      }
+      updateDeepCleanedSnapshots(deepCleanedSnapshots);
+    }
+
+    private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) {
+      if (!deepCleanedSnapshots.isEmpty()) {
+        SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest
+            .newBuilder()
+            .addAllUpdatedSnapshotDBKey(deepCleanedSnapshots)
+            .build();
+
+        OMRequest omRequest = OMRequest.newBuilder()
+            .setCmdType(Type.SnapshotPurge)
+            .setSnapshotPurgeRequest(snapshotPurgeRequest)
+            .setClientId(clientId.toString())
+            .build();
+
+        submitRequest(omRequest);
+      }
+    }
+
+    public void submitRequest(OMRequest omRequest) {
+      try {
+        if (isRatisEnabled()) {
+          OzoneManagerRatisServer server = getOzoneManager().getOmRatisServer();
+
+          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+              .setClientId(clientId)
+              .setServerId(server.getRaftPeerId())
+              .setGroupId(server.getRaftGroupId())
+              .setCallId(getRunCount().get())
+              .setMessage(Message.valueOf(
+                  OMRatisHelper.convertRequestToByteString(omRequest)))
+              .setType(RaftClientRequest.writeRequestType())
+              .build();
+
+          server.submitRequest(omRequest, raftClientRequest);
+        } else {
+          getOzoneManager().getOmServerProtocol()
+              .submitRequest(null, omRequest);
+        }
+      } catch (ServiceException e) {
+        LOG.error("Snapshot deep cleaning request failed. " +
+            "Will retry at next run.", e);
+      }
+    }
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index f02dc45a31..7c542e5c96 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -71,7 +70,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.OBJECT_ID_RECLAIM_BLOCKS;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
@@ -190,7 +188,8 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
           }
 
           //TODO: [SNAPSHOT] Add lock to deletedTable and Active DB.
-          SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(snapInfo);
+          SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(
+              snapInfo, chainManager, omSnapshotManager);
           Table<String, OmKeyInfo> previousKeyTable = null;
           Table<String, OmDirectoryInfo> previousDirTable = null;
           OmSnapshot omPreviousSnapshot = null;
@@ -477,91 +476,6 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
       return prevDirectoryInfo.getObjectID() != deletedDirInfo.getObjectID();
     }
 
-    private boolean isKeyReclaimable(
-        Table<String, OmKeyInfo> previousKeyTable,
-        Table<String, String> renamedTable,
-        OmKeyInfo deletedKeyInfo, OmBucketInfo bucketInfo,
-        long volumeId, HddsProtos.KeyValue.Builder renamedKeyBuilder)
-        throws IOException {
-
-      String dbKey;
-      // Handle case when the deleted snapshot is the first snapshot.
-      if (previousKeyTable == null) {
-        return true;
-      }
-
-      // These are uncommitted blocks wrapped into a pseudo KeyInfo
-      if (deletedKeyInfo.getObjectID() == OBJECT_ID_RECLAIM_BLOCKS) {
-        return true;
-      }
-
-      // Construct keyTable or fileTable DB key depending on the bucket type
-      if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
-        dbKey = ozoneManager.getMetadataManager().getOzonePathKey(
-            volumeId,
-            bucketInfo.getObjectID(),
-            deletedKeyInfo.getParentObjectID(),
-            deletedKeyInfo.getKeyName());
-      } else {
-        dbKey = ozoneManager.getMetadataManager().getOzoneKey(
-            deletedKeyInfo.getVolumeName(),
-            deletedKeyInfo.getBucketName(),
-            deletedKeyInfo.getKeyName());
-      }
-
-      /*
-       snapshotRenamedTable:
-       1) /volumeName/bucketName/objectID ->
-                   /volumeId/bucketId/parentId/fileName (FSO)
-       2) /volumeName/bucketName/objectID ->
-                  /volumeName/bucketName/keyName (non-FSO)
-      */
-      String dbRenameKey = ozoneManager.getMetadataManager().getRenameKey(
-          deletedKeyInfo.getVolumeName(), deletedKeyInfo.getBucketName(),
-          deletedKeyInfo.getObjectID());
-
-      // Condition: key should not exist in snapshotRenamedTable
-      // of the current snapshot and keyTable of the previous snapshot.
-      // Check key exists in renamedTable of the Snapshot
-      String renamedKey = renamedTable.getIfExist(dbRenameKey);
-
-      if (renamedKey != null) {
-        renamedKeyBuilder.setKey(dbRenameKey).setValue(renamedKey);
-      }
-      // previousKeyTable is fileTable if the bucket is FSO,
-      // otherwise it is the keyTable.
-      OmKeyInfo prevKeyInfo = renamedKey != null ? previousKeyTable
-          .get(renamedKey) : previousKeyTable.get(dbKey);
-
-      if (prevKeyInfo == null ||
-          prevKeyInfo.getObjectID() != deletedKeyInfo.getObjectID()) {
-        return true;
-      }
-
-      // For key overwrite the objectID will remain the same, In this
-      // case we need to check if OmKeyLocationInfo is also same.
-      return !isBlockLocationInfoSame(prevKeyInfo, deletedKeyInfo);
-    }
-
-    private SnapshotInfo getPreviousActiveSnapshot(SnapshotInfo snapInfo)
-        throws IOException {
-      SnapshotInfo currSnapInfo = snapInfo;
-      while (chainManager.hasPreviousPathSnapshot(
-          currSnapInfo.getSnapshotPath(), currSnapInfo.getSnapshotId())) {
-
-        UUID prevPathSnapshot = chainManager.previousPathSnapshot(
-            currSnapInfo.getSnapshotPath(), currSnapInfo.getSnapshotId());
-        String tableKey = chainManager.getTableKey(prevPathSnapshot);
-        SnapshotInfo prevSnapInfo = 
omSnapshotManager.getSnapshotInfo(tableKey);
-        if (prevSnapInfo.getSnapshotStatus() ==
-            SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
-          return prevSnapInfo;
-        }
-        currSnapInfo = prevSnapInfo;
-      }
-      return null;
-    }
-
     public void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
         List<SnapshotMoveKeyInfos> toReclaimList,
         List<SnapshotMoveKeyInfos> toNextDBList,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
index 48ba7cd87e..f78c6e0645 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.ozone.om.snapshot;
 
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus;
@@ -30,6 +32,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.UUID;
+
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 
@@ -121,4 +125,31 @@ public final class SnapshotUtils {
           "' is no longer active", FILE_NOT_FOUND);
     }
   }
+
+  /**
+   * Get the next active snapshot in the path's snapshot chain, or null.
+   */
+  public static SnapshotInfo getNextActiveSnapshot(SnapshotInfo snapInfo,
+      SnapshotChainManager chainManager, OmSnapshotManager omSnapshotManager)
+      throws IOException {
+    while (chainManager.hasNextPathSnapshot(snapInfo.getSnapshotPath(),
+        snapInfo.getSnapshotId())) {
+
+      UUID nextPathSnapshot =
+          chainManager.nextPathSnapshot(
+              snapInfo.getSnapshotPath(), snapInfo.getSnapshotId());
+
+      String tableKey = chainManager.getTableKey(nextPathSnapshot);
+      SnapshotInfo nextSnapshotInfo =
+          omSnapshotManager.getSnapshotInfo(tableKey);
+
+      if (nextSnapshotInfo.getSnapshotStatus().equals(
+          SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE)) {
+        return nextSnapshotInfo;
+      }
+
+      snapInfo = nextSnapshotInfo;
+    }
+    return null;
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
index 82d5ca3f97..8102735a4a 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
@@ -76,6 +77,7 @@ public class TestOMSnapshotPurgeRequestAndResponse {
   private OzoneManager ozoneManager;
   private OMMetrics omMetrics;
   private OMMetadataManager omMetadataManager;
+  private OmSnapshotManager omSnapshotManager;
   private AuditLogger auditLogger;
 
   private String volumeName;
@@ -112,6 +114,8 @@ public class TestOMSnapshotPurgeRequestAndResponse {
 
     OmMetadataReader omMetadataReader = Mockito.mock(OmMetadataReader.class);
     when(ozoneManager.getOmMetadataReader()).thenReturn(omMetadataReader);
+    omSnapshotManager = new OmSnapshotManager(ozoneManager);
+    when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
     volumeName = UUID.randomUUID().toString();
     bucketName = UUID.randomUUID().toString();
     keyName = UUID.randomUUID().toString();
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 721f153095..b49988571a 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -31,15 +31,20 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmTestManagers;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
 import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.ratis.util.ExitUtils;
 import org.junit.BeforeClass;
@@ -64,6 +69,10 @@ import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
 import org.apache.commons.lang3.RandomStringUtils;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -103,6 +112,8 @@ public class TestKeyDeletingService {
     ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
     conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
+        100, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
     conf.setQuietMode(false);
@@ -446,6 +457,136 @@ public class TestKeyDeletingService {
     Assert.assertTrue(rangeKVs.size() == 0);
   }
 
+  /*
+   * Suspend KeyDeletingService so that deleted keys
+     won't be reclaimed.
+   * Create Snap1
+   * Create 10 keys
+   * Create Snap2
+   * Create 5 keys
+   * Delete all 15 keys
+   * Create Snap3 -> traps all 15 deleted keys.
+   * Resume KeyDeletingService.
+   * Now wait for Snap3 to be deepCleaned -> the 5 keys created
+     after Snap2 should be deep cleaned, leaving 10.
+   * Now delete Snap2 -> Wait for Snap3 to be deep cleaned so deletedTable
+     of Snap3 should be empty.
+   */
+  @Test
+  public void testSnapshotDeepClean() throws Exception {
+    OzoneConfiguration conf = createConfAndInitValues();
+    OmTestManagers omTestManagers
+        = new OmTestManagers(conf);
+    KeyManager keyManager = omTestManagers.getKeyManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+    OMMetadataManager metadataManager = omTestManagers.getMetadataManager();
+    Table<String, SnapshotInfo> snapshotInfoTable =
+        om.getMetadataManager().getSnapshotInfoTable();
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        om.getMetadataManager().getDeletedTable();
+    Table<String, OmKeyInfo> keyTable =
+        om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT);
+
+    KeyDeletingService keyDeletingService = keyManager.getDeletingService();
+    // Suspend KeyDeletingService
+    keyDeletingService.suspend();
+
+    String volumeName = String.format("volume%s",
+        RandomStringUtils.randomAlphanumeric(5));
+    String bucketName = String.format("bucket%s",
+        RandomStringUtils.randomAlphanumeric(5));
+    String keyName = String.format("key%s",
+        RandomStringUtils.randomAlphanumeric(5));
+
+    // Create Volume and Buckets
+    createVolumeAndBucket(keyManager, volumeName, bucketName, false);
+
+    writeClient.createSnapshot(volumeName, bucketName, "snap1");
+    assertTableRowCount(snapshotInfoTable, 1, metadataManager);
+
+    List<OmKeyArgs> createdKeys = new ArrayList<>();
+    for (int i = 1; i <= 10; i++) {
+      OmKeyArgs args = createAndCommitKey(keyManager, volumeName, bucketName,
+          keyName + i, 3);
+      createdKeys.add(args);
+    }
+    assertTableRowCount(keyTable, 10, metadataManager);
+
+    writeClient.createSnapshot(volumeName, bucketName, "snap2");
+    assertTableRowCount(snapshotInfoTable, 2, metadataManager);
+
+    // Create 5 Keys
+    for (int i = 11; i <= 15; i++) {
+      OmKeyArgs args = createAndCommitKey(keyManager, volumeName, bucketName,
+          keyName + i, 3);
+      createdKeys.add(args);
+    }
+
+    // Delete all 15 keys.
+    for (int i = 0; i < 15; i++) {
+      writeClient.deleteKey(createdKeys.get(i));
+    }
+
+    assertTableRowCount(deletedTable, 15, metadataManager);
+
+    // Create Snap3, traps all the deleted keys.
+    writeClient.createSnapshot(volumeName, bucketName, "snap3");
+    assertTableRowCount(snapshotInfoTable, 3, metadataManager);
+    checkSnapDeepCleanStatus(snapshotInfoTable, true);
+
+    keyDeletingService.resume();
+
+    OmSnapshot snap3 = (OmSnapshot) om.getOmSnapshotManager()
+        .checkForSnapshot(volumeName, bucketName,
+            getSnapshotPrefix("snap3"), true);
+    Table<String, RepeatedOmKeyInfo> snap3deletedTable =
+        snap3.getMetadataManager().getDeletedTable();
+
+    // 5 keys can be deep cleaned as they were stuck previously
+    assertTableRowCount(snap3deletedTable, 10, metadataManager);
+    checkSnapDeepCleanStatus(snapshotInfoTable, false);
+
+    writeClient.deleteSnapshot(volumeName, bucketName, "snap2");
+    assertTableRowCount(snapshotInfoTable, 2, metadataManager);
+
+    assertTableRowCount(snap3deletedTable, 0, metadataManager);
+    assertTableRowCount(deletedTable, 0, metadataManager);
+  }
+
+  private void checkSnapDeepCleanStatus(Table<String, SnapshotInfo>
+      snapshotInfoTable, boolean deepClean) throws IOException {
+
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
+             iterator = snapshotInfoTable.iterator()) {
+      while (iterator.hasNext()) {
+        SnapshotInfo snapInfo = iterator.next().getValue();
+        assertEquals(snapInfo.getDeepClean(), deepClean);
+      }
+    }
+  }
+
+  private void assertTableRowCount(Table<String, ?> table,
+        int count, OMMetadataManager metadataManager)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> assertTableRowCount(count, table,
+            metadataManager), 1000, 120000); // 2 minutes
+  }
+
+  private boolean assertTableRowCount(int expectedCount,
+                                      Table<String, ?> table,
+                                      OMMetadataManager metadataManager) {
+    long count = 0L;
+    try {
+      count = metadataManager.countRowsInTable(table);
+      LOG.info("{} actual row count={}, expectedCount={}", table.getName(),
+          count, expectedCount);
+    } catch (IOException ex) {
+      fail("testDoubleBuffer failed with: " + ex);
+    }
+    return count == expectedCount;
+  }
+
   private void createVolumeAndBucket(KeyManager keyManager, String volumeName,
       String bucketName, boolean isVersioningEnabled) throws IOException {
     // cheat here, just create a volume and bucket entry so that we can


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to