This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 184bfc2118 HDDS-7883. [Snapshot] Accommodate FSO, key renames and 
implement OMSnapshotPurgeRequest for SnapshotDeletingService (#4407)
184bfc2118 is described below

commit 184bfc21189583f857280933270f134dcac68879
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Tue Mar 21 12:02:11 2023 -0700

    HDDS-7883. [Snapshot] Accommodate FSO, key renames and implement 
OMSnapshotPurgeRequest for SnapshotDeletingService (#4407)
---
 .../common/src/main/resources/ozone-default.xml    |   9 +
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   4 +
 .../src/main/proto/OmClientProtocol.proto          |  11 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  12 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  11 -
 .../hadoop/ozone/om/SnapshotChainManager.java      |  29 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   3 +
 .../request/snapshot/OMSnapshotCreateRequest.java  |   7 +-
 .../snapshot/OMSnapshotMoveDeletedKeysRequest.java |   5 +-
 .../request/snapshot/OMSnapshotPurgeRequest.java   |  64 +++++
 .../response/snapshot/OMSnapshotPurgeResponse.java | 145 ++++++++++
 .../ozone/om/service/SnapshotDeletingService.java  | 118 ++++++--
 .../snapshot/TestOMSnapshotCreateRequest.java      |  16 +-
 .../snapshot/TestOMSnapshotDeleteRequest.java      |   8 +-
 .../TestOMSnapshotPurgeRequestAndResponse.java     | 311 +++++++++++++++++++++
 16 files changed, 685 insertions(+), 69 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index eaebbc76c0..ec98bc3e16 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -491,6 +491,15 @@
       and DataNode.
     </description>
   </property>
+  <property>
+    <name>ozone.snapshot.key.deleting.limit.per.task</name>
+    <value>20000</value>
+    <tag>OM, PERFORMANCE</tag>
+    <description>
+      The maximum number of deleted keys to be scanned by Snapshot
+      Deleting Service per snapshot run.
+    </description>
+  </property>
   <property>
     <name>ozone.om.service.ids</name>
     <value/>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 932f017814..d1638e4377 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -310,6 +310,7 @@ public final class OmUtils {
     case CreateSnapshot:
     case DeleteSnapshot:
     case SnapshotMoveDeletedKeys:
+    case SnapshotPurge:
       return false;
     default:
       LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 92a2150a52..937835fdb7 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -85,6 +85,10 @@ public final class OMConfigKeys {
   public static final String OZONE_KEY_DELETING_LIMIT_PER_TASK =
       "ozone.key.deleting.limit.per.task";
   public static final int OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT = 20000;
+  public static final String OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK =
+      "ozone.snapshot.key.deleting.limit.per.task";
+  public static final int OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT
+      = 20000;
 
   public static final String OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL =
       "ozone.om.open.key.cleanup.service.interval";
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 73ed666c87..295065abfd 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -133,6 +133,7 @@ enum Type {
   SnapshotMoveDeletedKeys = 116;
 
   TransferLeadership = 117;
+  SnapshotPurge = 118;
 }
 
 message OMRequest {
@@ -248,6 +249,7 @@ message OMRequest {
   optional SnapshotMoveDeletedKeysRequest   SnapshotMoveDeletedKeysRequest = 
116;
 
   optional hdds.TransferLeadershipRequestProto      
TransferOmLeadershipRequest    = 117;
+  optional SnapshotPurgeRequest             SnapshotPurgeRequest           = 
118;
 
 }
 
@@ -357,6 +359,7 @@ message OMResponse {
   optional SnapshotMoveDeletedKeysResponse   SnapshotMoveDeletedKeysResponse = 
116;
 
   optional hdds.TransferLeadershipResponseProto   TransferOmLeadershipResponse 
 = 117;
+  optional SnapshotPurgeResponse              SnapshotPurgeResponse         = 
118;
 }
 
 enum Status {
@@ -1718,6 +1721,10 @@ message SnapshotMoveKeyInfos {
   repeated KeyInfo keyInfos = 2;
 }
 
+message SnapshotPurgeRequest {
+  repeated string snapshotDBKeys = 1;
+}
+
 message DeleteTenantRequest {
     optional string tenantId = 1;
 }
@@ -1784,6 +1791,10 @@ message SnapshotMoveDeletedKeysResponse {
 
 }
 
+message SnapshotPurgeResponse {
+
+}
+
 message SnapshotDiffReportProto {
   optional string volumeName = 1;
   optional string bucketName = 2;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 5fad57b757..719641c42d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -303,6 +303,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
 
   private Map<String, Table> tableMap = new HashMap<>();
   private List<TableCacheMetrics> tableCacheMetrics = new LinkedList<>();
+  private SnapshotChainManager snapshotChainManager;
 
   public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
     this.lock = new OzoneManagerLock(conf);
@@ -476,6 +477,8 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
 
       initializeOmTables(true);
     }
+
+    snapshotChainManager = new SnapshotChainManager(this);
   }
 
   public static DBStore loadDB(OzoneConfiguration configuration, File metaDir)
@@ -1666,6 +1669,15 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
     return renamedKeyTable;
   }
 
+  /**
+   * Get Snapshot Chain Manager.
+   *
+   * @return SnapshotChainManager.
+   */
+  public SnapshotChainManager getSnapshotChainManager() {
+    return snapshotChainManager;
+  }
+
   /**
    * Update store used by subclass.
    *
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b2b163b0a8..228a56ec65 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -452,7 +452,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   // This metadata reader points to the active filesystem
   private OmMetadataReader omMetadataReader;
   private OmSnapshotManager omSnapshotManager;
-  private SnapshotChainManager snapshotChainManager;
 
   /** A list of property that are reconfigurable at runtime. */
   private final SortedSet<String> reconfigurableProperties =
@@ -770,7 +769,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     omMetadataReader = new OmMetadataReader(keyManager, prefixManager,
         this, LOG, AUDIT, metrics);
     omSnapshotManager = new OmSnapshotManager(this);
-    snapshotChainManager = new SnapshotChainManager(metadataManager);
 
     // Snapshot metrics
     updateActiveSnapshotMetrics();
@@ -1510,15 +1508,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     return omSnapshotManager;
   }
 
-  /**
-   * Get Snapshot Chain Manager.
-   *
-   * @return SnapshotChainManager.
-   */
-  public SnapshotChainManager getSnapshotChainManager() {
-    return snapshotChainManager;
-  }
-
   /**
    * Get metadata manager.
    *
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java
index 0df80d2992..ff637358d0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java
@@ -49,7 +49,7 @@ public class SnapshotChainManager {
       snapshotChainPath;
   private Map<String, String> latestPathSnapshotID;
   private String latestGlobalSnapshotID;
-  private Map<String, String> snapshotPathToTableKey;
+  private Map<String, String> snapshotIdToTableKey;
   private static final Logger LOG =
       LoggerFactory.getLogger(SnapshotChainManager.class);
 
@@ -58,7 +58,7 @@ public class SnapshotChainManager {
     snapshotChainGlobal = new LinkedHashMap<>();
     snapshotChainPath = new HashMap<>();
     latestPathSnapshotID = new HashMap<>();
-    snapshotPathToTableKey = new HashMap<>();
+    snapshotIdToTableKey = new HashMap<>();
     latestGlobalSnapshotID = null;
     loadFromSnapshotInfoTable(metadataManager);
   }
@@ -100,8 +100,7 @@ public class SnapshotChainManager {
    */
   private void addSnapshotPath(String snapshotPath,
                                String snapshotID,
-                               String prevPathID,
-                               String snapTableKey) throws IOException {
+                               String prevPathID) throws IOException {
     // set previous snapshotID to null if it is "" for
     // internal in-mem structure
     if (prevPathID != null && prevPathID.isEmpty()) {
@@ -139,8 +138,6 @@ public class SnapshotChainManager {
         .put(snapshotID,
             new SnapshotChainInfo(snapshotID, prevPathID, null));
 
-    // store snapshot ID to snapshot DB table key in the map
-    snapshotPathToTableKey.put(snapshotID, snapTableKey);
     // set state variable latestPath snapshot entry to this snapshotID
     latestPathSnapshotID.put(snapshotPath, snapshotID);
   };
@@ -272,7 +269,7 @@ public class SnapshotChainManager {
     snapshotChainGlobal.clear();
     snapshotChainPath.clear();
     latestPathSnapshotID.clear();
-    snapshotPathToTableKey.clear();
+    snapshotIdToTableKey.clear();
 
     while (keyIter.hasNext()) {
       kv = keyIter.next();
@@ -292,8 +289,9 @@ public class SnapshotChainManager {
         sinfo.getGlobalPreviousSnapshotID());
     addSnapshotPath(sinfo.getSnapshotPath(),
         sinfo.getSnapshotID(),
-        sinfo.getPathPreviousSnapshotID(),
-        sinfo.getTableKey());
+        sinfo.getPathPreviousSnapshotID());
+    // store snapshot ID to snapshot DB table key in the map
+    snapshotIdToTableKey.put(sinfo.getSnapshotID(), sinfo.getTableKey());
   }
 
   /**
@@ -304,9 +302,12 @@ public class SnapshotChainManager {
   public boolean deleteSnapshot(SnapshotInfo sinfo) throws IOException {
     boolean status;
 
-    status = deleteSnapshotGlobal(sinfo.getSnapshotID());
-    return status && deleteSnapshotPath(sinfo.getSnapshotPath(),
-        sinfo.getSnapshotID());
+    status = deleteSnapshotGlobal(sinfo.getSnapshotID()) &&
+        deleteSnapshotPath(sinfo.getSnapshotPath(), sinfo.getSnapshotID());
+    if (status) {
+      snapshotIdToTableKey.remove(sinfo.getSnapshotID());
+    }
+    return status;
   }
 
   /**
@@ -521,8 +522,8 @@ public class SnapshotChainManager {
         .getPreviousSnapshotID();
   }
 
-  public String getTableKey(String snapshotPath) {
-    return snapshotPathToTableKey.get(snapshotPath);
+  public String getTableKey(String snapshotId) {
+    return snapshotIdToTableKey.get(snapshotId);
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index bc08a90598..512c8cb92b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -73,6 +73,7 @@ import 
org.apache.hadoop.ozone.om.request.security.OMRenewDelegationTokenRequest
 import org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotCreateRequest;
 import org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotDeleteRequest;
 import 
org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotMoveDeletedKeysRequest;
+import org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotPurgeRequest;
 import org.apache.hadoop.ozone.om.request.upgrade.OMCancelPrepareRequest;
 import org.apache.hadoop.ozone.om.request.upgrade.OMFinalizeUpgradeRequest;
 import org.apache.hadoop.ozone.om.request.upgrade.OMPrepareRequest;
@@ -218,6 +219,8 @@ public final class OzoneManagerRatisUtils {
       return new OMSnapshotDeleteRequest(omRequest);
     case SnapshotMoveDeletedKeys:
       return new OMSnapshotMoveDeletedKeysRequest(omRequest);
+    case SnapshotPurge:
+      return new OMSnapshotPurgeRequest(omRequest);
     case DeleteOpenKeys:
       BucketLayout bktLayout = BucketLayout.DEFAULT;
       if (omRequest.getDeleteOpenKeysRequest().hasBucketLayout()) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
index a337a791d8..4c0b81e740 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -115,9 +115,10 @@ public class OMSnapshotCreateRequest extends 
OMClientRequest {
 
     boolean acquiredBucketLock = false, acquiredSnapshotLock = false;
     IOException exception = null;
-    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
+        ozoneManager.getMetadataManager();
     SnapshotChainManager snapshotChainManager =
-        ozoneManager.getSnapshotChainManager();
+        omMetadataManager.getSnapshotChainManager();
 
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
index f1db67846f..2c1f44e677 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.ozone.om.request.snapshot;
 
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -57,8 +58,10 @@ public class OMSnapshotMoveDeletedKeysRequest extends 
OMClientRequest {
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
     OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
+        ozoneManager.getMetadataManager();
     SnapshotChainManager snapshotChainManager =
-        ozoneManager.getSnapshotChainManager();
+        omMetadataManager.getSnapshotChainManager();
 
     SnapshotMoveDeletedKeysRequest moveDeletedKeysRequest =
         getOmRequest().getSnapshotMoveDeletedKeysRequest();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
new file mode 100644
index 0000000000..30409c0473
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
+
+import java.util.List;
+
+/**
+ * Handles OMSnapshotPurge Request.
+ */
+public class OMSnapshotPurgeRequest extends OMClientRequest {
+
+  public OMSnapshotPurgeRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+
+    OMClientResponse omClientResponse = null;
+
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    SnapshotPurgeRequest snapshotPurgeRequest = getOmRequest()
+        .getSnapshotPurgeRequest();
+
+    List<String> snapshotDbKeys = snapshotPurgeRequest
+        .getSnapshotDBKeysList();
+
+    omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
+        snapshotDbKeys);
+    addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+        omDoubleBufferHelper);
+
+    return omClientResponse;
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
new file mode 100644
index 0000000000..9d625ea1dc
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+
+/**
+ * Response for OMSnapshotPurgeRequest.
+ */
+@CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE})
+public class OMSnapshotPurgeResponse extends OMClientResponse {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotPurgeResponse.class);
+  private List<String> snapshotDbKeys;
+
+  public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse,
+      @Nonnull List<String> snapshotDbKeys) {
+    super(omResponse);
+    this.snapshotDbKeys = snapshotDbKeys;
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
+        omMetadataManager;
+    for (String dbKey: snapshotDbKeys) {
+      SnapshotInfo snapshotInfo = omMetadataManager
+          .getSnapshotInfoTable().get(dbKey);
+      cleanupSnapshotChain(metadataManager, snapshotInfo, batchOperation);
+      // Delete Snapshot checkpoint directory.
+      deleteCheckpointDirectory(omMetadataManager, snapshotInfo);
+      omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation,
+          dbKey);
+    }
+  }
+
+  /**
+   * Cleans up the snapshot chain and updates next snapshot's
+   * previousPath and previousGlobal IDs.
+   * @param metadataManager
+   * @param snapInfo
+   * @param batchOperation
+   */
+  private void cleanupSnapshotChain(OmMetadataManagerImpl metadataManager,
+      SnapshotInfo snapInfo, BatchOperation batchOperation) throws IOException 
{
+    SnapshotChainManager snapshotChainManager = metadataManager
+        .getSnapshotChainManager();
+
+    // Updates next path snapshot's previous snapshot ID
+    if (snapshotChainManager.hasNextPathSnapshot(
+        snapInfo.getSnapshotPath(), snapInfo.getSnapshotID())) {
+      String nextPathSnapshotId =
+          snapshotChainManager.nextPathSnapshot(
+              snapInfo.getSnapshotPath(), snapInfo.getSnapshotID());
+
+      String snapshotTableKey = snapshotChainManager
+          .getTableKey(nextPathSnapshotId);
+      SnapshotInfo nextPathSnapInfo =
+          metadataManager.getSnapshotInfoTable().get(snapshotTableKey);
+      if (nextPathSnapInfo != null) {
+        nextPathSnapInfo.setPathPreviousSnapshotID(
+            snapInfo.getPathPreviousSnapshotID());
+        metadataManager.getSnapshotInfoTable().putWithBatch(batchOperation,
+            nextPathSnapInfo.getTableKey(), nextPathSnapInfo);
+      }
+    }
+
+    // Updates next global snapshot's previous snapshot ID
+    if (snapshotChainManager.hasNextGlobalSnapshot(
+        snapInfo.getSnapshotID())) {
+      String nextGlobalSnapshotId =
+          snapshotChainManager.nextGlobalSnapshot(snapInfo.getSnapshotID());
+
+      String snapshotTableKey = snapshotChainManager
+          .getTableKey(nextGlobalSnapshotId);
+      SnapshotInfo nextGlobalSnapInfo =
+          metadataManager.getSnapshotInfoTable().get(snapshotTableKey);
+      if (nextGlobalSnapInfo != null) {
+        nextGlobalSnapInfo.setGlobalPreviousSnapshotID(
+            snapInfo.getPathPreviousSnapshotID());
+        metadataManager.getSnapshotInfoTable().putWithBatch(batchOperation,
+            nextGlobalSnapInfo.getTableKey(), nextGlobalSnapInfo);
+      }
+    }
+
+    // Removes current snapshot from the snapshot chain.
+    snapshotChainManager.deleteSnapshot(snapInfo);
+  }
+
+  /**
+   * Deletes the checkpoint directory for a snapshot.
+   * @param omMetadataManager
+   * @param snapshotInfo
+   */
+  private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
+      SnapshotInfo snapshotInfo) {
+    RDBStore store = (RDBStore) omMetadataManager.getStore();
+    String checkpointPrefix = store.getDbLocation().getName();
+    Path snapshotDirPath = Paths.get(store.getSnapshotsParentDir(),
+        checkpointPrefix + snapshotInfo.getCheckpointDir());
+    try {
+      FileUtils.deleteDirectory(snapshotDirPath.toFile());
+    } catch (IOException ex) {
+      LOG.error("Failed to delete snapshot directory {} for snapshot {}",
+          snapshotDirPath, snapshotInfo.getTableKey(), ex);
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index d98acd4486..8d01ff8952 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -37,12 +38,14 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyRenameInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
@@ -57,6 +60,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OBJECT_ID_RECLAIM_BLOCKS;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT;
@@ -84,6 +89,7 @@ public class SnapshotDeletingService extends 
BackgroundService {
   private final OzoneConfiguration conf;
   private final AtomicLong successRunCount;
   private final long snapshotDeletionPerTask;
+  private final int keyLimitPerSnapshot;
 
   public SnapshotDeletingService(long interval, long serviceTimeout,
       OzoneManager ozoneManager) throws IOException {
@@ -92,7 +98,9 @@ public class SnapshotDeletingService extends 
BackgroundService {
         serviceTimeout);
     this.ozoneManager = ozoneManager;
     this.omSnapshotManager = ozoneManager.getOmSnapshotManager();
-    this.chainManager = ozoneManager.getSnapshotChainManager();
+    OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
+        ozoneManager.getMetadataManager();
+    this.chainManager = omMetadataManager.getSnapshotChainManager();
     this.runCount = new AtomicLong(0);
     this.successRunCount = new AtomicLong(0);
     this.suspended = new AtomicBoolean(false);
@@ -100,6 +108,9 @@ public class SnapshotDeletingService extends 
BackgroundService {
     this.snapshotDeletionPerTask = conf
         .getLong(SNAPSHOT_DELETING_LIMIT_PER_TASK,
         SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT);
+    this.keyLimitPerSnapshot = conf.getInt(
+        OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK,
+        OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
   }
 
   private class SnapshotDeletingTask implements BackgroundTask {
@@ -118,6 +129,7 @@ public class SnapshotDeletingService extends 
BackgroundService {
           <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
 
         long snapshotLimit = snapshotDeletionPerTask;
+        List<String> purgeSnapshotKeys = new ArrayList<>();
 
         while (iterator.hasNext() && snapshotLimit > 0) {
           SnapshotInfo snapInfo = iterator.next().getValue();
@@ -142,6 +154,11 @@ public class SnapshotDeletingService extends 
BackgroundService {
             continue;
           }
 
+          Table<String, OmKeyRenameInfo> renamedKeyTable =
+              omSnapshot.getMetadataManager().getRenamedKeyTable();
+
+          long volumeId = ozoneManager.getMetadataManager()
+              .getVolumeId(snapInfo.getVolumeName());
           // Get bucketInfo for the snapshot bucket to get bucket layout.
           String dbBucketKey = ozoneManager.getMetadataManager().getBucketKey(
               snapInfo.getVolumeName(), snapInfo.getBucketName());
@@ -182,13 +199,19 @@ public class SnapshotDeletingService extends 
BackgroundService {
             String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
             iterator.seek(snapshotBucketKey);
 
-            while (deletedIterator.hasNext()) {
+            int deletionCount = 0;
+            while (deletedIterator.hasNext() &&
+                deletionCount <= keyLimitPerSnapshot) {
               Table.KeyValue<String, RepeatedOmKeyInfo>
                   deletedKeyValue = deletedIterator.next();
               String deletedKey = deletedKeyValue.getKey();
 
               // Exit if it is out of the bucket scope.
               if (!deletedKey.startsWith(snapshotBucketKey)) {
+                // If snapshot deletedKeyTable doesn't have any
+                // entry in the snapshot scope it can be reclaimed
+                // TODO: [SNAPSHOT] Check deletedDirTable to be empty.
+                purgeSnapshotKeys.add(snapInfo.getTableKey());
                 break;
               }
 
@@ -203,7 +226,8 @@ public class SnapshotDeletingService extends 
BackgroundService {
 
               for (OmKeyInfo keyInfo: repeatedOmKeyInfo.getOmKeyInfoList()) {
                 splitRepeatedOmKeyInfo(toReclaim, toNextDb,
-                    keyInfo, previousKeyTable);
+                    keyInfo, previousKeyTable, renamedKeyTable,
+                    bucketInfo, volumeId);
               }
 
               // If all the KeyInfos are reclaimable in RepeatedOmKeyInfo
@@ -213,7 +237,7 @@ public class SnapshotDeletingService extends 
BackgroundService {
                 toReclaimList.add(toReclaim.build());
               }
               toNextDBList.add(toNextDb.build());
-
+              deletionCount++;
             }
             // Submit Move request to OM.
             submitSnapshotMoveDeletedKeys(snapInfo, toReclaimList,
@@ -224,6 +248,8 @@ public class SnapshotDeletingService extends 
BackgroundService {
             LOG.error("Error while running Snapshot Deleting Service", ex);
           }
         }
+
+        submitSnapshotPurgeRequest(purgeSnapshotKeys);
       } catch (IOException e) {
         LOG.error("Error while running Snapshot Deleting Service", e);
       }
@@ -231,10 +257,32 @@ public class SnapshotDeletingService extends 
BackgroundService {
       return BackgroundTaskResult.EmptyTaskResult.newResult();
     }
 
+    private void submitSnapshotPurgeRequest(List<String> purgeSnapshotKeys) {
+      if (!purgeSnapshotKeys.isEmpty()) {
+        SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest
+            .newBuilder()
+            .addAllSnapshotDBKeys(purgeSnapshotKeys)
+            .build();
+
+        OMRequest omRequest = OMRequest.newBuilder()
+            .setCmdType(Type.SnapshotPurge)
+            .setSnapshotPurgeRequest(snapshotPurgeRequest)
+            .setClientId(clientId.toString())
+            .build();
+
+        // TODO: [SNAPSHOT] Submit request once KeyDeletingService,
+        //  DirectoryDeletingService for snapshots are modified.
+        // submitRequest(omRequest);
+      }
+    }
+
     private void splitRepeatedOmKeyInfo(SnapshotMoveKeyInfos.Builder toReclaim,
         SnapshotMoveKeyInfos.Builder toNextDb, OmKeyInfo keyInfo,
-        Table<String, OmKeyInfo> previousKeyTable) throws IOException {
-      if (checkKeyReclaimable(previousKeyTable, keyInfo)) {
+        Table<String, OmKeyInfo> previousKeyTable,
+        Table<String, OmKeyRenameInfo> renamedKeyTable,
+        OmBucketInfo bucketInfo, long volumeId) throws IOException {
+      if (checkKeyReclaimable(previousKeyTable, renamedKeyTable,
+          keyInfo, bucketInfo, volumeId)) {
         // Move to next non deleted snapshot's deleted table
         toNextDb.addKeyInfos(keyInfo.getProtobuf(
             ClientVersion.CURRENT_VERSION));
@@ -268,9 +316,12 @@ public class SnapshotDeletingService extends 
BackgroundService {
     }
 
     private boolean checkKeyReclaimable(
-        Table<String, OmKeyInfo> previousKeyTable, OmKeyInfo deletedKeyInfo)
-        throws IOException {
+        Table<String, OmKeyInfo> previousKeyTable,
+        Table<String, OmKeyRenameInfo> renamedKeyTable,
+        OmKeyInfo deletedKeyInfo, OmBucketInfo bucketInfo,
+        long volumeId) throws IOException {
 
+      String dbKey;
       // Handle case when the deleted snapshot is the first snapshot.
       if (previousKeyTable == null) {
         return false;
@@ -281,17 +332,48 @@ public class SnapshotDeletingService extends 
BackgroundService {
         return false;
       }
 
-      //TODO: [SNAPSHOT] Handle Renamed Keys
-      String dbKey = ozoneManager.getMetadataManager()
-          .getOzoneKey(deletedKeyInfo.getVolumeName(),
-              deletedKeyInfo.getBucketName(), deletedKeyInfo.getKeyName());
+      // Construct keyTable or fileTable DB key depending on the bucket type
+      if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+        dbKey = ozoneManager.getMetadataManager().getOzonePathKey(
+            volumeId,
+            bucketInfo.getObjectID(),
+            deletedKeyInfo.getParentObjectID(),
+            deletedKeyInfo.getKeyName());
+      } else {
+        dbKey = ozoneManager.getMetadataManager().getOzoneKey(
+            deletedKeyInfo.getVolumeName(),
+            deletedKeyInfo.getBucketName(),
+            deletedKeyInfo.getKeyName());
+      }
+
+      // renamedKeyTable: volumeName/bucketName/objectID -> OMRenameKeyInfo
+      String dbRenameKey = ozoneManager.getMetadataManager().getRenameKey(
+          deletedKeyInfo.getVolumeName(), deletedKeyInfo.getBucketName(),
+          deletedKeyInfo.getObjectID());
+
+      OmKeyRenameInfo renamedKeyInfo = renamedKeyTable.getIfExist(dbRenameKey);
+
+      boolean isKeyRenamed = false;
+      String dbOriginalKey = null;
+      // Condition: key should not exist in renamedKeyTable of the current
+      // snapshot and keyTable of the previous snapshot.
+      // Check key exists in renamedKeyTable of the Snapshot
+      if (renamedKeyInfo != null && !renamedKeyInfo
+          .getOmKeyRenameInfoList().isEmpty()) {
+        isKeyRenamed = true;
+        dbOriginalKey = renamedKeyInfo.getOmKeyRenameInfoList().get(0);
+      }
+
+      // previousKeyTable is fileTable if the bucket is FSO,
+      // otherwise it is the keyTable.
+      OmKeyInfo prevKeyInfo = isKeyRenamed ? previousKeyTable
+          .get(dbOriginalKey) : previousKeyTable.get(dbKey);
 
-      OmKeyInfo prevKeyInfo = previousKeyTable.get(dbKey);
-      if (prevKeyInfo != null &&
-          prevKeyInfo.getObjectID() == deletedKeyInfo.getObjectID()) {
-        return true;
+      if (prevKeyInfo == null) {
+        return false;
       }
-      return false;
+
+      return prevKeyInfo.getObjectID() == deletedKeyInfo.getObjectID();
     }
 
     private SnapshotInfo getPreviousSnapshot(SnapshotInfo snapInfo)
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
index 8065e78bff..eb66a6d325 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
@@ -29,11 +29,9 @@ import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 
 import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
@@ -71,7 +69,7 @@ public class TestOMSnapshotCreateRequest {
 
   private OzoneManager ozoneManager;
   private OMMetrics omMetrics;
-  private OMMetadataManager omMetadataManager;
+  private OmMetadataManagerImpl omMetadataManager;
   private BatchOperation batchOperation;
 
   private String volumeName;
@@ -211,11 +209,7 @@ public class TestOMSnapshotCreateRequest {
 
   @Test
   public void testValidateAndUpdateCache() throws Exception {
-    SnapshotChainManager snapshotChainManager =
-        new SnapshotChainManager(omMetadataManager);
     when(ozoneManager.isAdmin(any())).thenReturn(true);
-    when(ozoneManager.getSnapshotChainManager())
-        .thenReturn(snapshotChainManager);
     OMRequest omRequest =
         OMRequestTestUtils.createSnapshotRequest(
         volumeName, bucketName, snapshotName);
@@ -254,11 +248,7 @@ public class TestOMSnapshotCreateRequest {
 
   @Test
   public void testEmptyRenamedKeyTable() throws Exception {
-    SnapshotChainManager snapshotChainManager =
-        new SnapshotChainManager(omMetadataManager);
     when(ozoneManager.isAdmin(any())).thenReturn(true);
-    when(ozoneManager.getSnapshotChainManager())
-        .thenReturn(snapshotChainManager);
     OmKeyInfo toKeyInfo = addKey("key1");
     OmKeyInfo fromKeyInfo = addKey("key2");
 
@@ -302,11 +292,7 @@ public class TestOMSnapshotCreateRequest {
 
   @Test
   public void testEntryExists() throws Exception {
-    SnapshotChainManager snapshotChainManager =
-        new SnapshotChainManager(omMetadataManager);
     when(ozoneManager.isAdmin(any())).thenReturn(true);
-    when(ozoneManager.getSnapshotChainManager())
-        .thenReturn(snapshotChainManager);
     OMRequest omRequest =
         OMRequestTestUtils.createSnapshotRequest(
         volumeName, bucketName, snapshotName);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
index b8d7c74173..e380a5281c 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
@@ -27,11 +27,9 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
@@ -68,7 +66,7 @@ public class TestOMSnapshotDeleteRequest {
 
   private OzoneManager ozoneManager;
   private OMMetrics omMetrics;
-  private OMMetadataManager omMetadataManager;
+  private OmMetadataManagerImpl omMetadataManager;
 
   private String volumeName;
   private String bucketName;
@@ -259,10 +257,6 @@ public class TestOMSnapshotDeleteRequest {
    */
   @Test
   public void testEntryExists() throws Exception {
-    SnapshotChainManager snapshotChainManager =
-        new SnapshotChainManager(omMetadataManager);
-    when(ozoneManager.getSnapshotChainManager())
-        .thenReturn(snapshotChainManager);
     when(ozoneManager.isAdmin(any())).thenReturn(true);
     String key = SnapshotInfo.getTableKey(volumeName, bucketName, 
snapshotName);
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
new file mode 100644
index 0000000000..02814c788e
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataReader;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse;
+import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests OMSnapshotPurgeRequest class.
+ */
+public class TestOMSnapshotPurgeRequestAndResponse {
+
+  private BatchOperation batchOperation;
+  private List<Path> checkpointPaths = new ArrayList<>();
+
+  private OzoneManager ozoneManager;
+  private OMMetrics omMetrics;
+  private OMMetadataManager omMetadataManager;
+  private AuditLogger auditLogger;
+
+  private String volumeName;
+  private String bucketName;
+  private String keyName;
+
+
+  // Just setting ozoneManagerDoubleBuffer which does nothing.
+  private static OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper 
=
+      ((response, transactionIndex) -> {
+        return null;
+      });
+
+  @BeforeEach
+  public void setup() throws Exception {
+    File testDir = GenericTestUtils.getRandomizedTestDir();
+    ozoneManager = Mockito.mock(OzoneManager.class);
+    OMLayoutVersionManager lvm = mock(OMLayoutVersionManager.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+    when(ozoneManager.getVersionManager()).thenReturn(lvm);
+    when(ozoneManager.isRatisEnabled()).thenReturn(true);
+    auditLogger = Mockito.mock(AuditLogger.class);
+    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    omMetrics = OMMetrics.create();
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        testDir.getAbsolutePath());
+    ozoneConfiguration.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
+        testDir.getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
+    when(ozoneManager.isAdmin(any(UserGroupInformation.class)))
+        .thenReturn(true);
+
+    OmMetadataReader omMetadataReader = Mockito.mock(OmMetadataReader.class);
+    when(ozoneManager.getOmMetadataReader()).thenReturn(omMetadataReader);
+    volumeName = UUID.randomUUID().toString();
+    bucketName = UUID.randomUUID().toString();
+    keyName = UUID.randomUUID().toString();
+  }
+
+  /**
+   * Creates volume, bucket and snapshot entries.
+   */
+  private List<String> createSnapshots(int numSnapshotKeys)
+      throws Exception {
+
+    Random random = new Random();
+    // Add volume, bucket and key entries to OM DB.
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    // Create Snapshot and CheckpointDir
+    List<String> purgeSnapshots = new ArrayList<>(numSnapshotKeys);
+    for (int i = 1; i <= numSnapshotKeys; i++) {
+      String snapshotName = keyName + "-" + random.nextLong();
+      createSnapshotCheckpoint(snapshotName);
+      purgeSnapshots.add(SnapshotInfo.getTableKey(volumeName,
+          bucketName, snapshotName));
+    }
+
+    return purgeSnapshots;
+  }
+
+  /**
+   * Create OMRequest which encapsulates SnapshotPurgeRequest.
+   *
+   * @return OMRequest
+   */
+  private OMRequest createPurgeKeysRequest(List<String> purgeSnapshotKeys) {
+    SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest
+        .newBuilder()
+        .addAllSnapshotDBKeys(purgeSnapshotKeys)
+        .build();
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.SnapshotPurge)
+        .setSnapshotPurgeRequest(snapshotPurgeRequest)
+        .setClientId(UUID.randomUUID().toString())
+        .build();
+
+    return omRequest;
+  }
+
+  /**
+   * Create snapshot and checkpoint directory.
+   */
+  private void createSnapshotCheckpoint(String snapshotName) throws Exception {
+    when(ozoneManager.isAdmin(any())).thenReturn(true);
+    batchOperation = omMetadataManager.getStore().initBatchOperation();
+    OMRequest omRequest = OMRequestTestUtils
+        .createSnapshotRequest(volumeName, bucketName, snapshotName);
+    // Pre-Execute OMSnapshotCreateRequest.
+    OMSnapshotCreateRequest omSnapshotCreateRequest =
+        TestOMSnapshotCreateRequest.doPreExecute(omRequest, ozoneManager);
+
+    // validateAndUpdateCache OMSnapshotCreateResponse.
+    OMSnapshotCreateResponse omClientResponse = (OMSnapshotCreateResponse)
+        omSnapshotCreateRequest.validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    // Add to batch and commit to DB.
+    omClientResponse.addToDBBatch(omMetadataManager, batchOperation);
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+    batchOperation.close();
+
+    String key = SnapshotInfo.getTableKey(volumeName,
+        bucketName, snapshotName);
+    SnapshotInfo snapshotInfo =
+        omMetadataManager.getSnapshotInfoTable().get(key);
+    Assertions.assertNotNull(snapshotInfo);
+
+    RDBStore store = (RDBStore) omMetadataManager.getStore();
+    String checkpointPrefix = store.getDbLocation().getName();
+    Path snapshotDirPath = Paths.get(store.getSnapshotsParentDir(),
+        checkpointPrefix + snapshotInfo.getCheckpointDir());
+    //Check the DB is still there
+    Assertions.assertTrue(Files.exists(snapshotDirPath));
+    checkpointPaths.add(snapshotDirPath);
+  }
+
+  private OMSnapshotPurgeRequest preExecute(OMRequest originalOmRequest)
+      throws IOException {
+    OMSnapshotPurgeRequest omSnapshotPurgeRequest =
+        new OMSnapshotPurgeRequest(originalOmRequest);
+    OMRequest modifiedOmRequest = omSnapshotPurgeRequest
+        .preExecute(ozoneManager);
+    return new OMSnapshotPurgeRequest(modifiedOmRequest);
+  }
+
+  private void purgeSnapshots(OMRequest snapshotPurgeRequest)
+      throws IOException {
+    // Pre-Execute OMSnapshotPurgeRequest
+    OMSnapshotPurgeRequest omSnapshotPurgeRequest =
+        preExecute(snapshotPurgeRequest);
+
+    // validateAndUpdateCache for OMSnapshotPurgeRequest.
+    OMSnapshotPurgeResponse omSnapshotPurgeResponse = (OMSnapshotPurgeResponse)
+        omSnapshotPurgeRequest.validateAndUpdateCache(ozoneManager, 200L,
+            ozoneManagerDoubleBufferHelper);
+
+    // Commit to DB.
+    batchOperation = omMetadataManager.getStore().initBatchOperation();
+    omSnapshotPurgeResponse.checkAndUpdateDB(omMetadataManager, 
batchOperation);
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+  }
+
+  @Test
+  public void testValidateAndUpdateCache() throws Exception {
+
+    List<String> snapshotDbKeysToPurge = createSnapshots(10);
+    Assertions.assertFalse(omMetadataManager.getSnapshotInfoTable().isEmpty());
+    OMRequest snapshotPurgeRequest = createPurgeKeysRequest(
+        snapshotDbKeysToPurge);
+    purgeSnapshots(snapshotPurgeRequest);
+
+    // Check if the entries are deleted.
+    Assertions.assertTrue(omMetadataManager.getSnapshotInfoTable().isEmpty());
+
+    // Check if all the checkpoints are cleared.
+    for (Path checkpoint : checkpointPaths) {
+      Assertions.assertFalse(Files.exists(checkpoint));
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {0, 1, 2, 3, 4})
+  public void testSnapshotChainCleanup(int index) throws Exception {
+    List<String> snapshots = createSnapshots(5);
+    String snapShotToPurge = snapshots.get(index);
+
+    // Before purge, check snapshot chain
+    OmMetadataManagerImpl metadataManager =
+        (OmMetadataManagerImpl) omMetadataManager;
+    SnapshotChainManager chainManager = metadataManager
+        .getSnapshotChainManager();
+    SnapshotInfo snapInfo = metadataManager.getSnapshotInfoTable()
+        .get(snapShotToPurge);
+
+    // Get previous and next snapshotInfos to verify if the SnapInfo
+    // is changed.
+    String prevPathSnapId = null;
+    String prevGlobalSnapId = null;
+    String nextPathSnapId = null;
+    String nextGlobalSnapId = null;
+
+    if (chainManager.hasPreviousPathSnapshot(snapInfo.getSnapshotPath(),
+        snapInfo.getSnapshotID())) {
+      prevPathSnapId = chainManager.previousPathSnapshot(
+          snapInfo.getSnapshotPath(), snapInfo.getSnapshotID());
+    }
+    if (chainManager.hasPreviousGlobalSnapshot(snapInfo.getSnapshotID())) {
+      prevGlobalSnapId = chainManager.previousGlobalSnapshot(
+          snapInfo.getSnapshotID());
+    }
+    if (chainManager.hasNextPathSnapshot(snapInfo.getSnapshotPath(),
+        snapInfo.getSnapshotID())) {
+      nextPathSnapId = chainManager.nextPathSnapshot(
+          snapInfo.getSnapshotPath(), snapInfo.getSnapshotID());
+    }
+    if (chainManager.hasNextGlobalSnapshot(snapInfo.getSnapshotID())) {
+      nextGlobalSnapId = chainManager.nextGlobalSnapshot(
+          snapInfo.getSnapshotID());
+    }
+
+    long rowsInTableBeforePurge = omMetadataManager
+        .countRowsInTable(omMetadataManager.getSnapshotInfoTable());
+    // Purge Snapshot of the given index.
+    List<String> toPurgeList = Collections.singletonList(snapShotToPurge);
+    OMRequest snapshotPurgeRequest = createPurgeKeysRequest(
+        toPurgeList);
+    purgeSnapshots(snapshotPurgeRequest);
+
+    // After purge, check snapshot chain.
+    if (nextPathSnapId != null) {
+      SnapshotInfo nextPathSnapshotInfoAfterPurge = metadataManager
+          
.getSnapshotInfoTable().get(chainManager.getTableKey(nextPathSnapId));
+      Assertions.assertEquals(nextPathSnapshotInfoAfterPurge
+          .getGlobalPreviousSnapshotID(), prevPathSnapId);
+    }
+
+    if (nextGlobalSnapId != null) {
+      SnapshotInfo nextGlobalSnapshotInfoAfterPurge = metadataManager
+          .getSnapshotInfoTable().get(chainManager
+              .getTableKey(nextGlobalSnapId));
+      Assertions.assertEquals(nextGlobalSnapshotInfoAfterPurge
+          .getGlobalPreviousSnapshotID(), prevGlobalSnapId);
+    }
+
+    Assertions.assertNotEquals(rowsInTableBeforePurge, omMetadataManager
+        .countRowsInTable(omMetadataManager.getSnapshotInfoTable()));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to