hemantk-12 commented on code in PR #7200:
URL: https://github.com/apache/ozone/pull/7200#discussion_r1764103757


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.createMergedRepeatedOmKeyInfoFromDeletedTableEntry;
+
+/**
+ * Response for OMSnapshotMoveDeletedKeysRequest.
+ */
+@CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE})
+public class OMSnapshotMoveTableKeysResponse extends OMClientResponse {
+
+  private SnapshotInfo fromSnapshot;
+  private SnapshotInfo nextSnapshot;
+  private List<SnapshotMoveKeyInfos> deletedKeys;
+  private List<HddsProtos.KeyValue> renamedKeysList;
+  private List<SnapshotMoveKeyInfos> deletedDirs;
+
+  public OMSnapshotMoveTableKeysResponse(OMResponse omResponse,
+                                         @Nonnull SnapshotInfo fromSnapshot, 
SnapshotInfo nextSnapshot,
+                                         List<SnapshotMoveKeyInfos> 
deletedKeys,
+                                         List<SnapshotMoveKeyInfos> 
deletedDirs,
+                                         List<HddsProtos.KeyValue> 
renamedKeys) {
+    super(omResponse);
+    this.fromSnapshot = fromSnapshot;
+    this.nextSnapshot = nextSnapshot;
+    this.deletedKeys = deletedKeys;
+    this.renamedKeysList = renamedKeys;
+    this.deletedDirs = deletedDirs;
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMSnapshotMoveTableKeysResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager, 
BatchOperation batchOperation) throws IOException {
+
+    // Note: a trick to get
+    // To do this properly, refactor OzoneManagerDoubleBuffer#addToBatch and
+    // add OmSnapshotManager as a parameter.
+    OmSnapshotManager omSnapshotManager = ((OmMetadataManagerImpl) 
omMetadataManager)
+        .getOzoneManager().getOmSnapshotManager();
+
+    try (ReferenceCounted<OmSnapshot> rcOmFromSnapshot =
+             omSnapshotManager.getSnapshot(fromSnapshot.getSnapshotId())) {
+
+      OmSnapshot fromOmSnapshot = rcOmFromSnapshot.get();
+
+      if (nextSnapshot != null) {
+        try (ReferenceCounted<OmSnapshot>
+            rcOmNextSnapshot = 
omSnapshotManager.getSnapshot(nextSnapshot.getSnapshotId())) {
+
+          OmSnapshot nextOmSnapshot = rcOmNextSnapshot.get();
+          RDBStore nextSnapshotStore = (RDBStore) 
nextOmSnapshot.getMetadataManager().getStore();
+          // Init Batch Operation for snapshot db.
+          try (BatchOperation writeBatch = 
nextSnapshotStore.initBatchOperation()) {
+            addKeysToNextSnapshot(writeBatch, 
nextOmSnapshot.getMetadataManager());
+            nextSnapshotStore.commitBatchOperation(writeBatch);
+            nextSnapshotStore.getDb().flushWal(true);
+            nextSnapshotStore.getDb().flush();
+          }
+        }
+      } else {
+        // Handle the case where there is no next Snapshot.
+        addKeysToNextSnapshot(batchOperation, omMetadataManager);
+      }
+
+      // Update From Snapshot Deleted Table.
+      RDBStore fromSnapshotStore = (RDBStore) 
fromOmSnapshot.getMetadataManager().getStore();
+      try (BatchOperation fromSnapshotBatchOp = 
fromSnapshotStore.initBatchOperation()) {
+        deleteKeysFromSnapshot(fromSnapshotBatchOp, 
fromOmSnapshot.getMetadataManager());
+        fromSnapshotStore.commitBatchOperation(fromSnapshotBatchOp);
+        fromSnapshotStore.getDb().flushWal(true);
+        fromSnapshotStore.getDb().flush();
+      }
+    }
+
+    // Flush snapshot info to rocksDB.
+    omMetadataManager.getSnapshotInfoTable().putWithBatch(batchOperation, 
fromSnapshot.getTableKey(), fromSnapshot);
+    if (nextSnapshot != null) {
+      omMetadataManager.getSnapshotInfoTable().putWithBatch(batchOperation, 
nextSnapshot.getTableKey(), nextSnapshot);
+    }
+  }
+
+  private void deleteKeysFromSnapshot(BatchOperation batchOp, 
OMMetadataManager fromSnapshotMetadataManager)
+      throws IOException {
+    for (SnapshotMoveKeyInfos deletedKey : deletedKeys) {

Review Comment:
   Suggestion: If we are running this sequentially, we can also use 
[deleteRange](https://github.com/apache/ozone/blob/master/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java#L158).
 You can create a Jira or add a TODO if you want to do it later.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.createMergedRepeatedOmKeyInfoFromDeletedTableEntry;
+
+/**
+ * Response for OMSnapshotMoveDeletedKeysRequest.
+ */
+@CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE})
+public class OMSnapshotMoveTableKeysResponse extends OMClientResponse {
+
+  private SnapshotInfo fromSnapshot;
+  private SnapshotInfo nextSnapshot;
+  private List<SnapshotMoveKeyInfos> deletedKeys;
+  private List<HddsProtos.KeyValue> renamedKeysList;
+  private List<SnapshotMoveKeyInfos> deletedDirs;
+
+  public OMSnapshotMoveTableKeysResponse(OMResponse omResponse,
+                                         @Nonnull SnapshotInfo fromSnapshot, 
SnapshotInfo nextSnapshot,
+                                         List<SnapshotMoveKeyInfos> 
deletedKeys,
+                                         List<SnapshotMoveKeyInfos> 
deletedDirs,
+                                         List<HddsProtos.KeyValue> 
renamedKeys) {
+    super(omResponse);
+    this.fromSnapshot = fromSnapshot;
+    this.nextSnapshot = nextSnapshot;
+    this.deletedKeys = deletedKeys;
+    this.renamedKeysList = renamedKeys;
+    this.deletedDirs = deletedDirs;
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMSnapshotMoveTableKeysResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager, 
BatchOperation batchOperation) throws IOException {
+
+    // Note: a trick to get
+    // To do this properly, refactor OzoneManagerDoubleBuffer#addToBatch and
+    // add OmSnapshotManager as a parameter.

Review Comment:
   1. The comment is incomplete.
   2. I'm not sure it is a good idea to refactor 
`OzoneManagerDoubleBuffer#addToBatch`. Maybe add `getOzoneManager()` to 
`OMMetadataManager` instead.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -463,92 +253,36 @@ private void submitSnapshotPurgeRequest(List<String> 
purgeSnapshotKeys) {
       }
     }
 
-    @SuppressWarnings("checkstyle:ParameterNumber")
-    private void splitRepeatedOmKeyInfo(SnapshotMoveKeyInfos.Builder toReclaim,
-        SnapshotMoveKeyInfos.Builder toNextDb,
-        HddsProtos.KeyValue.Builder renamedKey, OmKeyInfo keyInfo,
-        Table<String, OmKeyInfo> previousKeyTable,
-        Table<String, String> renamedTable,
-        OmBucketInfo bucketInfo, long volumeId) throws IOException {
-
-      if (isKeyReclaimable(previousKeyTable, renamedTable,
-          keyInfo, bucketInfo, volumeId, renamedKey)) {
-        // Update in current db's deletedKeyTable
-        toReclaim.addKeyInfos(keyInfo
-            .getProtobuf(ClientVersion.CURRENT_VERSION));
-      } else {
-        // Move to next non deleted snapshot's deleted table
-        toNextDb.addKeyInfos(keyInfo.getProtobuf(
-            ClientVersion.CURRENT_VERSION));
-      }
-    }
-
-    private boolean isDirReclaimable(
-        Table.KeyValue<String, OmKeyInfo> deletedDir,
-        Table<String, OmDirectoryInfo> previousDirTable,
-        Table<String, String> renamedTable,
-        List<HddsProtos.KeyValue> renamedList) throws IOException {
-
-      if (previousDirTable == null) {
-        return true;
-      }
-
-      String deletedDirDbKey = deletedDir.getKey();
-      OmKeyInfo deletedDirInfo = deletedDir.getValue();
-      String dbRenameKey = ozoneManager.getMetadataManager().getRenameKey(
-          deletedDirInfo.getVolumeName(), deletedDirInfo.getBucketName(),
-          deletedDirInfo.getObjectID());
-
-      /*
-      snapshotRenamedTable: /volumeName/bucketName/objectID ->
-          /volumeId/bucketId/parentId/dirName
-       */
-      String dbKeyBeforeRename = renamedTable.getIfExist(dbRenameKey);
-      String prevDbKey = null;
-
-      if (dbKeyBeforeRename != null) {
-        prevDbKey = dbKeyBeforeRename;
-        HddsProtos.KeyValue renamedDir = HddsProtos.KeyValue
-            .newBuilder()
-            .setKey(dbRenameKey)
-            .setValue(dbKeyBeforeRename)
-            .build();
-        renamedList.add(renamedDir);
-      } else {
-        // In OMKeyDeleteResponseWithFSO OzonePathKey is converted to
-        // OzoneDeletePathKey. Changing it back to check the previous DirTable.
-        prevDbKey = ozoneManager.getMetadataManager()
-            .getOzoneDeletePathDirKey(deletedDirDbKey);
-      }
-
-      OmDirectoryInfo prevDirectoryInfo = previousDirTable.get(prevDbKey);
-      if (prevDirectoryInfo == null) {
-        return true;
-      }
-
-      return prevDirectoryInfo.getObjectID() != deletedDirInfo.getObjectID();
-    }
-
-    public void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
-        List<SnapshotMoveKeyInfos> toReclaimList,
-        List<SnapshotMoveKeyInfos> toNextDBList,
-        List<HddsProtos.KeyValue> renamedList,
-        List<String> dirsToMove) throws InterruptedException {
+    private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
+                                              List<SnapshotMoveKeyInfos> 
deletedKeys,
+                                              List<HddsProtos.KeyValue> 
renamedList,
+                                              List<SnapshotMoveKeyInfos> 
dirsToMove) {
 
-      SnapshotMoveDeletedKeysRequest.Builder moveDeletedKeysBuilder =
-          SnapshotMoveDeletedKeysRequest.newBuilder()
-              .setFromSnapshot(snapInfo.getProtobuf());
+      SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder = 
SnapshotMoveTableKeysRequest.newBuilder()
+          .setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId()));
 
-      SnapshotMoveDeletedKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
-          .addAllReclaimKeys(toReclaimList)
-          .addAllNextDBKeys(toNextDBList)
+      SnapshotMoveTableKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
+          .addAllDeletedKeys(deletedKeys)
           .addAllRenamedKeys(renamedList)
-          .addAllDeletedDirsToMove(dirsToMove)
+          .addAllDeletedDirs(dirsToMove)
           .build();
+      if (isBufferLimitCrossed(ratisByteLimit, 0, 
moveDeletedKeys.getSerializedSize())) {
+        int remaining = MIN_ERR_LIMIT_PER_TASK;
+        deletedKeys = deletedKeys.subList(0, Math.min(remaining, 
deletedKeys.size()));

Review Comment:
   QQ: Is it possible that subList of size `Math.min(remaining, 
deletedKeys.size())` also exceeds the limits?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -136,316 +134,108 @@ public BackgroundTaskResult call() throws 
InterruptedException {
 
       getRunCount().incrementAndGet();
 
-      ReferenceCounted<OmSnapshot> rcOmSnapshot = null;
-      ReferenceCounted<OmSnapshot> rcOmPreviousSnapshot = null;
-
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          ozoneManager.getMetadataManager().getSnapshotInfoTable();
-      List<String> purgeSnapshotKeys = new ArrayList<>();
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
+      try {
+        int remaining = keyLimitPerTask;
+        Iterator<UUID> iterator = snapshotChainManager.iterator(true);
+        List<SnapshotInfo> snapshotsToBePurged = new ArrayList<>();
         long snapshotLimit = snapshotDeletionPerTask;
-
         while (iterator.hasNext() && snapshotLimit > 0) {
-          SnapshotInfo snapInfo = iterator.next().getValue();
-
-          // Only Iterate in deleted snapshot
+          SnapshotInfo snapInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
snapshotChainManager, iterator.next());
+          // Only Iterate in deleted snapshot & only if all the changes have 
been flushed into disk.
           if (shouldIgnoreSnapshot(snapInfo)) {
             continue;
           }
-
-          // Note: Can refactor this to use try-with-resources.
-          // Handling RC decrements manually for now to minimize conflicts.
-          rcOmSnapshot = omSnapshotManager.getSnapshot(
-              snapInfo.getVolumeName(),
-              snapInfo.getBucketName(),
-              snapInfo.getName());
-          OmSnapshot omSnapshot = rcOmSnapshot.get();
-
-          Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
-              omSnapshot.getMetadataManager().getDeletedTable();
-          Table<String, OmKeyInfo> snapshotDeletedDirTable =
-              omSnapshot.getMetadataManager().getDeletedDirTable();
-
-          Table<String, String> renamedTable =
-              omSnapshot.getMetadataManager().getSnapshotRenamedTable();
-
-          long volumeId = ozoneManager.getMetadataManager()
-              .getVolumeId(snapInfo.getVolumeName());
-          // Get bucketInfo for the snapshot bucket to get bucket layout.
-          String dbBucketKey = ozoneManager.getMetadataManager().getBucketKey(
-              snapInfo.getVolumeName(), snapInfo.getBucketName());
-          OmBucketInfo bucketInfo = ozoneManager.getMetadataManager()
-              .getBucketTable().get(dbBucketKey);
-
-          if (bucketInfo == null) {
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
-            throw new IllegalStateException("Bucket " + "/" +
-                snapInfo.getVolumeName() + "/" + snapInfo.getBucketName() +
-                " is not found. BucketInfo should not be null for snapshotted" 
+
-                " bucket. The OM is in unexpected state.");
-          }
-
-          String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
-          String dbBucketKeyForDir = ozoneManager.getMetadataManager()
-              .getBucketKey(Long.toString(volumeId),
-                  Long.toString(bucketInfo.getObjectID())) + OM_KEY_PREFIX;
-
-          if (isSnapshotReclaimable(snapshotDeletedTable,
-              snapshotDeletedDirTable, snapshotBucketKey, dbBucketKeyForDir)) {
-            purgeSnapshotKeys.add(snapInfo.getTableKey());
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
+          SnapshotInfo nextSnapshot = 
SnapshotUtils.getNextSnapshot(ozoneManager, snapshotChainManager, snapInfo);
+          // Continue if the next snapshot is not active. This is to avoid 
unnecessary copies from one snapshot to
+          // another.
+          if (nextSnapshot != null &&
+              nextSnapshot.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
             continue;
           }
 
-          //TODO: [SNAPSHOT] Add lock to deletedTable and Active DB.
-          SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(snapInfo, 
chainManager);
-          Table<String, OmKeyInfo> previousKeyTable = null;
-          Table<String, OmDirectoryInfo> previousDirTable = null;
-          OmSnapshot omPreviousSnapshot = null;
-
-          // Split RepeatedOmKeyInfo and update current snapshot 
deletedKeyTable
-          // and next snapshot deletedKeyTable.
-          if (previousSnapshot != null) {
-            rcOmPreviousSnapshot = omSnapshotManager.getSnapshot(
-                previousSnapshot.getVolumeName(),
-                previousSnapshot.getBucketName(),
-                previousSnapshot.getName());
-            omPreviousSnapshot = rcOmPreviousSnapshot.get();
-
-            previousKeyTable = omPreviousSnapshot
-                
.getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
-            previousDirTable = omPreviousSnapshot
-                .getMetadataManager().getDirectoryTable();
-          }
-
-          // Move key to either next non deleted snapshot's deletedTable
-          // or keep it in current snapshot deleted table.
-          List<SnapshotMoveKeyInfos> toReclaimList = new ArrayList<>();
-          List<SnapshotMoveKeyInfos> toNextDBList = new ArrayList<>();
-          // A list of renamed keys/files/dirs
-          List<HddsProtos.KeyValue> renamedList = new ArrayList<>();
-          List<String> dirsToMove = new ArrayList<>();
-
-          long remainNum = handleDirectoryCleanUp(snapshotDeletedDirTable,
-              previousDirTable, renamedTable, dbBucketKeyForDir, snapInfo,
-              omSnapshot, dirsToMove, renamedList);
-          int deletionCount = 0;
-
-          try (TableIterator<String, ? extends Table.KeyValue<String,
-              RepeatedOmKeyInfo>> deletedIterator = snapshotDeletedTable
-              .iterator()) {
-
-            List<BlockGroup> keysToPurge = new ArrayList<>();
-            deletedIterator.seek(snapshotBucketKey);
-
-            while (deletedIterator.hasNext() &&
-                deletionCount < remainNum) {
-              Table.KeyValue<String, RepeatedOmKeyInfo>
-                  deletedKeyValue = deletedIterator.next();
-              String deletedKey = deletedKeyValue.getKey();
-
-              // Exit if it is out of the bucket scope.
-              if (!deletedKey.startsWith(snapshotBucketKey)) {
-                // If snapshot deletedKeyTable doesn't have any
-                // entry in the snapshot scope it can be reclaimed
-                break;
-              }
-
-              RepeatedOmKeyInfo repeatedOmKeyInfo = deletedKeyValue.getValue();
-
-              SnapshotMoveKeyInfos.Builder toReclaim = SnapshotMoveKeyInfos
-                  .newBuilder()
-                  .setKey(deletedKey);
-              SnapshotMoveKeyInfos.Builder toNextDb = SnapshotMoveKeyInfos
-                  .newBuilder()
-                  .setKey(deletedKey);
-              HddsProtos.KeyValue.Builder renamedKey = HddsProtos.KeyValue
-                  .newBuilder();
-
-              for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
-                splitRepeatedOmKeyInfo(toReclaim, toNextDb, renamedKey,
-                    keyInfo, previousKeyTable, renamedTable,
-                    bucketInfo, volumeId);
+          // nextSnapshot = null means entries would be moved to AOS, hence 
ensure that KeyDeletingService &
+          // DirectoryDeletingService is not running while the entries are 
moving.

Review Comment:
   nit: move this comment to `waitForKdsAndDds` with a proper Javadoc comment.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -136,316 +134,108 @@ public BackgroundTaskResult call() throws 
InterruptedException {
 
       getRunCount().incrementAndGet();
 
-      ReferenceCounted<OmSnapshot> rcOmSnapshot = null;
-      ReferenceCounted<OmSnapshot> rcOmPreviousSnapshot = null;
-
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          ozoneManager.getMetadataManager().getSnapshotInfoTable();
-      List<String> purgeSnapshotKeys = new ArrayList<>();
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
+      try {
+        int remaining = keyLimitPerTask;
+        Iterator<UUID> iterator = snapshotChainManager.iterator(true);
+        List<SnapshotInfo> snapshotsToBePurged = new ArrayList<>();
         long snapshotLimit = snapshotDeletionPerTask;
-
         while (iterator.hasNext() && snapshotLimit > 0) {
-          SnapshotInfo snapInfo = iterator.next().getValue();
-
-          // Only Iterate in deleted snapshot
+          SnapshotInfo snapInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
snapshotChainManager, iterator.next());
+          // Only Iterate in deleted snapshot & only if all the changes have 
been flushed into disk.

Review Comment:
   This should be a comment (maybe a Javadoc comment) on 
`shouldIgnoreSnapshot()`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.createMergedRepeatedOmKeyInfoFromDeletedTableEntry;
+
+/**
+ * Response for OMSnapshotMoveDeletedKeysRequest.
+ */
+@CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE})
+public class OMSnapshotMoveTableKeysResponse extends OMClientResponse {
+
+  private SnapshotInfo fromSnapshot;
+  private SnapshotInfo nextSnapshot;
+  private List<SnapshotMoveKeyInfos> deletedKeys;
+  private List<HddsProtos.KeyValue> renamedKeysList;
+  private List<SnapshotMoveKeyInfos> deletedDirs;
+
+  public OMSnapshotMoveTableKeysResponse(OMResponse omResponse,
+                                         @Nonnull SnapshotInfo fromSnapshot, 
SnapshotInfo nextSnapshot,
+                                         List<SnapshotMoveKeyInfos> 
deletedKeys,
+                                         List<SnapshotMoveKeyInfos> 
deletedDirs,
+                                         List<HddsProtos.KeyValue> 
renamedKeys) {
+    super(omResponse);
+    this.fromSnapshot = fromSnapshot;
+    this.nextSnapshot = nextSnapshot;
+    this.deletedKeys = deletedKeys;
+    this.renamedKeysList = renamedKeys;
+    this.deletedDirs = deletedDirs;
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMSnapshotMoveTableKeysResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager, 
BatchOperation batchOperation) throws IOException {
+
+    // Note: a trick to get
+    // To do this properly, refactor OzoneManagerDoubleBuffer#addToBatch and
+    // add OmSnapshotManager as a parameter.
+    OmSnapshotManager omSnapshotManager = ((OmMetadataManagerImpl) 
omMetadataManager)
+        .getOzoneManager().getOmSnapshotManager();
+
+    try (ReferenceCounted<OmSnapshot> rcOmFromSnapshot =
+             omSnapshotManager.getSnapshot(fromSnapshot.getSnapshotId())) {
+
+      OmSnapshot fromOmSnapshot = rcOmFromSnapshot.get();
+
+      if (nextSnapshot != null) {
+        try (ReferenceCounted<OmSnapshot>
+            rcOmNextSnapshot = 
omSnapshotManager.getSnapshot(nextSnapshot.getSnapshotId())) {
+
+          OmSnapshot nextOmSnapshot = rcOmNextSnapshot.get();
+          RDBStore nextSnapshotStore = (RDBStore) 
nextOmSnapshot.getMetadataManager().getStore();
+          // Init Batch Operation for snapshot db.
+          try (BatchOperation writeBatch = 
nextSnapshotStore.initBatchOperation()) {
+            addKeysToNextSnapshot(writeBatch, 
nextOmSnapshot.getMetadataManager());
+            nextSnapshotStore.commitBatchOperation(writeBatch);
+            nextSnapshotStore.getDb().flushWal(true);
+            nextSnapshotStore.getDb().flush();
+          }
+        }
+      } else {
+        // Handle the case where there is no next Snapshot.
+        addKeysToNextSnapshot(batchOperation, omMetadataManager);
+      }
+
+      // Update From Snapshot Deleted Table.
+      RDBStore fromSnapshotStore = (RDBStore) 
fromOmSnapshot.getMetadataManager().getStore();
+      try (BatchOperation fromSnapshotBatchOp = 
fromSnapshotStore.initBatchOperation()) {
+        deleteKeysFromSnapshot(fromSnapshotBatchOp, 
fromOmSnapshot.getMetadataManager());
+        fromSnapshotStore.commitBatchOperation(fromSnapshotBatchOp);
+        fromSnapshotStore.getDb().flushWal(true);
+        fromSnapshotStore.getDb().flush();
+      }
+    }
+
+    // Flush snapshot info to rocksDB.
+    omMetadataManager.getSnapshotInfoTable().putWithBatch(batchOperation, 
fromSnapshot.getTableKey(), fromSnapshot);
+    if (nextSnapshot != null) {
+      omMetadataManager.getSnapshotInfoTable().putWithBatch(batchOperation, 
nextSnapshot.getTableKey(), nextSnapshot);
+    }
+  }
+
+  private void deleteKeysFromSnapshot(BatchOperation batchOp, 
OMMetadataManager fromSnapshotMetadataManager)
+      throws IOException {
+    for (SnapshotMoveKeyInfos deletedKey : deletedKeys) {
+      // Delete keys from current snapshot that are moved to next snapshot.
+      fromSnapshotMetadataManager.getDeletedTable().deleteWithBatch(batchOp, 
deletedKey.getKey());
+    }
+
+    // Delete renamed keys from current snapshot that are moved to next 
snapshot.
+    for (HddsProtos.KeyValue renamedKey: renamedKeysList) {
+      
fromSnapshotMetadataManager.getSnapshotRenamedTable().deleteWithBatch(batchOp, 
renamedKey.getKey());
+    }
+
+    // Delete deletedDir from current snapshot that are moved to next snapshot.
+    for (SnapshotMoveKeyInfos deletedDir : deletedDirs) {
+      
fromSnapshotMetadataManager.getDeletedDirTable().deleteWithBatch(batchOp, 
deletedDir.getKey());
+    }
+
+  }
+
+  private void addKeysToNextSnapshot(BatchOperation batchOp, OMMetadataManager 
metadataManager) throws IOException {
+
+    // Add renamed keys to the next snapshot or active DB.
+    for (HddsProtos.KeyValue renamedKey: renamedKeysList) {
+      metadataManager.getSnapshotRenamedTable().putWithBatch(batchOp, 
renamedKey.getKey(), renamedKey.getValue());
+    }
+    // Add deleted keys to the next snapshot or active DB.
+    for (SnapshotMoveKeyInfos dBKey : deletedKeys) {

Review Comment:
   nit: same as previous, maybe `keyInfo` not just `dbKey`. Applicable to the 
next loop as well.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.createMergedRepeatedOmKeyInfoFromDeletedTableEntry;
+
+/**
+ * Response for OMSnapshotMoveDeletedKeysRequest.
+ */
+@CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE})
+public class OMSnapshotMoveTableKeysResponse extends OMClientResponse {
+
+  private SnapshotInfo fromSnapshot;
+  private SnapshotInfo nextSnapshot;
+  private List<SnapshotMoveKeyInfos> deletedKeys;
+  private List<HddsProtos.KeyValue> renamedKeysList;
+  private List<SnapshotMoveKeyInfos> deletedDirs;
+
+  public OMSnapshotMoveTableKeysResponse(OMResponse omResponse,
+                                         @Nonnull SnapshotInfo fromSnapshot, 
SnapshotInfo nextSnapshot,
+                                         List<SnapshotMoveKeyInfos> 
deletedKeys,
+                                         List<SnapshotMoveKeyInfos> 
deletedDirs,
+                                         List<HddsProtos.KeyValue> 
renamedKeys) {
+    super(omResponse);
+    this.fromSnapshot = fromSnapshot;
+    this.nextSnapshot = nextSnapshot;
+    this.deletedKeys = deletedKeys;
+    this.renamedKeysList = renamedKeys;
+    this.deletedDirs = deletedDirs;
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMSnapshotMoveTableKeysResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager, 
BatchOperation batchOperation) throws IOException {
+
+    // Note: a trick to get
+    // To do this properly, refactor OzoneManagerDoubleBuffer#addToBatch and
+    // add OmSnapshotManager as a parameter.
+    OmSnapshotManager omSnapshotManager = ((OmMetadataManagerImpl) 
omMetadataManager)
+        .getOzoneManager().getOmSnapshotManager();
+
+    try (ReferenceCounted<OmSnapshot> rcOmFromSnapshot =
+             omSnapshotManager.getSnapshot(fromSnapshot.getSnapshotId())) {
+
+      OmSnapshot fromOmSnapshot = rcOmFromSnapshot.get();
+
+      if (nextSnapshot != null) {
+        try (ReferenceCounted<OmSnapshot>
+            rcOmNextSnapshot = 
omSnapshotManager.getSnapshot(nextSnapshot.getSnapshotId())) {
+
+          OmSnapshot nextOmSnapshot = rcOmNextSnapshot.get();
+          RDBStore nextSnapshotStore = (RDBStore) 
nextOmSnapshot.getMetadataManager().getStore();
+          // Init Batch Operation for snapshot db.
+          try (BatchOperation writeBatch = 
nextSnapshotStore.initBatchOperation()) {
+            addKeysToNextSnapshot(writeBatch, 
nextOmSnapshot.getMetadataManager());
+            nextSnapshotStore.commitBatchOperation(writeBatch);
+            nextSnapshotStore.getDb().flushWal(true);
+            nextSnapshotStore.getDb().flush();
+          }
+        }
+      } else {
+        // Handle the case where there is no next Snapshot.
+        addKeysToNextSnapshot(batchOperation, omMetadataManager);
+      }
+
+      // Update From Snapshot Deleted Table.
+      RDBStore fromSnapshotStore = (RDBStore) 
fromOmSnapshot.getMetadataManager().getStore();
+      try (BatchOperation fromSnapshotBatchOp = 
fromSnapshotStore.initBatchOperation()) {
+        deleteKeysFromSnapshot(fromSnapshotBatchOp, 
fromOmSnapshot.getMetadataManager());
+        fromSnapshotStore.commitBatchOperation(fromSnapshotBatchOp);
+        fromSnapshotStore.getDb().flushWal(true);
+        fromSnapshotStore.getDb().flush();
+      }
+    }
+
+    // Flush snapshot info to rocksDB.
+    omMetadataManager.getSnapshotInfoTable().putWithBatch(batchOperation, 
fromSnapshot.getTableKey(), fromSnapshot);
+    if (nextSnapshot != null) {
+      omMetadataManager.getSnapshotInfoTable().putWithBatch(batchOperation, 
nextSnapshot.getTableKey(), nextSnapshot);
+    }
+  }
+
+  private void deleteKeysFromSnapshot(BatchOperation batchOp, 
OMMetadataManager fromSnapshotMetadataManager)
+      throws IOException {
+    for (SnapshotMoveKeyInfos deletedKey : deletedKeys) {
+      // Delete keys from current snapshot that are moved to next snapshot.
+      fromSnapshotMetadataManager.getDeletedTable().deleteWithBatch(batchOp, 
deletedKey.getKey());
+    }
+
+    // Delete renamed keys from current snapshot that are moved to next 
snapshot.
+    for (HddsProtos.KeyValue renamedKey: renamedKeysList) {
+      
fromSnapshotMetadataManager.getSnapshotRenamedTable().deleteWithBatch(batchOp, 
renamedKey.getKey());
+    }
+
+    // Delete deletedDir from current snapshot that are moved to next snapshot.
+    for (SnapshotMoveKeyInfos deletedDir : deletedDirs) {
+      
fromSnapshotMetadataManager.getDeletedDirTable().deleteWithBatch(batchOp, 
deletedDir.getKey());
+    }
+
+  }
+
+  private void addKeysToNextSnapshot(BatchOperation batchOp, OMMetadataManager 
metadataManager) throws IOException {
+
+    // Add renamed keys to the next snapshot or active DB.
+    for (HddsProtos.KeyValue renamedKey: renamedKeysList) {

Review Comment:
   nit: `renamedKeyValue`. `renamedKey` gives the impression of being just a key rather than a key-value pair.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -557,20 +291,20 @@ public void submitSnapshotMoveDeletedKeys(SnapshotInfo 
snapInfo,
       }
     }
 
-    public void submitRequest(OMRequest omRequest) {
+    private void submitRequest(OMRequest omRequest) {
       try {
         OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, 
clientId, getRunCount().get());
       } catch (ServiceException e) {
-        LOG.error("Snapshot Deleting request failed. " +
-            "Will retry at next run.", e);
+        LOG.error("Snapshot Deleting request failed. Will retry at next run.", 
e);

Review Comment:
   nit: this log message needs to change now.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -463,92 +253,36 @@ private void submitSnapshotPurgeRequest(List<String> 
purgeSnapshotKeys) {
       }
     }
 
-    @SuppressWarnings("checkstyle:ParameterNumber")
-    private void splitRepeatedOmKeyInfo(SnapshotMoveKeyInfos.Builder toReclaim,
-        SnapshotMoveKeyInfos.Builder toNextDb,
-        HddsProtos.KeyValue.Builder renamedKey, OmKeyInfo keyInfo,
-        Table<String, OmKeyInfo> previousKeyTable,
-        Table<String, String> renamedTable,
-        OmBucketInfo bucketInfo, long volumeId) throws IOException {
-
-      if (isKeyReclaimable(previousKeyTable, renamedTable,
-          keyInfo, bucketInfo, volumeId, renamedKey)) {
-        // Update in current db's deletedKeyTable
-        toReclaim.addKeyInfos(keyInfo
-            .getProtobuf(ClientVersion.CURRENT_VERSION));
-      } else {
-        // Move to next non deleted snapshot's deleted table
-        toNextDb.addKeyInfos(keyInfo.getProtobuf(
-            ClientVersion.CURRENT_VERSION));
-      }
-    }
-
-    private boolean isDirReclaimable(
-        Table.KeyValue<String, OmKeyInfo> deletedDir,
-        Table<String, OmDirectoryInfo> previousDirTable,
-        Table<String, String> renamedTable,
-        List<HddsProtos.KeyValue> renamedList) throws IOException {
-
-      if (previousDirTable == null) {
-        return true;
-      }
-
-      String deletedDirDbKey = deletedDir.getKey();
-      OmKeyInfo deletedDirInfo = deletedDir.getValue();
-      String dbRenameKey = ozoneManager.getMetadataManager().getRenameKey(
-          deletedDirInfo.getVolumeName(), deletedDirInfo.getBucketName(),
-          deletedDirInfo.getObjectID());
-
-      /*
-      snapshotRenamedTable: /volumeName/bucketName/objectID ->
-          /volumeId/bucketId/parentId/dirName
-       */
-      String dbKeyBeforeRename = renamedTable.getIfExist(dbRenameKey);
-      String prevDbKey = null;
-
-      if (dbKeyBeforeRename != null) {
-        prevDbKey = dbKeyBeforeRename;
-        HddsProtos.KeyValue renamedDir = HddsProtos.KeyValue
-            .newBuilder()
-            .setKey(dbRenameKey)
-            .setValue(dbKeyBeforeRename)
-            .build();
-        renamedList.add(renamedDir);
-      } else {
-        // In OMKeyDeleteResponseWithFSO OzonePathKey is converted to
-        // OzoneDeletePathKey. Changing it back to check the previous DirTable.
-        prevDbKey = ozoneManager.getMetadataManager()
-            .getOzoneDeletePathDirKey(deletedDirDbKey);
-      }
-
-      OmDirectoryInfo prevDirectoryInfo = previousDirTable.get(prevDbKey);
-      if (prevDirectoryInfo == null) {
-        return true;
-      }
-
-      return prevDirectoryInfo.getObjectID() != deletedDirInfo.getObjectID();
-    }
-
-    public void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
-        List<SnapshotMoveKeyInfos> toReclaimList,
-        List<SnapshotMoveKeyInfos> toNextDBList,
-        List<HddsProtos.KeyValue> renamedList,
-        List<String> dirsToMove) throws InterruptedException {
+    private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
+                                              List<SnapshotMoveKeyInfos> 
deletedKeys,
+                                              List<HddsProtos.KeyValue> 
renamedList,
+                                              List<SnapshotMoveKeyInfos> 
dirsToMove) {

Review Comment:
   nit: alignment is off.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -136,316 +134,108 @@ public BackgroundTaskResult call() throws 
InterruptedException {
 
       getRunCount().incrementAndGet();
 
-      ReferenceCounted<OmSnapshot> rcOmSnapshot = null;
-      ReferenceCounted<OmSnapshot> rcOmPreviousSnapshot = null;
-
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          ozoneManager.getMetadataManager().getSnapshotInfoTable();
-      List<String> purgeSnapshotKeys = new ArrayList<>();
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
+      try {
+        int remaining = keyLimitPerTask;
+        Iterator<UUID> iterator = snapshotChainManager.iterator(true);
+        List<SnapshotInfo> snapshotsToBePurged = new ArrayList<>();
         long snapshotLimit = snapshotDeletionPerTask;
-
         while (iterator.hasNext() && snapshotLimit > 0) {
-          SnapshotInfo snapInfo = iterator.next().getValue();
-
-          // Only Iterate in deleted snapshot
+          SnapshotInfo snapInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
snapshotChainManager, iterator.next());
+          // Only Iterate in deleted snapshot & only if all the changes have 
been flushed into disk.
           if (shouldIgnoreSnapshot(snapInfo)) {
             continue;
           }
-
-          // Note: Can refactor this to use try-with-resources.
-          // Handling RC decrements manually for now to minimize conflicts.
-          rcOmSnapshot = omSnapshotManager.getSnapshot(
-              snapInfo.getVolumeName(),
-              snapInfo.getBucketName(),
-              snapInfo.getName());
-          OmSnapshot omSnapshot = rcOmSnapshot.get();
-
-          Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
-              omSnapshot.getMetadataManager().getDeletedTable();
-          Table<String, OmKeyInfo> snapshotDeletedDirTable =
-              omSnapshot.getMetadataManager().getDeletedDirTable();
-
-          Table<String, String> renamedTable =
-              omSnapshot.getMetadataManager().getSnapshotRenamedTable();
-
-          long volumeId = ozoneManager.getMetadataManager()
-              .getVolumeId(snapInfo.getVolumeName());
-          // Get bucketInfo for the snapshot bucket to get bucket layout.
-          String dbBucketKey = ozoneManager.getMetadataManager().getBucketKey(
-              snapInfo.getVolumeName(), snapInfo.getBucketName());
-          OmBucketInfo bucketInfo = ozoneManager.getMetadataManager()
-              .getBucketTable().get(dbBucketKey);
-
-          if (bucketInfo == null) {
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
-            throw new IllegalStateException("Bucket " + "/" +
-                snapInfo.getVolumeName() + "/" + snapInfo.getBucketName() +
-                " is not found. BucketInfo should not be null for snapshotted" 
+
-                " bucket. The OM is in unexpected state.");
-          }
-
-          String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
-          String dbBucketKeyForDir = ozoneManager.getMetadataManager()
-              .getBucketKey(Long.toString(volumeId),
-                  Long.toString(bucketInfo.getObjectID())) + OM_KEY_PREFIX;
-
-          if (isSnapshotReclaimable(snapshotDeletedTable,
-              snapshotDeletedDirTable, snapshotBucketKey, dbBucketKeyForDir)) {
-            purgeSnapshotKeys.add(snapInfo.getTableKey());
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
+          SnapshotInfo nextSnapshot = 
SnapshotUtils.getNextSnapshot(ozoneManager, snapshotChainManager, snapInfo);
+          // Continue if the next snapshot is not active. This is to avoid 
unnecessary copies from one snapshot to
+          // another.
+          if (nextSnapshot != null &&
+              nextSnapshot.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
             continue;
           }
 
-          //TODO: [SNAPSHOT] Add lock to deletedTable and Active DB.
-          SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(snapInfo, 
chainManager);
-          Table<String, OmKeyInfo> previousKeyTable = null;
-          Table<String, OmDirectoryInfo> previousDirTable = null;
-          OmSnapshot omPreviousSnapshot = null;
-
-          // Split RepeatedOmKeyInfo and update current snapshot 
deletedKeyTable
-          // and next snapshot deletedKeyTable.
-          if (previousSnapshot != null) {
-            rcOmPreviousSnapshot = omSnapshotManager.getSnapshot(
-                previousSnapshot.getVolumeName(),
-                previousSnapshot.getBucketName(),
-                previousSnapshot.getName());
-            omPreviousSnapshot = rcOmPreviousSnapshot.get();
-
-            previousKeyTable = omPreviousSnapshot
-                
.getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
-            previousDirTable = omPreviousSnapshot
-                .getMetadataManager().getDirectoryTable();
-          }
-
-          // Move key to either next non deleted snapshot's deletedTable
-          // or keep it in current snapshot deleted table.
-          List<SnapshotMoveKeyInfos> toReclaimList = new ArrayList<>();
-          List<SnapshotMoveKeyInfos> toNextDBList = new ArrayList<>();
-          // A list of renamed keys/files/dirs
-          List<HddsProtos.KeyValue> renamedList = new ArrayList<>();
-          List<String> dirsToMove = new ArrayList<>();
-
-          long remainNum = handleDirectoryCleanUp(snapshotDeletedDirTable,
-              previousDirTable, renamedTable, dbBucketKeyForDir, snapInfo,
-              omSnapshot, dirsToMove, renamedList);
-          int deletionCount = 0;
-
-          try (TableIterator<String, ? extends Table.KeyValue<String,
-              RepeatedOmKeyInfo>> deletedIterator = snapshotDeletedTable
-              .iterator()) {
-
-            List<BlockGroup> keysToPurge = new ArrayList<>();
-            deletedIterator.seek(snapshotBucketKey);
-
-            while (deletedIterator.hasNext() &&
-                deletionCount < remainNum) {
-              Table.KeyValue<String, RepeatedOmKeyInfo>
-                  deletedKeyValue = deletedIterator.next();
-              String deletedKey = deletedKeyValue.getKey();
-
-              // Exit if it is out of the bucket scope.
-              if (!deletedKey.startsWith(snapshotBucketKey)) {
-                // If snapshot deletedKeyTable doesn't have any
-                // entry in the snapshot scope it can be reclaimed
-                break;
-              }
-
-              RepeatedOmKeyInfo repeatedOmKeyInfo = deletedKeyValue.getValue();
-
-              SnapshotMoveKeyInfos.Builder toReclaim = SnapshotMoveKeyInfos
-                  .newBuilder()
-                  .setKey(deletedKey);
-              SnapshotMoveKeyInfos.Builder toNextDb = SnapshotMoveKeyInfos
-                  .newBuilder()
-                  .setKey(deletedKey);
-              HddsProtos.KeyValue.Builder renamedKey = HddsProtos.KeyValue
-                  .newBuilder();
-
-              for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
-                splitRepeatedOmKeyInfo(toReclaim, toNextDb, renamedKey,
-                    keyInfo, previousKeyTable, renamedTable,
-                    bucketInfo, volumeId);
+          // nextSnapshot = null means entries would be moved to AOS, hence 
ensure that KeyDeletingService &
+          // DirectoryDeletingService is not running while the entries are 
moving.
+          if (nextSnapshot == null) {
+            KeyDeletingService keyDeletingService = 
getOzoneManager().getKeyManager().getDeletingService();
+            synchronized (keyDeletingService) {
+              while (keyDeletingService != null && 
keyDeletingService.isRunningOnAOS()) {
+                keyDeletingService.wait(serviceTimeout);
               }
+            }
 
-              // If all the KeyInfos are reclaimable in RepeatedOmKeyInfo
-              // then no need to update current snapshot deletedKeyTable.
-              if (!(toReclaim.getKeyInfosCount() ==
-                  repeatedOmKeyInfo.getOmKeyInfoList().size())) {
-                toReclaimList.add(toReclaim.build());
-                toNextDBList.add(toNextDb.build());
-              } else {
-                // The key can be reclaimed here.
-                List<BlockGroup> blocksForKeyDelete = omSnapshot
-                    .getMetadataManager()
-                    .getBlocksForKeyDelete(deletedKey);
-                if (blocksForKeyDelete != null) {
-                  keysToPurge.addAll(blocksForKeyDelete);
-                }
+            DirectoryDeletingService directoryDeletingService = 
getOzoneManager().getKeyManager()
+                .getDirDeletingService();
+            synchronized (directoryDeletingService) {
+              while (directoryDeletingService != null && 
directoryDeletingService.isRunningOnAOS()) {
+                directoryDeletingService.wait(serviceTimeout);
               }
-
-              if (renamedKey.hasKey() && renamedKey.hasValue()) {
-                renamedList.add(renamedKey.build());
+            }
+          }
+          try (ReferenceCounted<OmSnapshot> snapshot = 
omSnapshotManager.getSnapshot(
+              snapInfo.getVolumeName(), snapInfo.getBucketName(), 
snapInfo.getName())) {
+            KeyManager snapshotKeyManager = snapshot.get().getKeyManager();
+            int moveCount = 0;
+            // Get all entries from deletedKeyTable.
+            List<Table.KeyValue<String, List<OmKeyInfo>>> deletedKeyEntries =
+                
snapshotKeyManager.getDeletedKeyEntries(snapInfo.getVolumeName(), 
snapInfo.getBucketName(),
+                    null, remaining);
+            moveCount += deletedKeyEntries.size();
+            // Get all entries from deletedDirTable.
+            List<Table.KeyValue<String, OmKeyInfo>> deletedDirEntries = 
snapshotKeyManager.getDeletedDirEntries(
+                snapInfo.getVolumeName(), snapInfo.getBucketName(), remaining 
- moveCount);
+            moveCount += deletedDirEntries.size();
+            // Get all entries from snapshotRenamedTable.
+            List<Table.KeyValue<String, String>> renameEntries = 
snapshotKeyManager.getRenamesKeyEntries(
+                snapInfo.getVolumeName(), snapInfo.getBucketName(), null, 
remaining - moveCount);
+            moveCount += renameEntries.size();
+            if (moveCount > 0) {
+              try {
+                submitSnapshotMoveDeletedKeys(snapInfo, 
deletedKeyEntries.stream().map(kv -> {

Review Comment:
   Even though I like Java streams because they are useful for lazy evaluation and 
make code concise and readable, here they are not serving any of those 
purposes. The collection is not going through multiple transformations. Second, 
catching and rethrowing `IOException` is just not needed. Also, there is 
unnecessary overhead of converting a collection to a stream.
   
   I prefer a simple for loop here and a transformation function if needed.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java:
##########
@@ -242,4 +242,43 @@ public static String 
getOzonePathKeyForFso(OMMetadataManager metadataManager,
     final long bucketId = metadataManager.getBucketId(volumeName, bucketName);
     return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX;
   }
+
+  /**
+   * Creates merged repeatedKeyInfo entry with the existing deleted entry in 
the table.

Review Comment:
   ```suggestion 
     * Returns merged repeatedKeyInfo entry with the existing deleted entry in 
the table.
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java:
##########
@@ -242,4 +242,43 @@ public static String 
getOzonePathKeyForFso(OMMetadataManager metadataManager,
     final long bucketId = metadataManager.getBucketId(volumeName, bucketName);
     return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX;
   }
+
+  /**
+   * Creates merged repeatedKeyInfo entry with the existing deleted entry in 
the table.
+   * @param snapshotMoveKeyInfos keyInfos to be added.
+   * @param metadataManager metadataManager for a store.
+   * @return
+   * @throws IOException
+   */
+  public static RepeatedOmKeyInfo 
createMergedRepeatedOmKeyInfoFromDeletedTableEntry(
+      OzoneManagerProtocolProtos.SnapshotMoveKeyInfos snapshotMoveKeyInfos, 
OMMetadataManager metadataManager) throws
+      IOException {
+    String dbKey = snapshotMoveKeyInfos.getKey();
+    List<OmKeyInfo> keyInfoList = new ArrayList<>();
+    for (OzoneManagerProtocolProtos.KeyInfo info : 
snapshotMoveKeyInfos.getKeyInfosList()) {
+      OmKeyInfo fromProtobuf = OmKeyInfo.getFromProtobuf(info);
+      keyInfoList.add(fromProtobuf);
+    }
+    // When older version of keys are moved to the next snapshot's deletedTable
+    // The newer version might also be in the next snapshot's deletedTable and
+    // it might overwrite. This is to avoid that and also avoid having
+    // orphans blocks. Checking the last keyInfoList size omKeyInfo versions,
+    // this is to avoid redundant additions if the last n versions match.
+    RepeatedOmKeyInfo result = metadataManager.getDeletedTable().get(dbKey);
+    if (result == null) {
+      result = new RepeatedOmKeyInfo(keyInfoList);
+    } else if (!isSameAsLatestOmKeyInfo(keyInfoList, result)) {
+      keyInfoList.forEach(result::addOmKeyInfo);
+    }
+    return result;
+  }
+
+  private static boolean isSameAsLatestOmKeyInfo(List<OmKeyInfo> omKeyInfos,
+                                                RepeatedOmKeyInfo result) {

Review Comment:
   nit: alignment is off
   ```suggestion
     private static boolean isSameAsLatestOmKeyInfo(List<OmKeyInfo> omKeyInfos,
                                                    RepeatedOmKeyInfo result) {
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -136,316 +134,108 @@ public BackgroundTaskResult call() throws 
InterruptedException {
 
       getRunCount().incrementAndGet();
 
-      ReferenceCounted<OmSnapshot> rcOmSnapshot = null;
-      ReferenceCounted<OmSnapshot> rcOmPreviousSnapshot = null;
-
-      Table<String, SnapshotInfo> snapshotInfoTable =
-          ozoneManager.getMetadataManager().getSnapshotInfoTable();
-      List<String> purgeSnapshotKeys = new ArrayList<>();
-      try (TableIterator<String, ? extends Table.KeyValue
-          <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
+      try {
+        int remaining = keyLimitPerTask;
+        Iterator<UUID> iterator = snapshotChainManager.iterator(true);
+        List<SnapshotInfo> snapshotsToBePurged = new ArrayList<>();
         long snapshotLimit = snapshotDeletionPerTask;
-
         while (iterator.hasNext() && snapshotLimit > 0) {
-          SnapshotInfo snapInfo = iterator.next().getValue();
-
-          // Only Iterate in deleted snapshot
+          SnapshotInfo snapInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, 
snapshotChainManager, iterator.next());
+          // Only iterate over deleted snapshots, and only if all the changes have been flushed to disk.
           if (shouldIgnoreSnapshot(snapInfo)) {
             continue;
           }
-
-          // Note: Can refactor this to use try-with-resources.
-          // Handling RC decrements manually for now to minimize conflicts.
-          rcOmSnapshot = omSnapshotManager.getSnapshot(
-              snapInfo.getVolumeName(),
-              snapInfo.getBucketName(),
-              snapInfo.getName());
-          OmSnapshot omSnapshot = rcOmSnapshot.get();
-
-          Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
-              omSnapshot.getMetadataManager().getDeletedTable();
-          Table<String, OmKeyInfo> snapshotDeletedDirTable =
-              omSnapshot.getMetadataManager().getDeletedDirTable();
-
-          Table<String, String> renamedTable =
-              omSnapshot.getMetadataManager().getSnapshotRenamedTable();
-
-          long volumeId = ozoneManager.getMetadataManager()
-              .getVolumeId(snapInfo.getVolumeName());
-          // Get bucketInfo for the snapshot bucket to get bucket layout.
-          String dbBucketKey = ozoneManager.getMetadataManager().getBucketKey(
-              snapInfo.getVolumeName(), snapInfo.getBucketName());
-          OmBucketInfo bucketInfo = ozoneManager.getMetadataManager()
-              .getBucketTable().get(dbBucketKey);
-
-          if (bucketInfo == null) {
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
-            throw new IllegalStateException("Bucket " + "/" +
-                snapInfo.getVolumeName() + "/" + snapInfo.getBucketName() +
-                " is not found. BucketInfo should not be null for snapshotted" 
+
-                " bucket. The OM is in unexpected state.");
-          }
-
-          String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
-          String dbBucketKeyForDir = ozoneManager.getMetadataManager()
-              .getBucketKey(Long.toString(volumeId),
-                  Long.toString(bucketInfo.getObjectID())) + OM_KEY_PREFIX;
-
-          if (isSnapshotReclaimable(snapshotDeletedTable,
-              snapshotDeletedDirTable, snapshotBucketKey, dbBucketKeyForDir)) {
-            purgeSnapshotKeys.add(snapInfo.getTableKey());
-            // Decrement ref count
-            rcOmSnapshot.close();
-            rcOmSnapshot = null;
+          SnapshotInfo nextSnapshot = 
SnapshotUtils.getNextSnapshot(ozoneManager, snapshotChainManager, snapInfo);
+          // Continue if the next snapshot is not active. This is to avoid 
unnecessary copies from one snapshot to
+          // another.
+          if (nextSnapshot != null &&
+              nextSnapshot.getSnapshotStatus() != 
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
             continue;
           }
 
-          //TODO: [SNAPSHOT] Add lock to deletedTable and Active DB.
-          SnapshotInfo previousSnapshot = getPreviousActiveSnapshot(snapInfo, 
chainManager);
-          Table<String, OmKeyInfo> previousKeyTable = null;
-          Table<String, OmDirectoryInfo> previousDirTable = null;
-          OmSnapshot omPreviousSnapshot = null;
-
-          // Split RepeatedOmKeyInfo and update current snapshot 
deletedKeyTable
-          // and next snapshot deletedKeyTable.
-          if (previousSnapshot != null) {
-            rcOmPreviousSnapshot = omSnapshotManager.getSnapshot(
-                previousSnapshot.getVolumeName(),
-                previousSnapshot.getBucketName(),
-                previousSnapshot.getName());
-            omPreviousSnapshot = rcOmPreviousSnapshot.get();
-
-            previousKeyTable = omPreviousSnapshot
-                
.getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
-            previousDirTable = omPreviousSnapshot
-                .getMetadataManager().getDirectoryTable();
-          }
-
-          // Move key to either next non deleted snapshot's deletedTable
-          // or keep it in current snapshot deleted table.
-          List<SnapshotMoveKeyInfos> toReclaimList = new ArrayList<>();
-          List<SnapshotMoveKeyInfos> toNextDBList = new ArrayList<>();
-          // A list of renamed keys/files/dirs
-          List<HddsProtos.KeyValue> renamedList = new ArrayList<>();
-          List<String> dirsToMove = new ArrayList<>();
-
-          long remainNum = handleDirectoryCleanUp(snapshotDeletedDirTable,
-              previousDirTable, renamedTable, dbBucketKeyForDir, snapInfo,
-              omSnapshot, dirsToMove, renamedList);
-          int deletionCount = 0;
-
-          try (TableIterator<String, ? extends Table.KeyValue<String,
-              RepeatedOmKeyInfo>> deletedIterator = snapshotDeletedTable
-              .iterator()) {
-
-            List<BlockGroup> keysToPurge = new ArrayList<>();
-            deletedIterator.seek(snapshotBucketKey);
-
-            while (deletedIterator.hasNext() &&
-                deletionCount < remainNum) {
-              Table.KeyValue<String, RepeatedOmKeyInfo>
-                  deletedKeyValue = deletedIterator.next();
-              String deletedKey = deletedKeyValue.getKey();
-
-              // Exit if it is out of the bucket scope.
-              if (!deletedKey.startsWith(snapshotBucketKey)) {
-                // If snapshot deletedKeyTable doesn't have any
-                // entry in the snapshot scope it can be reclaimed
-                break;
-              }
-
-              RepeatedOmKeyInfo repeatedOmKeyInfo = deletedKeyValue.getValue();
-
-              SnapshotMoveKeyInfos.Builder toReclaim = SnapshotMoveKeyInfos
-                  .newBuilder()
-                  .setKey(deletedKey);
-              SnapshotMoveKeyInfos.Builder toNextDb = SnapshotMoveKeyInfos
-                  .newBuilder()
-                  .setKey(deletedKey);
-              HddsProtos.KeyValue.Builder renamedKey = HddsProtos.KeyValue
-                  .newBuilder();
-
-              for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
-                splitRepeatedOmKeyInfo(toReclaim, toNextDb, renamedKey,
-                    keyInfo, previousKeyTable, renamedTable,
-                    bucketInfo, volumeId);
+          // nextSnapshot == null means the entries will be moved to AOS, so
+          // ensure that KeyDeletingService and DirectoryDeletingService are
+          // not running while the entries are being moved.
+          if (nextSnapshot == null) {
+            KeyDeletingService keyDeletingService = 
getOzoneManager().getKeyManager().getDeletingService();
+            synchronized (keyDeletingService) {
+              while (keyDeletingService != null && 
keyDeletingService.isRunningOnAOS()) {
+                keyDeletingService.wait(serviceTimeout);
               }
+            }
 
-              // If all the KeyInfos are reclaimable in RepeatedOmKeyInfo
-              // then no need to update current snapshot deletedKeyTable.
-              if (!(toReclaim.getKeyInfosCount() ==
-                  repeatedOmKeyInfo.getOmKeyInfoList().size())) {
-                toReclaimList.add(toReclaim.build());
-                toNextDBList.add(toNextDb.build());
-              } else {
-                // The key can be reclaimed here.
-                List<BlockGroup> blocksForKeyDelete = omSnapshot
-                    .getMetadataManager()
-                    .getBlocksForKeyDelete(deletedKey);
-                if (blocksForKeyDelete != null) {
-                  keysToPurge.addAll(blocksForKeyDelete);
-                }
+            DirectoryDeletingService directoryDeletingService = 
getOzoneManager().getKeyManager()
+                .getDirDeletingService();
+            synchronized (directoryDeletingService) {
+              while (directoryDeletingService != null && 
directoryDeletingService.isRunningOnAOS()) {
+                directoryDeletingService.wait(serviceTimeout);
               }
-
-              if (renamedKey.hasKey() && renamedKey.hasValue()) {
-                renamedList.add(renamedKey.build());
+            }
+          }
+          try (ReferenceCounted<OmSnapshot> snapshot = 
omSnapshotManager.getSnapshot(
+              snapInfo.getVolumeName(), snapInfo.getBucketName(), 
snapInfo.getName())) {
+            KeyManager snapshotKeyManager = snapshot.get().getKeyManager();
+            int moveCount = 0;
+            // Get all entries from deletedKeyTable.
+            List<Table.KeyValue<String, List<OmKeyInfo>>> deletedKeyEntries =
+                
snapshotKeyManager.getDeletedKeyEntries(snapInfo.getVolumeName(), 
snapInfo.getBucketName(),
+                    null, remaining);
+            moveCount += deletedKeyEntries.size();
+            // Get all entries from deletedDirTable.
+            List<Table.KeyValue<String, OmKeyInfo>> deletedDirEntries = 
snapshotKeyManager.getDeletedDirEntries(
+                snapInfo.getVolumeName(), snapInfo.getBucketName(), remaining 
- moveCount);
+            moveCount += deletedDirEntries.size();
+            // Get all entries from snapshotRenamedTable.
+            List<Table.KeyValue<String, String>> renameEntries = 
snapshotKeyManager.getRenamesKeyEntries(
+                snapInfo.getVolumeName(), snapInfo.getBucketName(), null, 
remaining - moveCount);
+            moveCount += renameEntries.size();
+            if (moveCount > 0) {
+              try {
+                submitSnapshotMoveDeletedKeys(snapInfo, 
deletedKeyEntries.stream().map(kv -> {
+                  try {
+                    return 
SnapshotMoveKeyInfos.newBuilder().setKey(kv.getKey()).addAllKeyInfos(kv.getValue()
+                        .stream().map(val -> 
val.getProtobuf(ClientVersion.CURRENT_VERSION))
+                        .collect(Collectors.toList())).build();
+                  } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                  }
+                }).collect(Collectors.toList()),
+                    renameEntries.stream().map(kv -> {
+                      try {
+                        return 
HddsProtos.KeyValue.newBuilder().setKey(kv.getKey()).setValue(kv.getValue()).build();
+                      } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                      }
+                    }).collect(Collectors.toList()),
+                    deletedDirEntries.stream()
+                        .map(kv -> {
+                          try {
+                            return 
SnapshotMoveKeyInfos.newBuilder().setKey(kv.getKey())
+                                
.addKeyInfos(kv.getValue().getProtobuf(ClientVersion.CURRENT_VERSION)).build();
+                          } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                          }
+                        }).collect(Collectors.toList()));
+                remaining -= moveCount;
+              } catch (UncheckedIOException e) {
+                throw e.getCause();
               }
-              deletionCount++;
+            } else {
+              snapshotsToBePurged.add(snapInfo);
             }
-
-            // Delete keys From deletedTable
-            processKeyDeletes(keysToPurge, omSnapshot.getKeyManager(),
-                null, snapInfo.getTableKey());
-            successRunCount.incrementAndGet();
-          } catch (IOException ex) {
-            LOG.error("Error while running Snapshot Deleting Service for " +
-                "snapshot " + snapInfo.getTableKey() + " with snapshotId " +
-                snapInfo.getSnapshotId() + ". Processed " + deletionCount +
-                " keys and " + (keyLimitPerSnapshot - remainNum) +
-                " directories and files", ex);
           }
+          successRunCount.incrementAndGet();
           snapshotLimit--;
-          // Submit Move request to OM.
-          submitSnapshotMoveDeletedKeys(snapInfo, toReclaimList,
-              toNextDBList, renamedList, dirsToMove);
-
-          // Properly decrement ref count for rcOmPreviousSnapshot
-          if (rcOmPreviousSnapshot != null) {
-            rcOmPreviousSnapshot.close();
-            rcOmPreviousSnapshot = null;
-          }
+        }
+        if (!snapshotsToBePurged.isEmpty()) {
+          
submitSnapshotPurgeRequest(snapshotsToBePurged.stream().map(SnapshotInfo::getTableKey)

Review Comment:
   Why is `snapshotsToBePurged` a list of `SnapshotInfo` when only the table 
keys are used? Just keep a list of `snapshotTableKey` strings instead.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java:
##########
@@ -242,4 +242,43 @@ public static String 
getOzonePathKeyForFso(OMMetadataManager metadataManager,
     final long bucketId = metadataManager.getBucketId(volumeName, bucketName);
     return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX;
   }
+
+  /**
+   * Creates merged repeatedKeyInfo entry with the existing deleted entry in 
the table.
+   * @param snapshotMoveKeyInfos keyInfos to be added.
+   * @param metadataManager metadataManager for a store.
+   * @return the merged {@link RepeatedOmKeyInfo} entry for the given key
+   * @throws IOException
+   */
+  public static RepeatedOmKeyInfo 
createMergedRepeatedOmKeyInfoFromDeletedTableEntry(
+      OzoneManagerProtocolProtos.SnapshotMoveKeyInfos snapshotMoveKeyInfos, 
OMMetadataManager metadataManager) throws
+      IOException {
+    String dbKey = snapshotMoveKeyInfos.getKey();
+    List<OmKeyInfo> keyInfoList = new ArrayList<>();
+    for (OzoneManagerProtocolProtos.KeyInfo info : 
snapshotMoveKeyInfos.getKeyInfosList()) {
+      OmKeyInfo fromProtobuf = OmKeyInfo.getFromProtobuf(info);
+      keyInfoList.add(fromProtobuf);
+    }
+    // When older versions of a key are moved into the next snapshot's
+    // deletedTable, a newer version of the same key may already exist there;
+    // blindly overwriting it would leak that version's blocks as orphans.
+    // Instead, merge with the existing entry, and skip the append entirely
+    // when the trailing versions already match, to avoid duplicate additions.

Review Comment:
   Can you please rephrase this comment? It is hard to follow.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotChainManager.java:
##########
@@ -382,6 +389,42 @@ public UUID getLatestGlobalSnapshotId() throws IOException 
{
     return latestGlobalSnapshotId;
   }
 
+  /**
+   * Get the oldest global snapshot in the snapshot chain.
+   */
+  public UUID getOldestGlobalSnapshotId() throws IOException {
+    validateSnapshotChain();
+    return oldestGlobalSnapshotId;
+  }
+
+  public Iterator<UUID> iterator(final boolean reverse) throws IOException {
+    validateSnapshotChain();
+    return new Iterator<UUID>() {
+      private UUID currentSnapshotId = reverse ? getLatestGlobalSnapshotId() : 
getOldestGlobalSnapshotId();
+      @Override
+      public boolean hasNext() {
+        return currentSnapshotId != null;
+      }
+
+      @Override
+      public UUID next() {
+        try {
+          UUID prevSnapshotId = currentSnapshotId;
+          if (reverse && hasPreviousGlobalSnapshot(currentSnapshotId) ||
+              !reverse && hasNextGlobalSnapshot(currentSnapshotId)) {

Review Comment:
   nit: add parentheses to make the intended operator precedence explicit.
   ```suggestion
             if ((reverse && hasPreviousGlobalSnapshot(currentSnapshotId)) ||
                 (!reverse && hasNextGlobalSnapshot(currentSnapshotId))) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to