hemantk-12 commented on code in PR #4244:
URL: https://github.com/apache/ozone/pull/4244#discussion_r1106864946


##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3180,6 +3188,25 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.snapshot.deletion.service.timeout</name>
+    <value>300s</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>
+      Timeout value for SnapshotDeletingService.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.snapshot.deletion.service.interval</name>
+    <value>30s</value>

Review Comment:
   QQ: Does the `SnapshotDeletingService` task run at a fixed rate or with a fixed delay?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveDeletedKeysResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyValuePair;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+
+/**
+ * Handles OMSnapshotMoveDeletedKeys Request.
+ */
+public class OMSnapshotMoveDeletedKeysRequest extends OMClientRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotMoveDeletedKeysRequest.class);
+
+  public OMSnapshotMoveDeletedKeysRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+    OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
+
+    SnapshotMoveDeletedKeysRequest moveDeletedKeysRequest =
+        getOmRequest().getSnapshotMoveDeletedKeysRequest();
+
+    // If there is no Non-Deleted Snapshot move the
+    // keys to Active Object Store.
+    SnapshotInfo nextSnapshot = null;
+    if (moveDeletedKeysRequest.hasNextSnapshot()) {
+      nextSnapshot = SnapshotInfo
+          .getFromProtobuf(moveDeletedKeysRequest.getNextSnapshot());
+    }
+
+    List<KeyValuePair> activeDBKeysList =
+        moveDeletedKeysRequest.getActiveDBKeysList();
+    List<KeyValuePair> nextDBKeysList =
+        moveDeletedKeysRequest.getNextDBKeysList();
+
+    OmSnapshot omFromSnapshot = null;
+    OmSnapshot omNextSnapshot = null;
+
+    try {

Review Comment:
   nit:
   ```suggestion
       if (nextSnapshot != null) {
         try {
           omNextSnapshot = (OmSnapshot) omSnapshotManager
               .checkForSnapshot(nextSnapshot.getVolumeName(),
                   nextSnapshot.getBucketName(),
                   getSnapshotPrefix(nextSnapshot.getName()));
         } catch (IOException ex) {
           LOG.error("Error occurred when moving keys between snapshots", ex);
         }
       }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java:
##########
@@ -49,6 +49,7 @@ public class SnapshotChainManager {
       snapshotChainPath;
   private Map<String, String> latestPathSnapshotID;
   private String latestGlobalSnapshotID;
+  private Map<String, String> snapshotPathToKey;

Review Comment:
   What is the key here? It would be better to specify that in the variable name.



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test Snapshot Deleting Service.
+ */
+public class TestSnapshotDeletingService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDeletingService.class);
+
+  private KeyManager keyManager;
+  private OMMetadataManager omMetadataManager;
+  private OzoneConfiguration conf;
+  private OmTestManagers omTestManagers;
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  @Before
+  public void createConfAndInitValues() throws Exception {
+    conf = new OzoneConfiguration();
+    File testDir = PathUtils.getTestDir(TestSnapshotDeletingService.class);
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, testDir.getPath());
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL,
+        1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT,
+        100000, TimeUnit.MILLISECONDS);
+    conf.setQuietMode(false);
+    omTestManagers = new OmTestManagers(conf);
+    keyManager = omTestManagers.getKeyManager();
+    omMetadataManager = omTestManagers.getMetadataManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  @Test
+  public void testSnapshotKeySpaceReclaim() throws Exception {
+    int snapshotCount = 0;
+
+    SnapshotDeletingService snapshotDeletingService = (SnapshotDeletingService)
+        keyManager.getSnapshotDeletingService();
+
+    snapshotDeletingService.suspend();
+
+    OmKeyArgs key1 = createVolumeBucketKey(VOLUME_NAME, BUCKET_NAME,
+        BucketLayout.DEFAULT, "key1");
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap1", ++snapshotCount);
+
+    OmKeyArgs key2 = createKey(VOLUME_NAME, BUCKET_NAME, "key2");
+
+    // Key 1 cannot be deleted as it is still referenced by Snapshot 1.
+    writeClient.deleteKey(key1);
+    //Key 2 is deleted here, which means we can reclaim
+    // it when snapshot 2 is deleted.
+    writeClient.deleteKey(key2);
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap2", ++snapshotCount);
+    createKey(VOLUME_NAME, BUCKET_NAME, "key4");
+    OmKeyArgs key5 = createKey(VOLUME_NAME, BUCKET_NAME, "key5");
+    writeClient.deleteKey(key5);
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap3", ++snapshotCount);
+
+

Review Comment:
   Please remove the unnecessary blank line.



##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3165,6 +3165,14 @@
       sst filtering service per time interval.
     </description>
   </property>
+  <property>
+    <name>ozone.snapshot.deleting.limit.per.task</name>
+    <value>10</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>The number of snapshots to be reclaimed by the

Review Comment:
   ```suggestion
       <description>The maximum number of snapshots to be reclaimed by the
         Snapshot Deleting Service per run.
    ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java:
##########
@@ -91,8 +93,8 @@ private void addSnapshotGlobal(String snapshotID,
   };
 
   private void addSnapshotPath(String snapshotPath,
-                               String snapshotID,
-                               String prevPathID) throws IOException {
+      String snapshotID, String prevPathID, String snapTableKey)

Review Comment:
   Also, the alignment is off. The previous alignment was correct.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyValuePair;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+
+/**
+ * Response for OMSnapshotMoveDeletedKeysRequest.
+ */
+@CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE})
+public class OMSnapshotMoveDeletedKeysResponse extends OMClientResponse {
+
+  private OmSnapshot nextSnapshot;
+  private List<KeyValuePair> activeDBKeysList;
+  private List<KeyValuePair> nextDBKeysList;
+
+  public OMSnapshotMoveDeletedKeysResponse(OMResponse omResponse,
+       OmSnapshot omNextSnapshot, List<KeyValuePair> activeDBKeysList,
+       List<KeyValuePair> nextDBKeysList) {
+    super(omResponse);
+    this.nextSnapshot = omNextSnapshot;
+    this.activeDBKeysList = activeDBKeysList;
+    this.nextDBKeysList = nextDBKeysList;
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    for (KeyValuePair activeDBKey : activeDBKeysList) {
+      RepeatedOmKeyInfo activeDBOmKeyInfo =
+          createRepeatedOmKeyInfo(activeDBKey.getKeyInfosList());
+
+      if (activeDBOmKeyInfo == null) {
+        continue;
+      }
+
+      omMetadataManager.getDeletedTable().putWithBatch(
+          batchOperation, activeDBKey.getKey(), activeDBOmKeyInfo);
+    }
+
+    for (KeyValuePair nextDBKey : nextDBKeysList) {
+      RepeatedOmKeyInfo nextDBOmKeyInfo =
+          createRepeatedOmKeyInfo(nextDBKey.getKeyInfosList());
+
+      if (nextDBOmKeyInfo == null) {
+        continue;
+      }
+
+      if (nextSnapshot != null) {
+        nextSnapshot.getMetadataManager()
+            .getDeletedTable().putWithBatch(batchOperation,
+                nextDBKey.getKey(), nextDBOmKeyInfo);
+      } else {
+        omMetadataManager.getDeletedTable()
+            .put(nextDBKey.getKey(), nextDBOmKeyInfo);
+      }
+    }
+  }
+
+  private RepeatedOmKeyInfo createRepeatedOmKeyInfo(List<KeyInfo> keyInfosList)

Review Comment:
   I think it should be either `keyInfoList` or `keyInfos`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java:
##########
@@ -146,6 +149,26 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager,
               .getLatestSequenceNumber();
       snapshotInfo.setDbTxSequenceNumber(dbLatestSequenceNumber);
 
+      // Set previous path and global snapshot
+      String latestPathSnapshot =
+          snapshotChainManager.getLatestPathSnapshot(snapshotPath);
+      String latestGlobalSnapshot =
+          snapshotChainManager.getLatestGlobalSnapshot();
+
+      if (latestPathSnapshot == null || latestPathSnapshot.isEmpty()) {

Review Comment:
   nit:
   ```suggestion
         if (StringUtils.isEmpty(latestPathSnapshot)) {
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyValuePair;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+
+/**
+ * Response for OMSnapshotMoveDeletedKeysRequest.
+ */
+@CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE})
+public class OMSnapshotMoveDeletedKeysResponse extends OMClientResponse {
+
+  private OmSnapshot nextSnapshot;
+  private List<KeyValuePair> activeDBKeysList;
+  private List<KeyValuePair> nextDBKeysList;
+
+  public OMSnapshotMoveDeletedKeysResponse(OMResponse omResponse,
+       OmSnapshot omNextSnapshot, List<KeyValuePair> activeDBKeysList,
+       List<KeyValuePair> nextDBKeysList) {
+    super(omResponse);
+    this.nextSnapshot = omNextSnapshot;
+    this.activeDBKeysList = activeDBKeysList;
+    this.nextDBKeysList = nextDBKeysList;
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    for (KeyValuePair activeDBKey : activeDBKeysList) {
+      RepeatedOmKeyInfo activeDBOmKeyInfo =
+          createRepeatedOmKeyInfo(activeDBKey.getKeyInfosList());
+
+      if (activeDBOmKeyInfo == null) {
+        continue;
+      }
+
+      omMetadataManager.getDeletedTable().putWithBatch(
+          batchOperation, activeDBKey.getKey(), activeDBOmKeyInfo);
+    }
+
+    for (KeyValuePair nextDBKey : nextDBKeysList) {
+      RepeatedOmKeyInfo nextDBOmKeyInfo =
+          createRepeatedOmKeyInfo(nextDBKey.getKeyInfosList());
+
+      if (nextDBOmKeyInfo == null) {
+        continue;
+      }
+
+      if (nextSnapshot != null) {
+        nextSnapshot.getMetadataManager()
+            .getDeletedTable().putWithBatch(batchOperation,
+                nextDBKey.getKey(), nextDBOmKeyInfo);
+      } else {
+        omMetadataManager.getDeletedTable()
+            .put(nextDBKey.getKey(), nextDBOmKeyInfo);
+      }
+    }
+  }
+
+  private RepeatedOmKeyInfo createRepeatedOmKeyInfo(List<KeyInfo> keyInfosList)
+      throws IOException {
+    RepeatedOmKeyInfo result = null;
+
+    for (KeyInfo keyInfo: keyInfosList) {
+      if (result == null) {
+        result = new RepeatedOmKeyInfo(OmKeyInfo.getFromProtobuf(keyInfo));
+      } else {
+        result.addOmKeyInfo(OmKeyInfo.getFromProtobuf(keyInfo));
+      }
+    }
+
+    return result;
+  }
+}

Review Comment:
   Please add a new line at the end of the file.
   
   You can configure your IDE to add it automatically when the file is saved.
   
   In IntelliJ: 
https://stackoverflow.com/questions/16761227/how-to-make-intellij-idea-insert-a-new-line-at-every-end-of-file



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test Snapshot Deleting Service.
+ */
+public class TestSnapshotDeletingService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =

Review Comment:
   Unused declaration.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotMoveDeletedKeysRequest.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveDeletedKeysResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyValuePair;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+
+/**
+ * Handles OMSnapshotMoveDeletedKeys Request.
+ */
+public class OMSnapshotMoveDeletedKeysRequest extends OMClientRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotMoveDeletedKeysRequest.class);
+
+  public OMSnapshotMoveDeletedKeysRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+    OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
+
+    SnapshotMoveDeletedKeysRequest moveDeletedKeysRequest =
+        getOmRequest().getSnapshotMoveDeletedKeysRequest();
+
+    // If there is no Non-Deleted Snapshot move the
+    // keys to Active Object Store.
+    SnapshotInfo nextSnapshot = null;
+    if (moveDeletedKeysRequest.hasNextSnapshot()) {
+      nextSnapshot = SnapshotInfo
+          .getFromProtobuf(moveDeletedKeysRequest.getNextSnapshot());
+    }
+
+    List<KeyValuePair> activeDBKeysList =
+        moveDeletedKeysRequest.getActiveDBKeysList();
+    List<KeyValuePair> nextDBKeysList =
+        moveDeletedKeysRequest.getNextDBKeysList();
+
+    OmSnapshot omFromSnapshot = null;
+    OmSnapshot omNextSnapshot = null;
+
+    try {
+      if (nextSnapshot != null) {
+        omNextSnapshot = (OmSnapshot) omSnapshotManager
+            .checkForSnapshot(nextSnapshot.getVolumeName(),
+                nextSnapshot.getBucketName(),
+                getSnapshotPrefix(nextSnapshot.getName()));
+      }
+    } catch (IOException ex) {
+      LOG.error("Error occurred when moving keys between snapshots", ex);
+    }
+
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(

Review Comment:
   This could be on one line:
   ```suggestion
           OmResponseUtil.getOMResponseBuilder(getOmRequest());
    ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyValuePair;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+
+/**
+ * Background Service to clean-up deleted snapshot and reclaim space.
+ */
+public class SnapshotDeletingService extends BackgroundService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDeletingService.class);
+
+  // Use only a single thread for Snapshot Deletion. Multiple threads would read
+  // from the same table and can send deletion requests for same snapshot
+  // multiple times.
+  private static final int SNAPSHOT_DELETING_CORE_POOL_SIZE = 1;
+  private final ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  private final OzoneManager ozoneManager;
+  private final OmSnapshotManager omSnapshotManager;
+  private final SnapshotChainManager chainManager;
+  private final AtomicBoolean suspended;
+  private final OzoneConfiguration conf;
+  private final AtomicLong successRunCount;
+  private final long snapshotDeletionPerTask;
+
+  public SnapshotDeletingService(long interval, long serviceTimeout,
+      OzoneManager ozoneManager) throws IOException {
+    super("SnapshotDeletingService", interval, TimeUnit.MILLISECONDS,
+        SNAPSHOT_DELETING_CORE_POOL_SIZE, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.omSnapshotManager = ozoneManager.getOmSnapshotManager();
+    this.chainManager = ozoneManager.getSnapshotChainManager();
+    this.runCount = new AtomicLong(0);
+    this.successRunCount = new AtomicLong(0);
+    this.suspended = new AtomicBoolean(false);
+    this.conf = ozoneManager.getConfiguration();
+    this.snapshotDeletionPerTask = conf
+        .getLong(SNAPSHOT_DELETING_LIMIT_PER_TASK,
+        SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT);
+  }
+
+  private class SnapshotDeletingTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      if (!shouldRun()) {
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+
+      runCount.incrementAndGet();
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (TableIterator<String, ? extends Table.KeyValue
+          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotDeletionPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {
+          SnapshotInfo snapInfo = iterator.next().getValue();
+          SnapshotInfo.SnapshotStatus snapshotStatus =
+              snapInfo.getSnapshotStatus();
+
+          // Only Iterate in deleted snapshot
+          if (!snapshotStatus.equals(
+              SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED)) {
+            continue;
+          }
+
+          OmSnapshot omSnapshot = (OmSnapshot) omSnapshotManager
+              .checkForSnapshot(snapInfo.getVolumeName(),
+                  snapInfo.getBucketName(),
+                  getSnapshotPrefix(snapInfo.getName()));
+
+          Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
+              omSnapshot.getMetadataManager().getDeletedTable();
+
+          if (snapshotDeletedTable.getEstimatedKeyCount() == 0) {
+            continue;
+          }
+
+          // Get bucketInfo for the snapshot bucket to get bucket layout.
+          String dbBucketKey = ozoneManager.getMetadataManager().getBucketKey(
+              snapInfo.getVolumeName(), snapInfo.getBucketName());
+          OmBucketInfo bucketInfo = ozoneManager.getMetadataManager()
+              .getBucketTable().get(dbBucketKey);
+
+
+          chainManager.loadFromSnapshotInfoTable(
+              ozoneManager.getMetadataManager());
+          SnapshotInfo nextSnapshot = getNextNonDeletedSnapshot(snapInfo);
+
+          SnapshotInfo previousSnapshot = getPreviousSnapshot(snapInfo);
+          Table<String, OmKeyInfo> previousKeyTable = null;
+          OmSnapshot omPreviousSnapshot = null;
+
+          // Handle case when the deleted snapshot is the first snapshot.
+          // Move deleted keys to activeDB's deletedKeyTable
+          if (previousSnapshot != null) {
+            omPreviousSnapshot = (OmSnapshot) omSnapshotManager
+                .checkForSnapshot(previousSnapshot.getVolumeName(),
+                    previousSnapshot.getBucketName(),
+                    getSnapshotPrefix(previousSnapshot.getName()));
+
+            previousKeyTable = omPreviousSnapshot
+                .getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
+          }
+
+          // Move key to either next non deleted snapshot's snapshotDeletedTable
+          // or move to active object store deleted table
+
+          List<KeyValuePair> toActiveDBList = new ArrayList<>();
+          List<KeyValuePair> toNextDBList = new ArrayList<>();
+
+          try (TableIterator<String, ? extends Table.KeyValue<String,
+              RepeatedOmKeyInfo>> deletedIterator = snapshotDeletedTable
+              .iterator()) {
+
+            iterator.seekToFirst();
+
+            while (deletedIterator.hasNext()) {
+              Table.KeyValue<String, RepeatedOmKeyInfo>
+                  deletedKeyValue = deletedIterator.next();
+              RepeatedOmKeyInfo repeatedOmKeyInfo = deletedKeyValue.getValue();
+
+              KeyValuePair.Builder toActiveDb = KeyValuePair.newBuilder()
+                  .setKey(deletedKeyValue.getKey());
+              KeyValuePair.Builder toNextDb = KeyValuePair.newBuilder()
+                  .setKey(deletedKeyValue.getKey());
+
+              for (OmKeyInfo keyInfo: repeatedOmKeyInfo.getOmKeyInfoList()) {
+                splitRepeatedOmKeyInfo(toActiveDb, toNextDb,
+                    keyInfo, previousKeyTable);
+              }
+
+              toActiveDBList.add(toActiveDb.build());
+              toNextDBList.add(toNextDb.build());
+
+            }
+            // Submit Move request to OM.
+            submitSnapshotMoveDeletedKeys(snapInfo, nextSnapshot,
+                toActiveDBList, toNextDBList);
+            snapshotLimit--;
+          } catch (IOException ex) {
+            LOG.error("Error while running Snapshot Deleting Service", ex);
+          }
+        }
+
+        successRunCount.incrementAndGet();
+      } catch (IOException e) {
+        LOG.error("Error while running Snapshot Deleting Service", e);
+      }
+
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    private void splitRepeatedOmKeyInfo(KeyValuePair.Builder toActiveDb,
+        KeyValuePair.Builder toNextDb, OmKeyInfo keyInfo,
+        Table<String, OmKeyInfo> previousKeyTable) throws IOException {
+      if (checkKeyExistInPreviousTable(previousKeyTable, keyInfo)) {
+        // Move to next non deleted snapshot's deleted table
+        toNextDb.addKeyInfos(keyInfo.getProtobuf(
+            ClientVersion.CURRENT_VERSION));
+      } else {
+        // Move to active DB Deleted Table.
+        toActiveDb.addKeyInfos(keyInfo
+            .getProtobuf(ClientVersion.CURRENT_VERSION));
+      }
+    }
+
+    private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
+        SnapshotInfo nextSnapshot, List<KeyValuePair> toActiveDBList,
+        List<KeyValuePair> toNextDBList) {
+
+      SnapshotMoveDeletedKeysRequest.Builder moveDeletedKeysBuilder =
+          SnapshotMoveDeletedKeysRequest.newBuilder()
+              .setFromSnapshot(snapInfo.getProtobuf());
+
+      if (nextSnapshot != null) {
+        moveDeletedKeysBuilder.setNextSnapshot(nextSnapshot.getProtobuf());
+      }
+
+      SnapshotMoveDeletedKeysRequest moveDeletedKeys =
+          moveDeletedKeysBuilder.addAllActiveDBKeys(toActiveDBList)
+          .addAllNextDBKeys(toNextDBList).build();
+
+
+      OMRequest omRequest = OMRequest.newBuilder()
+          .setCmdType(Type.SnapshotMoveDeletedKeys)
+          .setSnapshotMoveDeletedKeysRequest(moveDeletedKeys)
+          .setClientId(clientId.toString())
+          .build();
+
+      submitRequest(omRequest);
+    }
+
+    private boolean checkKeyExistInPreviousTable(
+        Table<String, OmKeyInfo> previousKeyTable, OmKeyInfo deletedKeyInfo)
+        throws IOException {
+
+      if (previousKeyTable == null) {
+        return false;
+      }
+
+      //TODO: Handle Renamed Keys
+      String dbKey = ozoneManager.getMetadataManager()
+          .getOzoneKey(deletedKeyInfo.getVolumeName(),
+              deletedKeyInfo.getBucketName(), deletedKeyInfo.getKeyName());
+
+      OmKeyInfo prevKeyInfo = previousKeyTable.get(dbKey);
+      if (prevKeyInfo != null &&
+          prevKeyInfo.getObjectID() == deletedKeyInfo.getObjectID()) {
+        return true;
+      }
+      return false;
+    }
+
+    private SnapshotInfo getPreviousSnapshot(SnapshotInfo snapInfo)
+        throws IOException {
+      if (chainManager.hasPreviousPathSnapshot(snapInfo.getSnapshotPath(),
+          snapInfo.getSnapshotID())) {
+        String previousPathSnapshot = chainManager.previousPathSnapshot(
+            snapInfo.getSnapshotPath(), snapInfo.getSnapshotID());
+        String tableKey = chainManager.getTableKey(previousPathSnapshot);
+        return omSnapshotManager.getSnapshotInfo(tableKey);
+      }
+      return null;
+    }
+
+    /**
+     * Get the next non deleted snapshot in the snapshot chain.
+     */
+    private SnapshotInfo getNextNonDeletedSnapshot(SnapshotInfo snapInfo)
+        throws IOException {
+      while (chainManager.hasNextPathSnapshot(snapInfo.getSnapshotPath(), 
+          snapInfo.getSnapshotID())) {
+
+        String nextPathSnapshot =
+            chainManager.nextPathSnapshot(
+                snapInfo.getSnapshotPath(), snapInfo.getSnapshotID());
+
+        String tableKey = chainManager.getTableKey(nextPathSnapshot);
+        SnapshotInfo nextSnapshotInfo =
+            omSnapshotManager.getSnapshotInfo(tableKey);
+
+        if (nextSnapshotInfo.getSnapshotStatus().equals(
+            SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE)) {
+          return nextSnapshotInfo;
+        }
+      }
+      return null;
+    }
+
+    private void submitRequest(OMRequest omRequest) {
+      try {
+        if (isRatisEnabled()) {
+          OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
+
+          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+              .setClientId(clientId)
+              .setServerId(server.getRaftPeerId())
+              .setGroupId(server.getRaftGroupId())
+              .setCallId(runCount.get())
+              .setMessage(Message.valueOf(
+                  OMRatisHelper.convertRequestToByteString(omRequest)))
+              .setType(RaftClientRequest.writeRequestType())
+              .build();
+
+          server.submitRequest(omRequest, raftClientRequest);
+        } else {
+          ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
+        }
+      } catch (ServiceException e) {
+        LOG.error("Snapshot Deleting request failed. " +
+            "Will retry at next run.", e);
+      }
+    }
+
+    private boolean isRatisEnabled() {
+      return ozoneManager.isRatisEnabled();
+    }
+
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new SnapshotDeletingTask());
+    return queue;
+  }
+
+  private boolean shouldRun() {
+    return !suspended.get() && ozoneManager.isLeaderReady();
+  }
+
+  /**
+   * Suspend the service (for testing).
+   */
+  @VisibleForTesting

Review Comment:
   1. Is it just for testing, or will we also need it for `Bootstrapping slow followers/new followers`?
   1. I don't think it is necessary to specify `(for testing)` in the comment, since you have already added `@VisibleForTesting`.
   1. I think it should be package-private if it is only for testing.
   



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test Snapshot Deleting Service.
+ */
+public class TestSnapshotDeletingService {
+  @Rule

Review Comment:
   nit: since this is new code, I'd suggest using JUnit 5.



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test Snapshot Deleting Service.
+ */
+public class TestSnapshotDeletingService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDeletingService.class);
+
+  private KeyManager keyManager;
+  private OMMetadataManager omMetadataManager;
+  private OzoneConfiguration conf;
+  private OmTestManagers omTestManagers;
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  @Before
+  public void createConfAndInitValues() throws Exception {
+    conf = new OzoneConfiguration();
+    File testDir = PathUtils.getTestDir(TestSnapshotDeletingService.class);
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, testDir.getPath());
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL,
+        1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT,
+        100000, TimeUnit.MILLISECONDS);
+    conf.setQuietMode(false);
+    omTestManagers = new OmTestManagers(conf);
+    keyManager = omTestManagers.getKeyManager();
+    omMetadataManager = omTestManagers.getMetadataManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();

Review Comment:
   ```suggestion
       if (om != null) {
         om.stop();
       }
   ```
   
   Should this be enclosed in an `if` null check, in case setup fails before `om` is assigned?



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test Snapshot Deleting Service.
+ */
+public class TestSnapshotDeletingService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDeletingService.class);
+
+  private KeyManager keyManager;
+  private OMMetadataManager omMetadataManager;
+  private OzoneConfiguration conf;
+  private OmTestManagers omTestManagers;
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  @Before
+  public void createConfAndInitValues() throws Exception {
+    conf = new OzoneConfiguration();
+    File testDir = PathUtils.getTestDir(TestSnapshotDeletingService.class);
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, testDir.getPath());
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL,
+        1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT,
+        100000, TimeUnit.MILLISECONDS);
+    conf.setQuietMode(false);
+    omTestManagers = new OmTestManagers(conf);
+    keyManager = omTestManagers.getKeyManager();
+    omMetadataManager = omTestManagers.getMetadataManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  @Test
+  public void testSnapshotKeySpaceReclaim() throws Exception {
+    int snapshotCount = 0;
+
+    SnapshotDeletingService snapshotDeletingService = (SnapshotDeletingService)
+        keyManager.getSnapshotDeletingService();
+
+    snapshotDeletingService.suspend();
+
+    OmKeyArgs key1 = createVolumeBucketKey(VOLUME_NAME, BUCKET_NAME,
+        BucketLayout.DEFAULT, "key1");
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap1", ++snapshotCount);
+
+    OmKeyArgs key2 = createKey(VOLUME_NAME, BUCKET_NAME, "key2");
+
+    // Key 1 cannot be deleted as it is still referenced by Snapshot 1.
+    writeClient.deleteKey(key1);
+    //Key 2 is deleted here, which means we can reclaim
+    // it when snapshot 2 is deleted.
+    writeClient.deleteKey(key2);
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap2", ++snapshotCount);
+    createKey(VOLUME_NAME, BUCKET_NAME, "key4");
+    OmKeyArgs key5 = createKey(VOLUME_NAME, BUCKET_NAME, "key5");
+    writeClient.deleteKey(key5);
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap3", ++snapshotCount);
+
+
+    String snapshotKey2 = "/vol1/bucket1/snap2";
+    SnapshotInfo snapshotInfo = om.getMetadataManager()
+        .getSnapshotInfoTable().get(snapshotKey2);
+
+    snapshotInfo
+        .setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
+    om.getMetadataManager()
+        .getSnapshotInfoTable().put(snapshotKey2, snapshotInfo);
+    snapshotInfo = om.getMetadataManager()
+        .getSnapshotInfoTable().get(snapshotKey2);
+    assertEquals(snapshotInfo.getSnapshotStatus(),
+        SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
+
+    snapshotDeletingService.resume();
+
+    GenericTestUtils.waitFor(() ->
+            snapshotDeletingService.getSuccessfulRunCount() >= 1,
+        1000, 10000);
+
+    OmSnapshot nextSnapshot = (OmSnapshot) om.getOmSnapshotManager()
+        .checkForSnapshot(VOLUME_NAME, BUCKET_NAME, getSnapshotPrefix("snap3"));
+
+    //Check key1 added to next non deleted snapshot db.
+    RepeatedOmKeyInfo omKeyInfo =
+        nextSnapshot.getMetadataManager()
+            .getDeletedTable().get("/vol1/bucket1/key1");
+    assertNotNull(omKeyInfo);
+
+    //Check key2 added active db as it can be reclaimed.
+    RepeatedOmKeyInfo omKeyInfo1 = omMetadataManager
+        .getDeletedTable().get("/vol1/bucket1/key2");
+
+    assertNotNull(omKeyInfo1);
+
+  }
+
+  private OmKeyArgs createVolumeBucketKey(String volumeName, String bucketName,
+      BucketLayout bucketLayout, String keyName) throws IOException {
+    // cheat here, just create a volume and bucket entry so that we can
+    // create the keys, we put the same data for key and value since the
+    // system does not decode the object
+    OMRequestTestUtils.addVolumeToOM(omMetadataManager,
+        OmVolumeArgs.newBuilder()
+            .setOwnerName("owner")
+            .setAdminName("admin")
+            .setVolume(volumeName)
+            .build());
+
+    OMRequestTestUtils.addBucketToOM(omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(bucketLayout)
+            .build());
+
+    return createKey(volumeName, bucketName, keyName);
+  }
+
+  private OmKeyArgs createKey(String volumeName, String bucketName,
+       String keyName) throws IOException {
+    OmKeyArgs keyArg =
+        new OmKeyArgs.Builder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setKeyName(keyName)
+            .setAcls(Collections.emptyList())
+            .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+                HddsProtos.ReplicationFactor.ONE))
+            .setLocationInfoList(new ArrayList<>())
+            .build();
+
+    // Open and write the key without commit it.
+    OpenKeySession session = writeClient.openKey(keyArg);
+    writeClient.commitKey(keyArg, session.getId());
+
+    return keyArg;
+  }
+
+  private void createSnapshot(String volName, String bucketName,
+       String snapName, int count) throws Exception {
+    writeClient.createSnapshot(volName, bucketName, snapName);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return omMetadataManager.getSnapshotInfoTable()
+            .getEstimatedKeyCount() >= count;

Review Comment:
   Curious whether `getEstimatedKeyCount()` includes the table cache. What if the cache has an item that hasn't been flushed to SSTs yet? Would that cause uncertainty in this wait condition? In other places we make sure that the snapshot dir exists.



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test Snapshot Deleting Service.
+ */
+public class TestSnapshotDeletingService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDeletingService.class);
+
+  private KeyManager keyManager;
+  private OMMetadataManager omMetadataManager;
+  private OzoneConfiguration conf;
+  private OmTestManagers omTestManagers;
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  @Before
+  public void createConfAndInitValues() throws Exception {
+    conf = new OzoneConfiguration();
+    File testDir = PathUtils.getTestDir(TestSnapshotDeletingService.class);
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, testDir.getPath());
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL,
+        1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT,
+        100000, TimeUnit.MILLISECONDS);
+    conf.setQuietMode(false);
+    omTestManagers = new OmTestManagers(conf);
+    keyManager = omTestManagers.getKeyManager();
+    omMetadataManager = omTestManagers.getMetadataManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  @Test
+  public void testSnapshotKeySpaceReclaim() throws Exception {
+    int snapshotCount = 0;
+
+    SnapshotDeletingService snapshotDeletingService = (SnapshotDeletingService)
+        keyManager.getSnapshotDeletingService();
+
+    snapshotDeletingService.suspend();
+
+    OmKeyArgs key1 = createVolumeBucketKey(VOLUME_NAME, BUCKET_NAME,
+        BucketLayout.DEFAULT, "key1");
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap1", ++snapshotCount);
+
+    OmKeyArgs key2 = createKey(VOLUME_NAME, BUCKET_NAME, "key2");
+
+    // Key 1 cannot be deleted as it is still referenced by Snapshot 1.
+    writeClient.deleteKey(key1);
+    //Key 2 is deleted here, which means we can reclaim
+    // it when snapshot 2 is deleted.
+    writeClient.deleteKey(key2);
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap2", ++snapshotCount);
+    createKey(VOLUME_NAME, BUCKET_NAME, "key4");
+    OmKeyArgs key5 = createKey(VOLUME_NAME, BUCKET_NAME, "key5");
+    writeClient.deleteKey(key5);
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap3", ++snapshotCount);
+
+
+    String snapshotKey2 = "/vol1/bucket1/snap2";
+    SnapshotInfo snapshotInfo = om.getMetadataManager()
+        .getSnapshotInfoTable().get(snapshotKey2);
+
+    snapshotInfo
+        .setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
+    om.getMetadataManager()
+        .getSnapshotInfoTable().put(snapshotKey2, snapshotInfo);
+    snapshotInfo = om.getMetadataManager()
+        .getSnapshotInfoTable().get(snapshotKey2);
+    assertEquals(snapshotInfo.getSnapshotStatus(),
+        SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED);
+
+    snapshotDeletingService.resume();
+
+    GenericTestUtils.waitFor(() ->

Review Comment:
   nit: you can also use 
[Awaitility](https://github.com/awaitility/awaitility/wiki/Usage#simple).



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test Snapshot Deleting Service.
+ */
+public class TestSnapshotDeletingService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDeletingService.class);
+
+  private KeyManager keyManager;
+  private OMMetadataManager omMetadataManager;
+  private OzoneConfiguration conf;
+  private OmTestManagers omTestManagers;
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  @Before
+  public void createConfAndInitValues() throws Exception {
+    conf = new OzoneConfiguration();
+    File testDir = PathUtils.getTestDir(TestSnapshotDeletingService.class);
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, testDir.getPath());
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_INTERVAL,
+        1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_DELETION_SERVICE_TIMEOUT,
+        100000, TimeUnit.MILLISECONDS);
+    conf.setQuietMode(false);
+    omTestManagers = new OmTestManagers(conf);
+    keyManager = omTestManagers.getKeyManager();
+    omMetadataManager = omTestManagers.getMetadataManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  @Test
+  public void testSnapshotKeySpaceReclaim() throws Exception {
+    int snapshotCount = 0;
+
+    SnapshotDeletingService snapshotDeletingService = (SnapshotDeletingService)
+        keyManager.getSnapshotDeletingService();
+
+    snapshotDeletingService.suspend();
+
+    OmKeyArgs key1 = createVolumeBucketKey(VOLUME_NAME, BUCKET_NAME,
+        BucketLayout.DEFAULT, "key1");
+
+    createSnapshot(VOLUME_NAME, BUCKET_NAME, "snap1", ++snapshotCount);
+
+    OmKeyArgs key2 = createKey(VOLUME_NAME, BUCKET_NAME, "key2");
+
+    // Key 1 cannot be deleted as it is still referenced by Snapshot 1.
+    writeClient.deleteKey(key1);
+    //Key 2 is deleted here, which means we can reclaim

Review Comment:
   nit: Space after `//`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java:
##########
@@ -146,6 +149,26 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
               .getLatestSequenceNumber();
       snapshotInfo.setDbTxSequenceNumber(dbLatestSequenceNumber);
 
+      // Set previous path and global snapshot
+      String latestPathSnapshot =
+          snapshotChainManager.getLatestPathSnapshot(snapshotPath);
+      String latestGlobalSnapshot =
+          snapshotChainManager.getLatestGlobalSnapshot();
+
+      if (latestPathSnapshot == null || latestPathSnapshot.isEmpty()) {
+        snapshotInfo.setPathPreviousSnapshotID("");
+      } else {
+        snapshotInfo.setPathPreviousSnapshotID(latestPathSnapshot);
+      }
+
+      if (latestGlobalSnapshot == null || latestGlobalSnapshot.isEmpty()) {
+        snapshotInfo.setGlobalPreviousSnapshotID("");
+      } else {
+        snapshotInfo.setGlobalPreviousSnapshotID(latestGlobalSnapshot);
+      }

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to