This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c78c07fda3 HDDS-8462. Key in deleteTable uniqueness (#4602)
c78c07fda3 is described below

commit c78c07fda378b26c4f3025a0b6fb18dcecbf27f7
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu May 4 22:30:00 2023 +0530

    HDDS-8462. Key in deleteTable uniqueness (#4602)
---
 .../client/rpc/TestOzoneAtRestEncryption.java      | 20 +++++++++---
 .../client/rpc/TestOzoneRpcClientAbstract.java     | 12 +++++---
 .../ozone/om/request/key/OMKeyCommitRequest.java   | 33 +++++++++++---------
 .../om/request/key/OMKeyCommitRequestWithFSO.java  | 36 +++++++++++++---------
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  | 11 ++-----
 .../S3MultipartUploadCompleteRequest.java          |  5 ++-
 .../response/key/AbstractOMKeyDeleteResponse.java  | 22 ++++++-------
 .../key/OMDirectoriesPurgeResponseWithFSO.java     |  2 ++
 .../ozone/om/response/key/OMKeyCommitResponse.java | 26 ++++++++++------
 .../response/key/OMKeyCommitResponseWithFSO.java   | 16 +++++-----
 .../response/key/OMKeyDeleteResponseWithFSO.java   |  2 ++
 .../response/key/OMKeysDeleteResponseWithFSO.java  |  2 ++
 .../om/response/key/OMTrashRecoverResponse.java    |  4 ++-
 .../ozone/om/request/OMRequestTestUtils.java       |  7 ++---
 .../om/request/key/TestOMKeyCommitRequest.java     | 11 ++++---
 .../om/response/key/TestOMKeyCommitResponse.java   | 24 ++++++++++++---
 .../key/TestOMKeyCommitResponseWithFSO.java        | 12 +++++++-
 .../om/response/key/TestOMKeyDeleteResponse.java   |  9 ++++--
 .../response/key/TestOMOpenKeysDeleteResponse.java | 19 ++++++++----
 .../ozone/om/service/TestKeyDeletingService.java   | 12 ++++++--
 20 files changed, 180 insertions(+), 105 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 3d521f3e98..8d8667051f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.BucketArgs;
@@ -353,18 +354,16 @@ public class TestOzoneAtRestEncryption {
     bucket.deleteKey(key.getName());
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    String objectKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
 
     GenericTestUtils.waitFor(() -> {
       try {
-        return omMetadataManager.getDeletedTable().isExist(objectKey);
+        return getMatchedKeyInfo(keyName, omMetadataManager) != null;
       } catch (IOException e) {
         return false;
       }
     }, 500, 100000);
     RepeatedOmKeyInfo deletedKeys =
-        omMetadataManager.getDeletedTable().get(objectKey);
+        getMatchedKeyInfo(keyName, omMetadataManager);
     Map<String, String> deletedKeyMetadata =
         deletedKeys.getOmKeyInfoList().get(0).getMetadata();
     Assert.assertFalse(deletedKeyMetadata.containsKey(OzoneConsts.GDPR_FLAG));
@@ -618,4 +617,17 @@ public class TestOzoneAtRestEncryption {
     TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
     TestOzoneRpcClient.setStore(ozClient.getObjectStore());
   }
+
+  private static RepeatedOmKeyInfo getMatchedKeyInfo(
+      String keyName, OMMetadataManager omMetadataManager) throws IOException {
+    List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> rangeKVs
+        = omMetadataManager.getDeletedTable().getRangeKVs(
+        null, 100, "/");
+    for (int i = 0; i < rangeKVs.size(); ++i) {
+      if (rangeKVs.get(i).getKey().contains(keyName)) {
+        return rangeKVs.get(i).getValue();
+      }
+    }
+    return null;
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 0c87899f25..e7ab8d30a2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OmUtils;
@@ -4072,14 +4073,15 @@ public abstract class TestOzoneRpcClientAbstract {
         omKeyInfo.getKeyLocationVersions().size());
 
     if (expectedCount == 1) {
-      RepeatedOmKeyInfo repeatedOmKeyInfo = cluster
-          .getOzoneManager().getMetadataManager()
-          .getDeletedTable().get(cluster.getOzoneManager().getMetadataManager()
+      List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> rangeKVs
+          = cluster.getOzoneManager().getMetadataManager().getDeletedTable()
+          .getRangeKVs(null, 100,
+              cluster.getOzoneManager().getMetadataManager()
               .getOzoneKey(volumeName, bucketName, keyName));
 
-      Assert.assertNotNull(repeatedOmKeyInfo);
+      Assert.assertTrue(rangeKVs.size() > 0);
       Assert.assertEquals(expectedCount,
-          repeatedOmKeyInfo.getOmKeyInfoList().size());
+          rangeKVs.get(0).getValue().getOmKeyInfoList().size());
     } else {
       // If expectedCount is greater than 1 means versioning enabled,
       // so delete table should be empty.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 5e0396a6a9..c21eb8359e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.request.key;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -35,7 +36,6 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
-import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator;
 import org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase;
@@ -195,7 +195,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       // creation and key commit, old versions will be just overwritten and
       // not kept. Bucket versioning will be effective from the first key
       // creation after the knob turned on.
-      RepeatedOmKeyInfo oldKeyVersionsToDelete = null;
+      Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
       OmKeyInfo keyToDelete =
           omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
 
@@ -234,11 +234,16 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       } else if (keyToDelete != null && !omBucketInfo.getIsVersionEnabled()) {
         // Subtract the size of blocks to be overwritten.
         correctedSpace -= keyToDelete.getReplicatedSize();
-        oldKeyVersionsToDelete = getOldVersionsToCleanUp(dbOzoneKey,
-            keyToDelete, omMetadataManager,
-            trxnLogIndex, ozoneManager.isRatisEnabled());
+        RepeatedOmKeyInfo oldVerKeyInfo = getOldVersionsToCleanUp(
+            keyToDelete, trxnLogIndex, ozoneManager.isRatisEnabled());
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
             correctedSpace);
+        String delKeyName = omMetadataManager.getOzoneDeletePathKey(
+            keyToDelete.getObjectID(), dbOzoneKey);
+        if (null == oldKeyVersionsToDeleteMap) {
+          oldKeyVersionsToDeleteMap = new HashMap<>();
+        }
+        oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
       } else {
         checkBucketQuotaInNamespace(omBucketInfo, 1L);
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
@@ -251,11 +256,14 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted,
           omKeyInfo);
       if (pseudoKeyInfo != null) {
-        if (oldKeyVersionsToDelete != null) {
-          oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
-        } else {
-          oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo);
+        long pseudoObjId = ozoneManager.getObjectIdFromTxId(trxnLogIndex);
+        String delKeyName = omMetadataManager.getOzoneDeletePathKey(
+            pseudoObjId, dbOzoneKey);
+        if (null == oldKeyVersionsToDeleteMap) {
+          oldKeyVersionsToDeleteMap = new HashMap<>();
         }
+        oldKeyVersionsToDeleteMap.put(delKeyName,
+            new RepeatedOmKeyInfo(pseudoKeyInfo));
       }
 
       // Add to cache of open key table and key table.
@@ -267,16 +275,11 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       omMetadataManager.getKeyTable(getBucketLayout()).addCacheEntry(
           dbOzoneKey, omKeyInfo, trxnLogIndex);
 
-      if (oldKeyVersionsToDelete != null) {
-        OMFileRequest.addDeletedTableCacheEntry(omMetadataManager, dbOzoneKey,
-            oldKeyVersionsToDelete, trxnLogIndex);
-      }
-
       omBucketInfo.incrUsedBytes(correctedSpace);
 
       omClientResponse = new OMKeyCommitResponse(omResponse.build(),
           omKeyInfo, dbOzoneKey, dbOpenKey, omBucketInfo.copyObject(),
-          oldKeyVersionsToDelete, isHSync);
+          oldKeyVersionsToDeleteMap, isHSync);
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index f0804a9131..0de87732f5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om.request.key;
 
+import java.util.HashMap;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
@@ -172,7 +173,7 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       // creation and key commit, old versions will be just overwritten and
       // not kept. Bucket versioning will be effective from the first key
       // creation after the knob turned on.
-      RepeatedOmKeyInfo oldKeyVersionsToDelete = null;
+      Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
       OmKeyInfo keyToDelete =
           omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey);
 
@@ -187,11 +188,18 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       } else if (keyToDelete != null && !omBucketInfo.getIsVersionEnabled()) {
         // Subtract the size of blocks to be overwritten.
         correctedSpace -= keyToDelete.getReplicatedSize();
-        oldKeyVersionsToDelete = getOldVersionsToCleanUp(dbFileKey,
-            keyToDelete, omMetadataManager,
-            trxnLogIndex, ozoneManager.isRatisEnabled());
+        RepeatedOmKeyInfo oldVerKeyInfo = getOldVersionsToCleanUp(
+            keyToDelete, trxnLogIndex, ozoneManager.isRatisEnabled());
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
             correctedSpace);
+        String delKeyName = omMetadataManager
+            .getOzoneKey(volumeName, bucketName, fileName);
+        delKeyName = omMetadataManager.getOzoneDeletePathKey(
+            keyToDelete.getObjectID(), delKeyName);
+        if (null == oldKeyVersionsToDeleteMap) {
+          oldKeyVersionsToDeleteMap = new HashMap<>();
+        }
+        oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
       } else {
         checkBucketQuotaInNamespace(omBucketInfo, 1L);
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
@@ -204,11 +212,16 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted,
           omKeyInfo);
       if (pseudoKeyInfo != null) {
-        if (oldKeyVersionsToDelete != null) {
-          oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
-        } else {
-          oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo);
+        String delKeyName = omMetadataManager
+            .getOzoneKey(volumeName, bucketName, fileName);
+        long pseudoObjId = ozoneManager.getObjectIdFromTxId(trxnLogIndex);
+        delKeyName = omMetadataManager.getOzoneDeletePathKey(
+            pseudoObjId, delKeyName);
+        if (null == oldKeyVersionsToDeleteMap) {
+          oldKeyVersionsToDeleteMap = new HashMap<>();
         }
+        oldKeyVersionsToDeleteMap.put(delKeyName,
+            new RepeatedOmKeyInfo(pseudoKeyInfo));
       }
 
       // Add to cache of open key table and key table.
@@ -220,16 +233,11 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       OMFileRequest.addFileTableCacheEntry(omMetadataManager, dbFileKey,
               omKeyInfo, fileName, trxnLogIndex);
 
-      if (oldKeyVersionsToDelete != null) {
-        OMFileRequest.addDeletedTableCacheEntry(omMetadataManager, dbFileKey,
-                oldKeyVersionsToDelete, trxnLogIndex);
-      }
-
       omBucketInfo.incrUsedBytes(correctedSpace);
 
       omClientResponse = new OMKeyCommitResponseWithFSO(omResponse.build(),
               omKeyInfo, dbFileKey, dbOpenFileKey, omBucketInfo.copyObject(),
-              oldKeyVersionsToDelete, volumeId, isHSync);
+          oldKeyVersionsToDeleteMap, volumeId, isHSync);
 
       result = Result.SUCCESS;
     } catch (IOException ex) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 8febf9fcf5..43e4bb697d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -805,16 +805,9 @@ public abstract class OMKeyRequest extends OMClientRequest {
    * @throws IOException
    */
   protected RepeatedOmKeyInfo getOldVersionsToCleanUp(
-      @Nonnull String dbOzoneKey, @Nonnull OmKeyInfo keyToDelete,
-      OMMetadataManager omMetadataManager, long trxnLogIndex,
+      @Nonnull OmKeyInfo keyToDelete, long trxnLogIndex,
       boolean isRatisEnabled) throws IOException {
-
-    // Past keys that was deleted but still in deleted table,
-    // waiting for deletion service.
-    RepeatedOmKeyInfo keysToDelete =
-        omMetadataManager.getDeletedTable().get(dbOzoneKey);
-
-    return OmUtils.prepareKeyForDelete(keyToDelete, keysToDelete,
+    return OmUtils.prepareKeyForDelete(keyToDelete, null,
           trxnLogIndex, isRatisEnabled);
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index 1fcf3887ca..079b5e85ec 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -225,9 +225,8 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
         long usedBytesDiff = 0;
         boolean isNamespaceUpdate = false;
         if (keyToDelete != null && !omBucketInfo.getIsVersionEnabled()) {
-          oldKeyVersionsToDelete = getOldVersionsToCleanUp(dbOzoneKey,
-              keyToDelete, omMetadataManager,
-              trxnLogIndex, ozoneManager.isRatisEnabled());
+          oldKeyVersionsToDelete = getOldVersionsToCleanUp(
+              keyToDelete, trxnLogIndex, ozoneManager.isRatisEnabled());
           usedBytesDiff -= keyToDelete.getReplicatedSize();
         } else {
           checkBucketQuotaInNamespace(omBucketInfo, 1L);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/AbstractOMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/AbstractOMKeyDeleteResponse.java
index 0fecb2242d..276baca4de 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/AbstractOMKeyDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/AbstractOMKeyDeleteResponse.java
@@ -98,13 +98,13 @@ public abstract class AbstractOMKeyDeleteResponse extends OmKeyResponse {
       // if RepeatedOMKeyInfo structure is null, we create a new instance,
       // if it is not null, then we simply add to the list and store this
       // instance in deletedTable.
-      RepeatedOmKeyInfo repeatedOmKeyInfo =
-          omMetadataManager.getDeletedTable().get(keyName);
-      repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
-          omKeyInfo, repeatedOmKeyInfo, omKeyInfo.getUpdateID(),
+      RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+          omKeyInfo, null, omKeyInfo.getUpdateID(),
           isRatisEnabled);
+      String delKeyName = omMetadataManager.getOzoneDeletePathKey(
+          omKeyInfo.getObjectID(), keyName);
       omMetadataManager.getDeletedTable().putWithBatch(
-          batchOperation, keyName, repeatedOmKeyInfo);
+          batchOperation, delKeyName, repeatedOmKeyInfo);
     }
   }
 
@@ -115,7 +115,7 @@ public abstract class AbstractOMKeyDeleteResponse extends OmKeyResponse {
    *  file table (which is in prefix format) and adds the fullKey
    *  into the deletedTable
    * @param keyName     (format: objectId/key)
-   * @param fullKeyName (format: vol/buck/key)
+   * @param deleteKeyName (format: vol/buck/key/objectId)
    * @param omKeyInfo
    * @throws IOException
    */
@@ -123,7 +123,7 @@ public abstract class AbstractOMKeyDeleteResponse extends OmKeyResponse {
       OMMetadataManager omMetadataManager,
       BatchOperation batchOperation,
       Table<String, ?> fromTable,
-      String keyName, String fullKeyName,
+      String keyName, String deleteKeyName,
       OmKeyInfo omKeyInfo) throws IOException {
 
     // For OmResponse with failure, this should do nothing. This method is
@@ -141,13 +141,11 @@ public abstract class AbstractOMKeyDeleteResponse extends OmKeyResponse {
       // if RepeatedOMKeyInfo structure is null, we create a new instance,
       // if it is not null, then we simply add to the list and store this
       // instance in deletedTable.
-      RepeatedOmKeyInfo repeatedOmKeyInfo =
-          omMetadataManager.getDeletedTable().get(fullKeyName);
-      repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
-          omKeyInfo, repeatedOmKeyInfo, omKeyInfo.getUpdateID(),
+      RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+          omKeyInfo, null, omKeyInfo.getUpdateID(),
           isRatisEnabled);
       omMetadataManager.getDeletedTable().putWithBatch(
-          batchOperation, fullKeyName, repeatedOmKeyInfo);
+          batchOperation, deleteKeyName, repeatedOmKeyInfo);
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
index eda6c8fb66..06450638be 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
@@ -135,6 +135,8 @@ public class OMDirectoriesPurgeResponseWithFSO extends OmKeyResponse {
         String deletedKey = omMetadataManager
             .getOzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
                 keyInfo.getKeyName());
+        deletedKey = omMetadataManager.getOzoneDeletePathKey(
+            keyInfo.getObjectID(), deletedKey);
 
         // TODO: [SNAPSHOT] Acquire deletedTable write table lock
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index a861863665..bc41b6aa0c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.response.key;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -47,20 +48,22 @@ public class OMKeyCommitResponse extends OmKeyResponse {
   private String ozoneKeyName;
   private String openKeyName;
   private OmBucketInfo omBucketInfo;
-  private RepeatedOmKeyInfo keysToDelete;
+  private Map<String, RepeatedOmKeyInfo> keyToDeleteMap;
 
   private boolean isHSync;
 
-  public OMKeyCommitResponse(@Nonnull OMResponse omResponse,
+  public OMKeyCommitResponse(
+      @Nonnull OMResponse omResponse,
       @Nonnull OmKeyInfo omKeyInfo, String ozoneKeyName, String openKeyName,
-      @Nonnull OmBucketInfo omBucketInfo, RepeatedOmKeyInfo keysToDelete,
-                             boolean isHSync) {
+      @Nonnull OmBucketInfo omBucketInfo,
+      Map<String, RepeatedOmKeyInfo> keyToDeleteMap,
+      boolean isHSync) {
     super(omResponse, omBucketInfo.getBucketLayout());
     this.omKeyInfo = omKeyInfo;
     this.ozoneKeyName = ozoneKeyName;
     this.openKeyName = openKeyName;
     this.omBucketInfo = omBucketInfo;
-    this.keysToDelete = keysToDelete;
+    this.keyToDeleteMap = keyToDeleteMap;
     this.isHSync = isHSync;
   }
 
@@ -112,15 +115,18 @@ public class OMKeyCommitResponse extends OmKeyResponse {
   }
 
   @VisibleForTesting
-  public RepeatedOmKeyInfo getKeysToDelete() {
-    return keysToDelete;
+  public Map<String, RepeatedOmKeyInfo> getKeysToDelete() {
+    return keyToDeleteMap;
   }
 
   protected void updateDeletedTable(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
-    if (this.keysToDelete != null) {
-      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
-              ozoneKeyName, keysToDelete);
+    if (this.keyToDeleteMap != null) {
+      for (Map.Entry<String, RepeatedOmKeyInfo> entry : 
+          keyToDeleteMap.entrySet()) {
+        omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+            entry.getKey(), entry.getValue());
+      }
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
index 0551943683..ff267f7c07 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import java.util.Map;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -47,14 +48,15 @@ public class OMKeyCommitResponseWithFSO extends OMKeyCommitResponse {
   private long volumeId;
 
   @SuppressWarnings("parameternumber")
-  public OMKeyCommitResponseWithFSO(@Nonnull OMResponse omResponse,
-                               @Nonnull OmKeyInfo omKeyInfo,
-                               String ozoneKeyName, String openKeyName,
-                               @Nonnull OmBucketInfo omBucketInfo,
-                               RepeatedOmKeyInfo deleteKeys, long volumeId,
-                                    boolean isHSync) {
+  public OMKeyCommitResponseWithFSO(
+      @Nonnull OMResponse omResponse,
+      @Nonnull OmKeyInfo omKeyInfo,
+      String ozoneKeyName, String openKeyName,
+      @Nonnull OmBucketInfo omBucketInfo,
+      Map<String, RepeatedOmKeyInfo> deleteKeyMap, long volumeId,
+      boolean isHSync) {
     super(omResponse, omKeyInfo, ozoneKeyName, openKeyName,
-            omBucketInfo, deleteKeys, isHSync);
+            omBucketInfo, deleteKeyMap, isHSync);
     this.volumeId = volumeId;
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
index e18bcaae1f..c2773429f4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponseWithFSO.java
@@ -98,6 +98,8 @@ public class OMKeyDeleteResponseWithFSO extends OMKeyDeleteResponse {
       String deletedKey = omMetadataManager
           .getOzoneKey(omKeyInfo.getVolumeName(), omKeyInfo.getBucketName(),
               omKeyInfo.getKeyName());
+      deletedKey = omMetadataManager.getOzoneDeletePathKey(
+          omKeyInfo.getObjectID(), deletedKey);
       addDeletionToBatch(omMetadataManager, batchOperation, keyTable,
           ozoneDbKey, deletedKey, omKeyInfo);
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
index 612fa36618..a5f5787030 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponseWithFSO.java
@@ -83,6 +83,8 @@ public class OMKeysDeleteResponseWithFSO extends OMKeysDeleteResponse {
       String deletedKey = omMetadataManager
           .getOzoneKey(omKeyInfo.getVolumeName(), omKeyInfo.getBucketName(),
               omKeyInfo.getKeyName());
+      deletedKey = omMetadataManager.getOzoneDeletePathKey(
+          omKeyInfo.getObjectID(), deletedKey);
       addDeletionToBatch(omMetadataManager, batchOperation, keyTable,
           ozoneDbKey, deletedKey, omKeyInfo);
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMTrashRecoverResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMTrashRecoverResponse.java
index cf94fbdd8a..41a41fe642 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMTrashRecoverResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMTrashRecoverResponse.java
@@ -55,8 +55,10 @@ public class OMTrashRecoverResponse extends OmKeyResponse {
     String trashKey = omMetadataManager
         .getOzoneKey(omKeyInfo.getVolumeName(),
             omKeyInfo.getBucketName(), omKeyInfo.getKeyName());
+    String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+        omKeyInfo.getObjectID(), trashKey);
     RepeatedOmKeyInfo repeatedOmKeyInfo = omMetadataManager
-        .getDeletedTable().get(trashKey);
+        .getDeletedTable().get(deleteKey);
     if (repeatedOmKeyInfo.getOmKeyInfoList().contains(omKeyInfo)) {
       omMetadataManager.getDeletedTable()
               .deleteWithBatch(batchOperation, omKeyInfo.getKeyName());
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index db84671f5d..ea687d1fed 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -840,11 +840,8 @@ public final class OMRequestTestUtils {
     // Delete key from KeyTable and put in DeletedKeyTable
     omMetadataManager.getKeyTable(getDefaultBucketLayout()).delete(ozoneKey);
 
-    RepeatedOmKeyInfo repeatedOmKeyInfo =
-        omMetadataManager.getDeletedTable().get(ozoneKey);
-
-    repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(omKeyInfo,
-        repeatedOmKeyInfo, trxnLogIndex, true);
+    RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+        omKeyInfo, null, trxnLogIndex, true);
 
     omMetadataManager.getDeletedTable().put(ozoneKey, repeatedOmKeyInfo);
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index c38d78056b..d741deb882 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.ozone.om.request.key;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -29,6 +30,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse;
 import org.apache.hadoop.util.Time;
@@ -248,14 +250,15 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
         omClientResponse.getOMResponse().getStatus());
 
-    List<OmKeyInfo> toDeleteKeyList = ((OMKeyCommitResponse) omClientResponse).
-        getKeysToDelete().cloneOmKeyInfoList();
+    Map<String, RepeatedOmKeyInfo> toDeleteKeyList
+        = ((OMKeyCommitResponse) omClientResponse).getKeysToDelete();
 
     // This is the first time to commit key, only the allocated but uncommitted
     // blocks should be deleted.
     Assert.assertEquals(1, toDeleteKeyList.size());
-    Assert.assertEquals(2, toDeleteKeyList.get(0).
-        getKeyLocationVersions().get(0).getLocationList().size());
+    Assert.assertEquals(2, toDeleteKeyList.values().stream().findFirst().get()
+        .cloneOmKeyInfoList().get(0).getKeyLocationVersions().get(0)
+        .getLocationList().size());
 
     // Entry should be deleted from openKey Table.
     omKeyInfo =
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
index 5cc4e54d98..7316d4c50d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.jetbrains.annotations.NotNull;
@@ -120,10 +124,14 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
     Assert.assertNotNull(keysToDelete);
     testAddToDBBatch();
 
-    RepeatedOmKeyInfo keysInDeleteTable =
-            omMetadataManager.getDeletedTable().get(getOzoneKey());
-    Assert.assertNotNull(keysInDeleteTable);
-    Assert.assertEquals(1, keysInDeleteTable.getOmKeyInfoList().size());
+    String deletedKey = omMetadataManager.getOzoneKey(volumeName,
+        omBucketInfo.getBucketName(), keyName);
+    List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> rangeKVs
+        = omMetadataManager.getDeletedTable().getRangeKVs(
+        null, 100, deletedKey);
+    Assert.assertTrue(rangeKVs.size() > 0);
+    Assert.assertEquals(1,
+        rangeKVs.get(0).getValue().getOmKeyInfoList().size());
 
   }
 
@@ -146,7 +154,13 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
           String ozoneKey, RepeatedOmKeyInfo deleteKeys, Boolean isHSync)
           throws IOException {
     Assert.assertNotNull(omBucketInfo);
+    Map<String, RepeatedOmKeyInfo> deleteKeyMap = new HashMap<>();
+    if (null != deleteKeys) {
+      deleteKeys.getOmKeyInfoList().stream().forEach(e -> deleteKeyMap.put(
+          omMetadataManager.getOzoneDeletePathKey(e.getObjectID(), ozoneKey),
+          new RepeatedOmKeyInfo(e)));
+    }
     return new OMKeyCommitResponse(omResponse, omKeyInfo, ozoneKey, openKey,
-            omBucketInfo, deleteKeys, isHSync);
+            omBucketInfo, deleteKeyMap, isHSync);
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
index 5bc9d340d6..aca2a6398d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.response.key;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -44,8 +46,16 @@ public class TestOMKeyCommitResponseWithFSO extends TestOMKeyCommitResponse {
           throws IOException {
     Assert.assertNotNull(omBucketInfo);
     long volumeId = omMetadataManager.getVolumeId(omKeyInfo.getVolumeName());
+    Map<String, RepeatedOmKeyInfo> deleteKeyMap = new HashMap<>();
+    if (null != keysToDelete) {
+      String deleteKey = omMetadataManager.getOzoneKey(volumeName,
+          bucketName, keyName);
+      deleteKeys.getOmKeyInfoList().stream().forEach(e -> deleteKeyMap.put(
+          omMetadataManager.getOzoneDeletePathKey(e.getObjectID(), deleteKey),
+          new RepeatedOmKeyInfo(e)));
+    }
     return new OMKeyCommitResponseWithFSO(omResponse, omKeyInfo, ozoneKey,
-        openKey, omBucketInfo, deleteKeys, volumeId, isHSync);
+        openKey, omBucketInfo, deleteKeyMap, volumeId, isHSync);
   }
 
   @NotNull
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
index adb9151b23..d09527568a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyDeleteResponse.java
@@ -22,9 +22,11 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -122,12 +124,15 @@ public class TestOMKeyDeleteResponse extends TestOMKeyResponse {
 
     Assert.assertFalse(
         omMetadataManager.getKeyTable(getBucketLayout()).isExist(ozoneKey));
-
+    
     String deletedKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
         keyName);
+    List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> rangeKVs
+        = omMetadataManager.getDeletedTable().getRangeKVs(
+        null, 100, deletedKey);
 
     // Key has blocks, it should not be in deletedKeyTable.
-    Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(deletedKey));
+    Assert.assertTrue(rangeKVs.size() > 0);
   }
 
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
index 510ac2afc6..00ed696029 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMOpenKeysDeleteResponse.java
@@ -109,19 +109,26 @@ public class TestOMOpenKeysDeleteResponse extends TestOMKeyResponse {
 
     createAndCommitResponse(keysToDelete, Status.OK);
 
-    for (String key: keysToDelete.keySet()) {
+    for (Map.Entry<String, OmKeyInfo> entry: keysToDelete.entrySet()) {
       // These keys should have been moved from the open key table to the
       // delete table.
       Assert.assertFalse(
-          omMetadataManager.getOpenKeyTable(getBucketLayout()).isExist(key));
-      Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(key));
+          omMetadataManager.getOpenKeyTable(getBucketLayout()).isExist(
+              entry.getKey()));
+      String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+          entry.getValue().getObjectID(), entry.getKey());
+      Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(deleteKey));
     }
 
-    for (String key: keysToKeep.keySet()) {
+    for (Map.Entry<String, OmKeyInfo> entry: keysToKeep.entrySet()) {
       // These keys should not have been moved out of the open key table.
       Assert.assertTrue(
-          omMetadataManager.getOpenKeyTable(getBucketLayout()).isExist(key));
-      Assert.assertFalse(omMetadataManager.getDeletedTable().isExist(key));
+          omMetadataManager.getOpenKeyTable(getBucketLayout()).isExist(
+              entry.getKey()));
+      String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+          entry.getValue().getObjectID(), entry.getKey());
+      Assert.assertFalse(omMetadataManager.getDeletedTable()
+          .isExist(deleteKey));
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index ca67653459..64180ca7e1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmTestManagers;
@@ -38,6 +39,7 @@ import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.ratis.util.ExitUtils;
 import org.junit.BeforeClass;
@@ -430,8 +432,14 @@ public class TestKeyDeletingService {
     // key1 belongs to snapshot, so it should not be deleted when
     // KeyDeletingService runs. But key2 can be reclaimed as it doesn't
     // belong to any snapshot scope.
-    Assert.assertTrue(metadataManager.getDeletedTable().isExist(ozoneKey1));
-    Assert.assertFalse(metadataManager.getDeletedTable().isExist(ozoneKey2));
+    List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> rangeKVs
+        = metadataManager.getDeletedTable().getRangeKVs(
+        null, 100, ozoneKey1);
+    Assert.assertTrue(rangeKVs.size() > 0);
+    rangeKVs
+        = metadataManager.getDeletedTable().getRangeKVs(
+        null, 100, ozoneKey2);
+    Assert.assertTrue(rangeKVs.size() == 0);
   }
 
   private void createVolumeAndBucket(KeyManager keyManager, String volumeName,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to