This is an automated email from the ASF dual-hosted git repository.

aswinshakil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new bdd26eeca9 HDDS-8686. [Snapshot] SnapshotDeletingService to reclaim 
old versions of key. (#4811)
bdd26eeca9 is described below

commit bdd26eeca9fe7adfd87a4ee1e0acfd84d6148c57
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Fri Jun 16 17:16:27 2023 -0700

    HDDS-8686. [Snapshot] SnapshotDeletingService to reclaim old versions of 
key. (#4811)
---
 .../hadoop/hdds/scm/storage/BlockLocationInfo.java |  14 +++
 .../hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java |   4 +
 .../fs/ozone/TestSnapshotDeletingService.java      | 118 ++++++++++++++++++---
 .../java/org/apache/hadoop/ozone/TestDataUtil.java |  13 +++
 .../org/apache/hadoop/ozone/om/TestKeyPurging.java |   2 +-
 .../src/main/proto/OmClientProtocol.proto          |   1 +
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  13 +--
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |   3 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  65 +++++++-----
 .../hadoop/ozone/om/PendingKeysDeletion.java       |  48 +++++++++
 .../ozone/om/request/key/OMKeyPurgeRequest.java    |   6 +-
 .../ozone/om/response/key/OMKeyPurgeResponse.java  |  26 ++++-
 .../OMSnapshotMoveDeletedKeysResponse.java         |  37 ++++++-
 .../om/service/AbstractKeyDeletingService.java     |  50 +++++++--
 .../ozone/om/service/KeyDeletingService.java       |   8 +-
 .../ozone/om/service/SnapshotDeletingService.java  |  55 +++++++++-
 .../key/TestOMKeyPurgeRequestAndResponse.java      |   4 +-
 .../ozone/om/service/TestKeyDeletingService.java   |  24 +++--
 18 files changed, 411 insertions(+), 80 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
index 286762d5d7..cf368b0680 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
@@ -176,6 +176,20 @@ public class BlockLocationInfo {
         + '}';
   }
 
+  public boolean hasSameBlockAs(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BlockLocationInfo that = (BlockLocationInfo) o;
+    return length == that.length &&
+        offset == that.offset &&
+        createVersion == that.createVersion &&
+        Objects.equals(blockID, that.blockID);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
index 467810fc30..6c85d4d058 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
@@ -55,6 +55,10 @@ public class RepeatedOmKeyInfo implements 
CopyObject<RepeatedOmKeyInfo> {
 
   private final List<OmKeyInfo> omKeyInfoList;
 
+  public RepeatedOmKeyInfo() {
+    this.omKeyInfoList = new ArrayList<>();
+  }
+
   public RepeatedOmKeyInfo(List<OmKeyInfo> omKeyInfos) {
     this.omKeyInfoList = omKeyInfos;
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSnapshotDeletingService.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSnapshotDeletingService.java
index 7f884ba64d..57501d11ae 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSnapshotDeletingService.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSnapshotDeletingService.java
@@ -22,14 +22,18 @@ package org.apache.hadoop.fs.ozone;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -41,24 +45,26 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /**
@@ -69,7 +75,8 @@ public class TestSnapshotDeletingService {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestSnapshotDeletingService.class);
   private static boolean omRatisEnabled = true;
-  private static final String CONTENT = "testContent";
+  private static final ByteBuffer CONTENT =
+      ByteBuffer.allocate(1024 * 1024 * 16);
 
   private MiniOzoneCluster cluster;
   private OzoneManager om;
@@ -82,13 +89,17 @@ public class TestSnapshotDeletingService {
   @BeforeEach
   public void setup() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
+        4, StorageUnit.MB);
+    conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
+        1, StorageUnit.MB);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
-        200, TimeUnit.MILLISECONDS);
+        500, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT,
         10000, TimeUnit.MILLISECONDS);
-    conf.setInt(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 100);
+    conf.setInt(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 500);
     conf.setInt(OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK, 5);
-    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
         TimeUnit.MILLISECONDS);
     conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, omRatisEnabled);
     conf.setBoolean(OZONE_ACL_ENABLED, true);
@@ -134,7 +145,7 @@ public class TestSnapshotDeletingService {
         bucket1snap3.getMetadataManager()
             .getDeletedTable().getRangeKVs(null, 100,
                 "/vol1/bucket1/bucket1key1");
-    Assertions.assertEquals(1, omKeyInfos.size());
+    assertEquals(1, omKeyInfos.size());
   }
 
   @Test
@@ -210,6 +221,12 @@ public class TestSnapshotDeletingService {
           ReplicationType.RATIS, CONTENT);
     }
 
+    // Create 5 keys to overwrite
+    for (int i = 11; i <= 15; i++) {
+      TestDataUtil.createKey(bucket2, "key" + i, ReplicationFactor.THREE,
+          ReplicationType.RATIS, CONTENT);
+    }
+
     // Create Directory and Sub
     for (int i = 1; i <= 3; i++) {
       String parent = "parent" + i;
@@ -227,7 +244,7 @@ public class TestSnapshotDeletingService {
 
     // Total 12 dirs, 19 keys.
     assertTableRowCount(dirTable, 12);
-    assertTableRowCount(keyTable, 19);
+    assertTableRowCount(keyTable, 24);
     assertTableRowCount(deletedDirTable, 0);
 
     // Create Snapshot1
@@ -235,6 +252,13 @@ public class TestSnapshotDeletingService {
         "snap1");
     assertTableRowCount(snapshotInfoTable, 1);
 
+    // Overwrite 3 keys -> Moves previous version to deletedTable
+    for (int i = 11; i <= 13; i++) {
+      TestDataUtil.createKey(bucket2, "key" + i, ReplicationFactor.THREE,
+          ReplicationType.RATIS, CONTENT);
+    }
+    assertTableRowCount(keyTable, 24);
+
     // Delete 5 Keys
     for (int i = 1; i <= 5; i++) {
       client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
@@ -271,7 +295,7 @@ public class TestSnapshotDeletingService {
           "/renamedParent" + i, true);
     }
 
-    assertTableRowCount(deletedTable, 8);
+    assertTableRowCount(deletedTable, 11);
     assertTableRowCount(deletedDirTable, 3);
     assertTableRowCount(dirTable, 9);
     assertTableRowCount(renamedTable, 4);
@@ -287,13 +311,25 @@ public class TestSnapshotDeletingService {
     assertTableRowCount(deletedTable, 0);
     assertTableRowCount(deletedDirTable, 0);
 
+    // Delete 3 overwritten keys
+    for (int i = 11; i <= 13; i++) {
+      client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
+          "key" + i, false);
+    }
+
+    // Overwrite 2 keys
+    for (int i = 14; i <= 15; i++) {
+      TestDataUtil.createKey(bucket2, "key" + i, ReplicationFactor.THREE,
+          ReplicationType.RATIS, CONTENT);
+    }
+
     // Delete 2 more keys
     for (int i = 9; i <= 10; i++) {
       client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
           "key" + i, false);
     }
 
-    assertTableRowCount(deletedTable, 2);
+    assertTableRowCount(deletedTable, 7);
 
     // Create Snapshot3
     client.getObjectStore().createSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
@@ -303,7 +339,7 @@ public class TestSnapshotDeletingService {
     assertTableRowCount(renamedTable, 0);
     assertTableRowCount(deletedDirTable, 0);
     assertTableRowCount(deletedTable, 0);
-    assertTableRowCount(keyTable, 9);
+    assertTableRowCount(keyTable, 11);
     SnapshotInfo deletedSnap = om.getMetadataManager()
         .getSnapshotInfoTable().get("/vol1/bucket2/snap2");
 
@@ -311,6 +347,12 @@ public class TestSnapshotDeletingService {
         "snap2");
     assertTableRowCount(snapshotInfoTable, 2);
 
+    // Delete 2 overwritten keys
+    for (int i = 14; i <= 15; i++) {
+      client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_TWO,
+          "key" + i, false);
+    }
+    assertTableRowCount(deletedTable, 2);
     // Once all the tables are moved, the snapshot is deleted
     assertTableRowCount(om.getMetadataManager().getSnapshotInfoTable(), 2);
 
@@ -329,12 +371,12 @@ public class TestSnapshotDeletingService {
     assertTableRowCount(snapRenamedTable, 4);
     assertTableRowCount(snapDeletedDirTable, 3);
     // All the keys deleted before snapshot2 is moved to snap3
-    assertTableRowCount(snapDeletedTable, 10);
+    assertTableRowCount(snapDeletedTable, 15);
 
     // Before deleting the last snapshot
     assertTableRowCount(renamedTable, 0);
     assertTableRowCount(deletedDirTable, 0);
-    assertTableRowCount(deletedTable, 0);
+    assertTableRowCount(deletedTable, 2);
     // Delete Snapshot3 and check entries moved to active DB
     client.getObjectStore().deleteSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
         "snap3");
@@ -343,17 +385,54 @@ public class TestSnapshotDeletingService {
     assertTableRowCount(snapshotInfoTable, 1);
     assertTableRowCount(renamedTable, 4);
     assertTableRowCount(deletedDirTable, 3);
-    assertTableRowCount(deletedTable, 10);
+
+    OmSnapshot snap1 = (OmSnapshot) om.getOmSnapshotManager()
+        .checkForSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
+            getSnapshotPrefix("snap1"), true);
+    Table<String, OmKeyInfo> snap1KeyTable =
+        snap1.getMetadataManager().getFileTable();
+    try (TableIterator<String, ? extends Table.KeyValue<String,
+        RepeatedOmKeyInfo>> iterator = deletedTable.iterator()) {
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, RepeatedOmKeyInfo> next = iterator.next();
+        String activeDBDeletedKey = next.getKey();
+        if (activeDBDeletedKey.matches(".*/key1.*")) {
+          RepeatedOmKeyInfo activeDBDeleted = next.getValue();
+          OMMetadataManager metadataManager =
+              cluster.getOzoneManager().getMetadataManager();
+          assertEquals(activeDBDeleted.getOmKeyInfoList().size(), 1);
+          OmKeyInfo activeDbDeletedKeyInfo =
+              activeDBDeleted.getOmKeyInfoList().get(0);
+          long volumeId = metadataManager
+              .getVolumeId(activeDbDeletedKeyInfo.getVolumeName());
+          long bucketId = metadataManager
+              .getBucketId(activeDbDeletedKeyInfo.getVolumeName(),
+                  activeDbDeletedKeyInfo.getBucketName());
+          String keyForSnap =
+              metadataManager.getOzonePathKey(volumeId, bucketId,
+                  activeDbDeletedKeyInfo.getParentObjectID(),
+                  activeDbDeletedKeyInfo.getKeyName());
+          OmKeyInfo snap1keyInfo = snap1KeyTable.get(keyForSnap);
+          assertEquals(activeDbDeletedKeyInfo.getLatestVersionLocations()
+              .getLocationList(), snap1keyInfo.getLatestVersionLocations()
+              .getLocationList());
+        }
+      }
+    }
+    assertTableRowCount(deletedTable, 15);
   }
 
   /*
       Flow
       ----
+      create key0
       create key1
       create snapshot1
+      create key0
       create key2
       delete key1
       delete key2
+      delete key0
       create snapshot2
       create key3
       create key4
@@ -371,14 +450,21 @@ public class TestSnapshotDeletingService {
     OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
         om.getMetadataManager();
 
+    TestDataUtil.createKey(bucket1, "bucket1key0", ReplicationFactor.THREE,
+        ReplicationType.RATIS, CONTENT);
     TestDataUtil.createKey(bucket1, "bucket1key1", ReplicationFactor.THREE,
         ReplicationType.RATIS, CONTENT);
-    assertTableRowCount(keyTable, 1);
+    assertTableRowCount(keyTable, 2);
 
     // Create Snapshot 1.
     client.getProxy().createSnapshot(VOLUME_NAME, BUCKET_NAME_ONE,
         "bucket1snap1");
     assertTableRowCount(snapshotInfoTable, 1);
+
+    // Overwrite bucket1key0. This is a newer version of the key, which should
+    // be reclaimed as it is a different version of the key.
+    TestDataUtil.createKey(bucket1, "bucket1key0", ReplicationFactor.THREE,
+        ReplicationType.RATIS, CONTENT);
     TestDataUtil.createKey(bucket1, "bucket1key2", ReplicationFactor.THREE,
         ReplicationType.RATIS, CONTENT);
 
@@ -389,6 +475,10 @@ public class TestSnapshotDeletingService {
     // it is not being referenced by previous snapshot.
     client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_ONE,
         "bucket1key2", false);
+    client.getProxy().deleteKey(VOLUME_NAME, BUCKET_NAME_ONE,
+        "bucket1key0", false);
+    assertTableRowCount(keyTable, 0);
+    // bucket1key0 should also be reclaimed as it is not the same version
     assertTableRowCount(deletedTable, 1);
 
     // Create Snapshot 2.
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index e1512d1753..5ff4b7f422 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Scanner;
 
@@ -113,6 +114,18 @@ public final class TestDataUtil {
     }
   }
 
+  public static void createKey(OzoneBucket bucket, String keyName,
+      ReplicationFactor repFactor, ReplicationType repType,
+      ByteBuffer data) throws IOException {
+    ReplicationConfig repConfig = ReplicationConfig
+        .fromTypeAndFactor(repType, repFactor);
+    try (OutputStream stream = bucket
+        .createKey(keyName, data.capacity(), repConfig,
+            new HashMap<>())) {
+      stream.write(data.array());
+    }
+  }
+
   public static String getKey(OzoneBucket bucket, String keyName)
       throws IOException {
     try (InputStream stream = bucket.readKey(keyName)) {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
index 6b3d7ea8e9..98ffca8b3d 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
@@ -145,7 +145,7 @@ public class TestKeyPurging {
         () -> {
           try {
             return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
-                .size() == 0;
+                .getKeyBlocksList().size() == 0;
           } catch (IOException e) {
             return false;
           }
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 9e7fa19a33..ea0f9a9b02 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1218,6 +1218,7 @@ message PurgeKeysRequest {
     repeated DeletedKeys deletedKeys = 1;
     // if set, will purge keys in a snapshot DB instead of active DB
     optional string snapshotTableKey = 2;
+    repeated SnapshotMoveKeyInfos keysToUpdate = 3;
 }
 
 message PurgeKeysResponse {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 1ebe65ad2f..0bcc7f8231 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -121,16 +121,17 @@ public interface KeyManager extends OzoneManagerFS, 
IOzoneAcl {
       String startKeyName, String keyPrefix, int maxKeys) throws IOException;
 
   /**
-   * Returns a list of pending deletion key info that ups to the given count.
-   * Each entry is a {@link BlockGroup}, which contains the info about the
-   * key name and all its associated block IDs. A pending deletion key is
-   * stored with #deleting# prefix in OM DB.
+   * Returns a PendingKeysDeletion. It has a list of pending deletion key info,
+   * up to the given count. Each entry is a {@link BlockGroup}, which
+   * contains the info about the key name and all its associated block IDs.
+   * It also has a map of key-value pairs that are to be updated in the 
deletedTable.
    *
    * @param count max number of keys to return.
-   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @return a {@link PendingKeysDeletion} with the list of {@link BlockGroup}
+   * (keys and blocks) and the map of entries to update in the deletedTable.
    * @throws IOException
    */
-  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
+  PendingKeysDeletion getPendingDeletionKeys(int count) throws IOException;
 
   /**
    * Returns the names of up to {@code count} open keys whose age is
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 9775d36c98..ee15405cbd 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -60,7 +60,6 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
@@ -600,7 +599,7 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
-  public List<BlockGroup> getPendingDeletionKeys(final int count)
+  public PendingKeysDeletion getPendingDeletionKeys(final int count)
       throws IOException {
     OmMetadataManagerImpl omMetadataManager =
         (OmMetadataManagerImpl) metadataManager;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index f6519bb098..58d4c83cf0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -100,6 +100,7 @@ import static 
org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
+import static 
org.apache.hadoop.ozone.om.service.SnapshotDeletingService.isBlockLocationInfoSame;
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotDirExist;
 
 import org.apache.hadoop.util.Time;
@@ -1503,13 +1504,16 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
    * @return a list of {@link BlockGroup} represent keys and blocks.
    * @throws IOException
    */
-  public List<BlockGroup> getPendingDeletionKeys(final int keyCount,
-      OmSnapshotManager omSnapshotManager) throws IOException {
+  public PendingKeysDeletion getPendingDeletionKeys(final int keyCount,
+                             OmSnapshotManager omSnapshotManager)
+      throws IOException {
     List<BlockGroup> keyBlocksList = Lists.newArrayList();
+    HashMap<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
     try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
              keyIter = getDeletedTable().iterator()) {
       int currentCount = 0;
       while (keyIter.hasNext() && currentCount < keyCount) {
+        RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
         KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
         if (kv != null) {
           List<BlockGroup> blockGroupList = Lists.newArrayList();
@@ -1527,18 +1531,6 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
             // Skip the key if it exists in the previous snapshot (of the same
             // scope) as in this case its blocks should not be reclaimed
 
-            // TODO: [SNAPSHOT] HDDS-7968
-            //  1. If previous snapshot keyTable has key info.getObjectID(),
-            //  skip it. Pending HDDS-7740 merge to reuse the util methods to
-            //  check previousSnapshot.
-            //  2. For efficient lookup, the addition in design doc 4.b)1.b
-            //  is critical.
-            //  3. With snapshot it is possible that only some of the keys in
-            //  the DB key's RepeatedOmKeyInfo list can be reclaimed,
-            //  make sure to update deletedTable accordingly in this case.
-            //  4. Further optimization: Skip all snapshotted keys altogether
-            //  e.g. by prefixing all unreclaimable keys, then calling seek
-
             // If the last snapshot is deleted and the keys renamed in between
             // the snapshots will be cleaned up by KDS. So we need to check
             // in the renamedTable as well.
@@ -1578,17 +1570,14 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
               // have to check deletedTable of previous snapshot
               RepeatedOmKeyInfo delOmKeyInfo =
                   prevDeletedTable.get(prevDelTableDBKey);
-              if ((omKeyInfo != null &&
-                  info.getObjectID() == omKeyInfo.getObjectID()) ||
-                  delOmKeyInfo != null) {
-                // TODO: [SNAPSHOT] For now, we are not cleaning up a key in
-                //  active DB's deletedTable if any one of the keys in
-                //  RepeatedOmKeyInfo exists in last snapshot's key/fileTable.
-                //  Might need to refactor OMKeyDeleteRequest first to take
-                //  actual reclaimed key objectIDs as input
-                //  in order to avoid any race condition.
-                blockGroupList.clear();
-                break;
+              if (versionExistsInPreviousSnapshot(omKeyInfo,
+                  info, delOmKeyInfo)) {
+                // If the infoList size is 1, there is nothing to split.
+                // We either delete it or skip it.
+                if (!(infoList.getOmKeyInfoList().size() == 1)) {
+                  notReclaimableKeyInfo.addOmKeyInfo(info);
+                }
+                continue;
               }
             }
 
@@ -1606,11 +1595,33 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
             }
             currentCount++;
           }
-          keyBlocksList.addAll(blockGroupList);
+
+          List<OmKeyInfo> notReclaimableKeyInfoList =
+              notReclaimableKeyInfo.getOmKeyInfoList();
+
+          // If all the versions are not reclaimable, then do nothing.
+          if (notReclaimableKeyInfoList.size() > 0 &&
+              notReclaimableKeyInfoList.size() !=
+                  infoList.getOmKeyInfoList().size()) {
+            keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
+          }
+
+          if (notReclaimableKeyInfoList.size() !=
+              infoList.getOmKeyInfoList().size()) {
+            keyBlocksList.addAll(blockGroupList);
+          }
         }
       }
     }
-    return keyBlocksList;
+    return new PendingKeysDeletion(keyBlocksList, keysToModify);
+  }
+
+  private boolean versionExistsInPreviousSnapshot(OmKeyInfo omKeyInfo,
+      OmKeyInfo info, RepeatedOmKeyInfo delOmKeyInfo) {
+    return (omKeyInfo != null &&
+        info.getObjectID() == omKeyInfo.getObjectID() &&
+        isBlockLocationInfoSame(omKeyInfo, info)) ||
+        delOmKeyInfo != null;
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
new file mode 100644
index 0000000000..f2d73aaf41
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Return class for OMMetadataManager#getPendingDeletionKeys.
+ */
+public class PendingKeysDeletion {
+
+  private HashMap<String, RepeatedOmKeyInfo> keysToModify;
+  private List<BlockGroup> keyBlocksList;
+
+  public PendingKeysDeletion(List<BlockGroup> keyBlocksList,
+       HashMap<String, RepeatedOmKeyInfo> keysToModify) {
+    this.keysToModify = keysToModify;
+    this.keyBlocksList = keyBlocksList;
+  }
+
+  public HashMap<String, RepeatedOmKeyInfo> getKeysToModify() {
+    return keysToModify;
+  }
+
+  public List<BlockGroup> getKeyBlocksList() {
+    return keyBlocksList;
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index 286f89d69e..3d317e148e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -33,6 +33,8 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Deleted
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +60,8 @@ public class OMKeyPurgeRequest extends OMKeyRequest {
     PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest();
     List<DeletedKeys> bucketDeletedKeysList = purgeKeysRequest
         .getDeletedKeysList();
+    List<SnapshotMoveKeyInfos> keysToUpdateList = purgeKeysRequest
+        .getKeysToUpdateList();
     OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
     String fromSnapshot = purgeKeysRequest.hasSnapshotTableKey() ?
         purgeKeysRequest.getSnapshotTableKey() : null;
@@ -88,7 +92,7 @@ public class OMKeyPurgeRequest extends OMKeyRequest {
       }
 
       omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
-          keysToBePurgedList, omFromSnapshot);
+          keysToBePurgedList, omFromSnapshot, keysToUpdateList);
     } catch (IOException ex) {
       omClientResponse = new OMKeyPurgeResponse(
           createErrorOMResponse(omResponse, ex));
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
index 0103f925bb..d967f65732 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
@@ -21,9 +21,13 @@ package org.apache.hadoop.ozone.om.response.key;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
+
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
 import java.io.IOException;
@@ -31,6 +35,7 @@ import java.util.List;
 import javax.annotation.Nonnull;
 
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static 
org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveDeletedKeysResponse.createRepeatedOmKeyInfo;
 
 /**
  * Response for {@link OMKeyPurgeRequest} request.
@@ -39,12 +44,15 @@ import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
 public class OMKeyPurgeResponse extends OmKeyResponse {
   private List<String> purgeKeyList;
   private OmSnapshot fromSnapshot;
+  private List<SnapshotMoveKeyInfos> keysToUpdateList;
 
   public OMKeyPurgeResponse(@Nonnull OMResponse omResponse,
-      @Nonnull List<String> keyList, OmSnapshot fromSnapshot) {
+      @Nonnull List<String> keyList, OmSnapshot fromSnapshot,
+      List<SnapshotMoveKeyInfos> keysToUpdate) {
     super(omResponse);
     this.purgeKeyList = keyList;
     this.fromSnapshot = fromSnapshot;
+    this.keysToUpdateList = keysToUpdate;
   }
 
   /**
@@ -69,6 +77,22 @@ public class OMKeyPurgeResponse extends OmKeyResponse {
       }
     } else {
       processKeys(batchOperation, omMetadataManager);
+      processKeysToUpdate(batchOperation, omMetadataManager);
+    }
+  }
+
+  /**
+   * Rewrites deletedTable entries for keys that must be kept but whose
+   * version list has changed.
+   * <p>
+   * Each entry of {@code keysToUpdateList} is converted to a
+   * {@link RepeatedOmKeyInfo} and put into the deletedTable under the
+   * entry's key as part of the given batch. Nothing is done when no update
+   * list was supplied to this response.
+   *
+   * @param batchOp batch operation the puts are added to
+   * @param metadataManager metadata manager providing the deletedTable
+   * @throws IOException if converting a key info or adding to the batch fails
+   */
+  private void processKeysToUpdate(BatchOperation batchOp,
+      OMMetadataManager metadataManager) throws IOException {
+    if (keysToUpdateList == null) {
+      return;
+    }
+
+    for (SnapshotMoveKeyInfos keyToUpdate : keysToUpdateList) {
+      List<KeyInfo> keyInfosList = keyToUpdate.getKeyInfosList();
+      RepeatedOmKeyInfo repeatedOmKeyInfo =
+          createRepeatedOmKeyInfo(keyInfosList);
+      if (repeatedOmKeyInfo == null) {
+        // createRepeatedOmKeyInfo returns null when the entry carries no
+        // key infos; there is nothing to write for such an entry, so skip
+        // it instead of handing a null value to the table.
+        continue;
+      }
+      metadataManager.getDeletedTable().putWithBatch(batchOp,
+          keyToUpdate.getKey(), repeatedOmKeyInfo);
     }
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
index ab3350fd47..3846dfbf5e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
@@ -164,7 +164,7 @@ public class OMSnapshotMoveDeletedKeysResponse extends 
OMClientResponse {
 
     for (SnapshotMoveKeyInfos dBKey : nextDBKeysList) {
       RepeatedOmKeyInfo omKeyInfos =
-          createRepeatedOmKeyInfo(dBKey.getKeyInfosList());
+          createRepeatedOmKeyInfo(dBKey, metadataManager);
       if (omKeyInfos == null) {
         continue;
       }
@@ -173,8 +173,8 @@ public class OMSnapshotMoveDeletedKeysResponse extends 
OMClientResponse {
     }
   }
 
-  private RepeatedOmKeyInfo createRepeatedOmKeyInfo(List<KeyInfo> keyInfoList)
-      throws IOException {
+  public static RepeatedOmKeyInfo createRepeatedOmKeyInfo(
+      List<KeyInfo> keyInfoList) throws IOException {
     RepeatedOmKeyInfo result = null;
 
     for (KeyInfo keyInfo: keyInfoList) {
@@ -187,5 +187,36 @@ public class OMSnapshotMoveDeletedKeysResponse extends 
OMClientResponse {
 
     return result;
   }
+
+  /**
+   * Builds the value to store in the deletedTable for a moved key, merging
+   * the moved versions into whatever is already stored under the same key.
+   * <p>
+   * Older versions of a key can be moved into a deletedTable that already
+   * holds a newer version under the same DB key. Starting from the existing
+   * entry keeps that newer version from being overwritten and its blocks
+   * from being orphaned.
+   *
+   * @param snapshotMoveKeyInfos DB key plus the key versions being moved
+   * @param metadataManager metadata manager whose deletedTable is consulted
+   * @return the merged entry, or null if nothing is stored under the key
+   *         and no key versions were supplied
+   * @throws IOException if the table lookup or protobuf conversion fails
+   */
+  private RepeatedOmKeyInfo createRepeatedOmKeyInfo(
+      SnapshotMoveKeyInfos snapshotMoveKeyInfos,
+      OMMetadataManager metadataManager) throws IOException {
+    String deletedTableKey = snapshotMoveKeyInfos.getKey();
+    List<KeyInfo> movedVersions = snapshotMoveKeyInfos.getKeyInfosList();
+    RepeatedOmKeyInfo merged =
+        metadataManager.getDeletedTable().get(deletedTableKey);
+
+    for (KeyInfo movedVersion : movedVersions) {
+      OmKeyInfo movedKeyInfo = OmKeyInfo.getFromProtobuf(movedVersion);
+      if (merged == null) {
+        // Nothing stored yet: the first moved version seeds the entry.
+        merged = new RepeatedOmKeyInfo(movedKeyInfo);
+        continue;
+      }
+      // Append unless it duplicates the most recently added version.
+      if (!isSameAsLatestOmKeyInfo(movedKeyInfo, merged)) {
+        merged.addOmKeyInfo(movedKeyInfo);
+      }
+    }
+
+    return merged;
+  }
+
+  /**
+   * Checks whether the given key info equals the last element of the
+   * repeated key info's list.
+   *
+   * @param omKeyInfo candidate key info
+   * @param result repeated key info; its list is expected to be non-empty
+   * @return true if the candidate equals the last element of the list
+   */
+  private boolean isSameAsLatestOmKeyInfo(OmKeyInfo omKeyInfo,
+                                          RepeatedOmKeyInfo result) {
+    List<OmKeyInfo> versions = result.getOmKeyInfoList();
+    int count = versions.size();
+    assert count > 0;
+    return omKeyInfo.equals(versions.get(count - 1));
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 152159b0e2..be950a2725 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -39,6 +39,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Deleted
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
@@ -53,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 
@@ -86,8 +88,9 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
   }
 
   protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
-                                  KeyManager manager,
-                                  String snapTableKey) throws IOException {
+      KeyManager manager,
+      HashMap<String, RepeatedOmKeyInfo> keysToModify,
+      String snapTableKey) throws IOException {
 
     long startTime = Time.monotonicNow();
     int delCount = 0;
@@ -95,7 +98,8 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
         scmClient.deleteKeyBlocks(keyBlocksList);
     if (blockDeletionResults != null) {
       if (isRatisEnabled()) {
-        delCount = submitPurgeKeysRequest(blockDeletionResults, snapTableKey);
+        delCount = submitPurgeKeysRequest(blockDeletionResults,
+            keysToModify, snapTableKey);
       } else {
         // TODO: Once HA and non-HA paths are merged, we should have
         //  only one code path here. Purge keys should go through an
@@ -147,9 +151,10 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
    * Submits PurgeKeys request for the keys whose blocks have been deleted
    * by SCM.
    * @param results DeleteBlockGroups returned by SCM.
+   * @param keysToModify Updated list of RepeatedOmKeyInfo
    */
   private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
-                                     String snapTableKey) {
+      HashMap<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey) {
     Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
         new HashMap<>();
 
@@ -159,10 +164,18 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
       if (result.isSuccess()) {
         // Add key to PurgeKeys list.
         String deletedKey = result.getObjectKey();
-        // Parse Volume and BucketName
-        addToMap(purgeKeysMapPerBucket, deletedKey);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+        if (keysToModify != null && !keysToModify.containsKey(deletedKey)) {
+          // Parse Volume and BucketName
+          addToMap(purgeKeysMapPerBucket, deletedKey);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} set to be updated in OM DB, Other versions " +
+                "of the key that are reclaimable are reclaimed.", deletedKey);
+          }
+        } else if (keysToModify == null) {
+          addToMap(purgeKeysMapPerBucket, deletedKey);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+          }
         }
         deletedCount++;
       }
@@ -185,6 +198,27 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
       purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
     }
 
+    List<SnapshotMoveKeyInfos> keysToUpdateList = new ArrayList<>();
+    if (keysToModify != null) {
+      for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify :
+          keysToModify.entrySet()) {
+
+        SnapshotMoveKeyInfos.Builder keyToUpdate =
+            SnapshotMoveKeyInfos.newBuilder();
+        keyToUpdate.setKey(keyToModify.getKey());
+        List<OzoneManagerProtocolProtos.KeyInfo> keyInfos =
+            keyToModify.getValue().getOmKeyInfoList().stream()
+                .map(k -> k.getProtobuf(ClientVersion.CURRENT_VERSION))
+                .collect(Collectors.toList());
+        keyToUpdate.addAllKeyInfos(keyInfos);
+        keysToUpdateList.add(keyToUpdate.build());
+      }
+
+      if (keysToUpdateList.size() > 0) {
+        purgeKeysRequest.addAllKeysToUpdate(keysToUpdateList);
+      }
+    }
+
     OMRequest omRequest = OMRequest.newBuilder()
         .setCmdType(Type.PurgeKeys)
         .setPurgeKeysRequest(purgeKeysRequest)
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index 0a7a5d31a8..1a570ad244 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
 
+import org.apache.hadoop.ozone.om.PendingKeysDeletion;
 import org.apache.ratis.protocol.ClientId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,11 +135,14 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
           //  OM would have to keep track of which snapshot the key is coming
           //  from if the above would be done inside getPendingDeletionKeys().
 
-          List<BlockGroup> keyBlocksList = manager
+          PendingKeysDeletion pendingKeysDeletion = manager
               .getPendingDeletionKeys(keyLimitPerTask);
+          List<BlockGroup> keyBlocksList = pendingKeysDeletion
+              .getKeyBlocksList();
           if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
             int delCount = processKeyDeletes(keyBlocksList,
-                getOzoneManager().getKeyManager(), null);
+                getOzoneManager().getKeyManager(),
+                pendingKeysDeletion.getKeysToModify(), null);
             deletedKeyCount.addAndGet(delCount);
           }
         } catch (IOException e) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index 58fa8e6074..bf258b1d31 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
@@ -281,7 +283,7 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
 
             // Delete keys From deletedTable
             processKeyDeletes(keysToPurge, omSnapshot.getKeyManager(),
-                snapInfo.getTableKey());
+                null, snapInfo.getTableKey());
             successRunCount.incrementAndGet();
           } catch (IOException ex) {
             LOG.error("Error while running Snapshot Deleting Service for " +
@@ -531,11 +533,14 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
       OmKeyInfo prevKeyInfo = renamedKey != null ? previousKeyTable
           .get(renamedKey) : previousKeyTable.get(dbKey);
 
-      if (prevKeyInfo == null) {
+      if (prevKeyInfo == null ||
+          prevKeyInfo.getObjectID() != deletedKeyInfo.getObjectID()) {
         return true;
       }
 
-      return prevKeyInfo.getObjectID() != deletedKeyInfo.getObjectID();
+      // For key overwrite the objectID will remain the same, In this
+      // case we need to check if OmKeyLocationInfo is also same.
+      return !isBlockLocationInfoSame(prevKeyInfo, deletedKeyInfo);
     }
 
     private SnapshotInfo getPreviousActiveSnapshot(SnapshotInfo snapInfo)
@@ -611,6 +616,50 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
     }
   }
 
+  /**
+   * Decides whether two versions of a key refer to the same blocks. It is
+   * used once the objectIDs of the two versions are known to match, since
+   * an overwritten key keeps its objectID.
+   * <p>
+   * If both versions are hsync keys they are reported as the same without
+   * looking at blocks, because blocks may have been appended between the
+   * snapshot and the deletion. Otherwise both must have the same number of
+   * location versions, and the location lists of their latest versions must
+   * have equal length and match position by position according to
+   * {@code hasSameBlockAs}.
+   *
+   * @param prevKeyInfo key info from the previous snapshot
+   * @param deletedKeyInfo key info from the deletedTable
+   * @return true if the block locations are considered the same
+   */
+  public static boolean isBlockLocationInfoSame(OmKeyInfo prevKeyInfo,
+                                                OmKeyInfo deletedKeyInfo) {
+    // hsync on both sides: a matching objectID is enough.
+    if (prevKeyInfo.isHsync() && deletedKeyInfo.isHsync()) {
+      return true;
+    }
+
+    int prevVersionCount = prevKeyInfo.getKeyLocationVersions().size();
+    int deletedVersionCount =
+        deletedKeyInfo.getKeyLocationVersions().size();
+    if (prevVersionCount != deletedVersionCount) {
+      return false;
+    }
+
+    OmKeyLocationInfoGroup deletedLatest =
+        deletedKeyInfo.getLatestVersionLocations();
+    OmKeyLocationInfoGroup prevLatest =
+        prevKeyInfo.getLatestVersionLocations();
+    if (prevLatest == null || deletedLatest == null) {
+      return false;
+    }
+
+    List<OmKeyLocationInfo> deletedBlocks = deletedLatest.getLocationList();
+    List<OmKeyLocationInfo> prevBlocks = prevLatest.getLocationList();
+    int blockCount = deletedBlocks.size();
+    if (blockCount != prevBlocks.size()) {
+      return false;
+    }
+
+    // Same length: compare the lists position by position.
+    for (int pos = 0; pos < blockCount; pos++) {
+      boolean sameBlock =
+          deletedBlocks.get(pos).hasSameBlockAs(prevBlocks.get(pos));
+      if (!sameBlock) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index 7035f3ddd9..3385046e0c 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -186,7 +186,7 @@ public class TestOMKeyPurgeRequestAndResponse extends 
TestOMKeyRequest {
         omMetadataManager.getStore().initBatchOperation()) {
 
       OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
-          omResponse, deletedKeyNames, null);
+          omResponse, deletedKeyNames, null, null);
       omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
 
       // Do manual commit and see whether addToBatch is successful or not.
@@ -243,7 +243,7 @@ public class TestOMKeyPurgeRequestAndResponse extends 
TestOMKeyRequest {
              omMetadataManager.getStore().initBatchOperation()) {
 
       OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
-          omResponse, deletedKeyNames, omSnapshot);
+          omResponse, deletedKeyNames, omSnapshot, null);
       omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
 
       // Do manual commit and see whether addToBatch is successful or not.
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 64180ca7e1..721f153095 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -143,8 +143,8 @@ public class TestKeyDeletingService {
         () -> keyDeletingService.getDeletedKeyCount().get() >= keyCount,
         1000, 10000);
     Assert.assertTrue(keyDeletingService.getRunCount().get() > 1);
-    Assert.assertEquals(0,
-        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
+    Assert.assertEquals(0, keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+        .getKeyBlocksList().size());
   }
 
   @Test(timeout = 40000)
@@ -169,7 +169,8 @@ public class TestKeyDeletingService {
         () -> {
           try {
             int numPendingDeletionKeys =
-                keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size();
+                keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+                    .getKeyBlocksList().size();
             if (numPendingDeletionKeys != keyCount) {
               LOG.info("Expected {} keys to be pending deletion, but got {}",
                   keyCount, numPendingDeletionKeys);
@@ -187,8 +188,8 @@ public class TestKeyDeletingService {
         100, 1000);
     // Since SCM calls are failing, deletedKeyCount should be zero.
     Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get());
-    Assert.assertEquals(keyCount,
-        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
+    Assert.assertEquals(keyCount, keyManager
+        .getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList().size());
   }
 
   @Test(timeout = 30000)
@@ -215,7 +216,8 @@ public class TestKeyDeletingService {
         () -> {
           try {
             int numPendingDeletionKeys =
-                keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size();
+                keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+                    .getKeyBlocksList().size();
             if (numPendingDeletionKeys != keyCount) {
               LOG.info("Expected {} keys to be pending deletion, but got {}",
                   keyCount, numPendingDeletionKeys);
@@ -269,6 +271,7 @@ public class TestKeyDeletingService {
         () -> {
           try {
             return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+                .getKeyBlocksList()
                 .stream()
                 .map(BlockGroup::getBlockIDList)
                 .flatMap(Collection::stream)
@@ -291,6 +294,7 @@ public class TestKeyDeletingService {
         () -> {
           try {
             return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+                .getKeyBlocksList()
                 .stream()
                 .map(BlockGroup::getBlockIDList)
                 .flatMap(Collection::stream)
@@ -341,8 +345,8 @@ public class TestKeyDeletingService {
         () -> keyDeletingService.getDeletedKeyCount().get() >= 1,
         1000, 10000);
     Assert.assertTrue(keyDeletingService.getRunCount().get() > 1);
-    Assert.assertEquals(0,
-        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
+    Assert.assertEquals(0, keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+            .getKeyBlocksList().size());
 
     // The 1st version of the key has 1 block and the 2nd version has 2
     // blocks. Hence, the ScmBlockClient should have received atleast 3
@@ -419,8 +423,8 @@ public class TestKeyDeletingService {
         () -> keyDeletingService.getDeletedKeyCount().get() >= 1,
         1000, 10000);
     Assert.assertTrue(keyDeletingService.getRunCount().get() > 1);
-    Assert.assertEquals(0,
-        keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size());
+    Assert.assertEquals(0, keyManager
+        .getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList().size());
 
     // deletedTable should have deleted key of the snapshot bucket
     Assert.assertFalse(metadataManager.getDeletedTable().isEmpty());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to