This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 56062d39061 HDDS-13764. KeyDeletingService and DirectoryDeletingService should reduce snapshot bucket quota usage (#9122)
56062d39061 is described below

commit 56062d39061d204d1176a4c3e6158b1eea6e9ede
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu Oct 9 14:51:19 2025 -0400

    HDDS-13764. KeyDeletingService and DirectoryDeletingService should reduce snapshot bucket quota usage (#9122)
---
 .../apache/hadoop/ozone/util/ProtobufUtils.java    |   4 +
 .../org/apache/hadoop/ozone/om/TestKeyPurging.java |   2 +-
 .../src/main/proto/OmClientProtocol.proto          |   7 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  23 ++-
 .../hadoop/ozone/om/PendingKeysDeletion.java       |  89 +++++++--
 .../key/OMDirectoriesPurgeRequestWithFSO.java      |  20 ++-
 .../ozone/om/request/key/OMKeyPurgeRequest.java    |  65 ++++++-
 .../ozone/om/response/key/OMKeyPurgeResponse.java  |  11 +-
 .../ozone/om/service/KeyDeletingService.java       | 200 +++++++++++++++------
 .../TestOMDirectoriesPurgeRequestAndResponse.java  |  27 ++-
 .../key/TestOMKeyPurgeRequestAndResponse.java      |   4 +-
 .../ozone/om/service/TestKeyDeletingService.java   |  99 ++++++----
 12 files changed, 427 insertions(+), 124 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java
index 10c3669a94e..05d2b116d26 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java
@@ -46,4 +46,8 @@ public static UUID fromProtobuf(HddsProtos.UUID proto) {
   public static int computeRepeatedStringSize(String value) {
     return CodedOutputStream.computeStringSizeNoTag(value);
   }
+
+  public static int computeLongSizeWithTag(int fieldNumber, long value) {
+    return CodedOutputStream.computeInt64Size(fieldNumber, value);
+  }
 }
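
For context on the new helper: protobuf encodes int64 fields as varints, so the serialized size depends on the value. A minimal standalone sketch of the behavior (not part of this commit; it assumes only protobuf-java on the classpath):

    import com.google.protobuf.CodedOutputStream;

    public class Int64SizeDemo {
      public static void main(String[] args) {
        // Tag byte(s) plus varint value bytes: a small positive value needs
        // one value byte, so field number 2 with value 5 serializes to 2 bytes.
        System.out.println(CodedOutputStream.computeInt64Size(2, 5L));   // 2
        // Any negative int64 occupies the maximum 10 varint bytes, giving
        // 11 bytes here. KeyDeletingService later passes a negative
        // placeholder (-10) to this helper to get a worst-case estimate.
        System.out.println(CodedOutputStream.computeInt64Size(2, -10L)); // 11
      }
    }
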
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
index fa59754b67f..2a09ffc5ddc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
@@ -127,7 +127,7 @@ public void testKeysPurgingByKeyDeletingService() throws Exception {
         () -> {
           try {
            return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE)
-                .getKeyBlocksList().isEmpty();
+                .getPurgedKeys().isEmpty();
           } catch (IOException e) {
             return false;
           }
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 2329da73c64..61a3c1d6792 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1417,6 +1417,13 @@ message PurgeKeysRequest {
    // previous snapshotID can also be null & this field would be absent in older requests.
     optional NullableUUID expectedPreviousSnapshotID = 4;
     repeated string renamedKeys = 5;
+    repeated BucketPurgeKeysSize bucketPurgeKeysSize = 6;
+}
+
+message BucketPurgeKeysSize {
+  optional BucketNameInfo bucketNameInfo = 1;
+  optional uint64 purgedBytes = 2;
+  optional uint64 purgedNamespace = 3;
 }
 
 message PurgeKeysResponse {
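
A hedged sketch of populating the new message from sender code, using the Java classes generated from this proto (the volume, bucket, and sizes below are illustrative values, not taken from this commit):

    // One BucketPurgeKeysSize entry per bucket touched by a purge batch:
    // the bucket's identity plus the bytes and namespace entries reclaimed.
    BucketPurgeKeysSize entry = BucketPurgeKeysSize.newBuilder()
        .setBucketNameInfo(BucketNameInfo.newBuilder()
            .setVolumeName("vol1")        // hypothetical volume
            .setBucketName("bucket1")     // hypothetical bucket
            .setBucketId(1001L)           // the bucket's objectID at purge time
            .build())
        .setPurgedBytes(4096L)
        .setPurgedNamespace(2L)
        .build();
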
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 8c50232355c..02949d5ee74 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -80,7 +80,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import jakarta.annotation.Nonnull;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
@@ -138,6 +138,7 @@
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.PendingKeysDeletion.PurgedKey;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
@@ -161,6 +162,7 @@
 import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
 import org.apache.hadoop.ozone.om.service.CompactionService;
 import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
@@ -741,9 +743,8 @@ public PendingKeysDeletion getPendingDeletionKeys(
       String volume, String bucket, String startKey,
      CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> filter,
       int count) throws IOException {
-    List<BlockGroup> keyBlocksList = Lists.newArrayList();
+    Map<String, PurgedKey> purgedKeys = Maps.newHashMap();
     Map<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
-    Map<String, Long> keyBlockReplicatedSize = new HashMap<>();
     int notReclaimableKeyCount = 0;
 
     // Bucket prefix would be empty if volume is empty i.e. either null or "".
@@ -762,9 +763,11 @@ public PendingKeysDeletion getPendingDeletionKeys(
         KeyValue<String, RepeatedOmKeyInfo> kv = delKeyIter.next();
         if (kv != null) {
          RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo(kv.getValue().getBucketId());
-          List<BlockGroup> blockGroupList = Lists.newArrayList();
+          Map<String, PurgedKey> reclaimableKeys = Maps.newHashMap();
           // Multiple keys with the same path can be queued in one DB entry
           RepeatedOmKeyInfo infoList = kv.getValue();
+          long bucketId = infoList.getBucketId();
+          int reclaimableKeyCount = 0;
           for (OmKeyInfo info : infoList.getOmKeyInfoList()) {
 
             // Skip the key if the filter doesn't allow the file to be deleted.
@@ -772,10 +775,12 @@ public PendingKeysDeletion getPendingDeletionKeys(
              List<BlockID> blockIDS = info.getKeyLocationVersions().stream()
                  .flatMap(versionLocations -> versionLocations.getLocationList().stream()
                      .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))).collect(Collectors.toList());
-              BlockGroup keyBlocks = BlockGroup.newBuilder().setKeyName(kv.getKey())
+              String blockGroupName = kv.getKey() + "/" + reclaimableKeyCount++;
+              BlockGroup keyBlocks = BlockGroup.newBuilder().setKeyName(blockGroupName)
                  .addAllBlockIDs(blockIDS).build();
-              keyBlockReplicatedSize.put(keyBlocks.getGroupID(), info.getReplicatedSize());
-              blockGroupList.add(keyBlocks);
+              reclaimableKeys.put(blockGroupName,
+                  new PurgedKey(info.getVolumeName(), info.getBucketName(), bucketId,
+                  keyBlocks, kv.getKey(), OMKeyRequest.sumBlockLengths(info), info.isDeletedKeyCommitted()));
               currentCount++;
             } else {
               notReclaimableKeyInfo.addOmKeyInfo(info);
@@ -789,12 +794,12 @@ public PendingKeysDeletion getPendingDeletionKeys(
              notReclaimableKeyInfoList.size() != infoList.getOmKeyInfoList().size()) {
             keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
           }
-          keyBlocksList.addAll(blockGroupList);
+          purgedKeys.putAll(reclaimableKeys);
           notReclaimableKeyCount += notReclaimableKeyInfoList.size();
         }
       }
     }
-    return new PendingKeysDeletion(keyBlocksList, keysToModify, keyBlockReplicatedSize, notReclaimableKeyCount);
+    return new PendingKeysDeletion(purgedKeys, keysToModify, notReclaimableKeyCount);
   }
 
   private <V, R> List<KeyValue<String, R>> getTableEntries(String startKey,
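
The naming change above is the crux of this hunk: one deletedTable entry can hold several versions of the same key, and the old code gave every version's block group the same name (the DB key). Suffixing a running counter makes each group ID unique, while PurgedKey keeps the original DB key so SCM results can be mapped back to the entry. A tiny illustration (hypothetical DB key, for clarity only):

    String dbKey = "/vol1/bucket1/key1";   // hypothetical deletedTable key
    int reclaimableKeyCount = 0;
    for (int version = 0; version < 3; version++) {
      // Prints /vol1/bucket1/key1/0, /vol1/bucket1/key1/1, /vol1/bucket1/key1/2:
      // one unique block-group ID per reclaimable version of the key.
      System.out.println(dbKey + "/" + reclaimableKeyCount++);
    }
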
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
index ab8c4dda167..1071705b5eb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
@@ -17,7 +17,6 @@
 
 package org.apache.hadoop.ozone.om;
 
-import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
@@ -35,18 +34,15 @@
  */
 public class PendingKeysDeletion {
 
-  private Map<String, RepeatedOmKeyInfo> keysToModify;
-  private List<BlockGroup> keyBlocksList;
-  private Map<String, Long> keyBlockReplicatedSize;
+  private final Map<String, RepeatedOmKeyInfo> keysToModify;
+  private final Map<String, PurgedKey> purgedKeys;
   private int notReclaimableKeyCount;
 
-  public PendingKeysDeletion(List<BlockGroup> keyBlocksList,
-       Map<String, RepeatedOmKeyInfo> keysToModify,
-       Map<String, Long> keyBlockReplicatedSize,
-       int notReclaimableKeyCount) {
+  public PendingKeysDeletion(Map<String, PurgedKey> purgedKeys,
+      Map<String, RepeatedOmKeyInfo> keysToModify,
+      int notReclaimableKeyCount) {
     this.keysToModify = keysToModify;
-    this.keyBlocksList = keyBlocksList;
-    this.keyBlockReplicatedSize = keyBlockReplicatedSize;
+    this.purgedKeys = purgedKeys;
     this.notReclaimableKeyCount = notReclaimableKeyCount;
   }
 
@@ -54,12 +50,77 @@ public Map<String, RepeatedOmKeyInfo> getKeysToModify() {
     return keysToModify;
   }
 
-  public List<BlockGroup> getKeyBlocksList() {
-    return keyBlocksList;
+  public Map<String, PurgedKey> getPurgedKeys() {
+    return purgedKeys;
   }
 
-  public Map<String, Long> getKeyBlockReplicatedSize() {
-    return keyBlockReplicatedSize;
+  /**
+   * Represents metadata for a key that has been purged.
+   *
+   * This class holds information about a specific purged key,
+   * including its volume, bucket, associated block group,
+   * and the amount of data purged in bytes.
+   */
+  public static class PurgedKey {
+    private final String volume;
+    private final String bucket;
+    private final long bucketId;
+    private final BlockGroup blockGroup;
+    private final long purgedBytes;
+    private final boolean isCommittedKey;
+    private final String deleteKeyName;
+
+    public PurgedKey(String volume, String bucket, long bucketId, BlockGroup group, String deleteKeyName,
+        long purgedBytes, boolean isCommittedKey) {
+      this.volume = volume;
+      this.bucket = bucket;
+      this.bucketId = bucketId;
+      this.blockGroup = group;
+      this.purgedBytes = purgedBytes;
+      this.isCommittedKey = isCommittedKey;
+      this.deleteKeyName = deleteKeyName;
+    }
+
+    public BlockGroup getBlockGroup() {
+      return blockGroup;
+    }
+
+    public long getPurgedBytes() {
+      return purgedBytes;
+    }
+
+    public String getVolume() {
+      return volume;
+    }
+
+    public String getBucket() {
+      return bucket;
+    }
+
+    public long getBucketId() {
+      return bucketId;
+    }
+
+    public boolean isCommittedKey() {
+      return isCommittedKey;
+    }
+
+    public String getDeleteKeyName() {
+      return deleteKeyName;
+    }
+
+    @Override
+    public String toString() {
+      return "PurgedKey{" +
+          "blockGroup=" + blockGroup +
+          ", volume='" + volume + '\'' +
+          ", bucket='" + bucket + '\'' +
+          ", bucketId=" + bucketId +
+          ", purgedBytes=" + purgedBytes +
+          ", isCommittedKey=" + isCommittedKey +
+          ", deleteKeyName='" + deleteKeyName + '\'' +
+          '}';
+    }
   }
 
   public int getNotReclaimableKeyCount() {
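
A construction sketch for the new carrier class (argument values are illustrative; the BlockGroup is built the same way KeyManagerImpl does above, with the block IDs elided):

    BlockGroup group = BlockGroup.newBuilder()
        .setKeyName("/vol1/bucket1/key1/0")       // per-version group ID
        .addAllBlockIDs(Collections.emptyList())  // block IDs elided here
        .build();
    PurgedKey purged = new PurgedKey(
        "vol1", "bucket1", 1001L,                 // volume, bucket, bucketId
        group, "/vol1/bucket1/key1",              // block group, deletedTable key
        4096L,                                    // replicated bytes reclaimed
        true);                                    // the version was a committed key
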
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index 991b16480b4..adbbf58a98f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -30,6 +30,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.tuple.Pair;
@@ -41,6 +42,7 @@
 import org.apache.hadoop.ozone.audit.AuditLoggerType;
 import org.apache.hadoop.ozone.audit.OMSystemAction;
 import org.apache.hadoop.ozone.om.DeletingServiceMetrics;
+import org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -55,6 +57,7 @@
 import org.apache.hadoop.ozone.om.response.key.OMDirectoriesPurgeResponseWithFSO;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeDirectoriesRequest;
@@ -132,9 +135,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
     }
     try {
       int numSubDirMoved = 0, numSubFilesMoved = 0, numDirsDeleted = 0;
+      Map<VolumeBucketId, BucketNameInfo> volumeBucketIdMap = purgeDirsRequest.getBucketNameInfosList().stream()
+          .collect(Collectors.toMap(bucketNameInfo ->
+                  new VolumeBucketId(bucketNameInfo.getVolumeId(), bucketNameInfo.getBucketId()),
+              Function.identity()));
       for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
-        for (OzoneManagerProtocolProtos.KeyInfo key :
-            path.getMarkDeletedSubDirsList()) {
+        for (OzoneManagerProtocolProtos.KeyInfo key : path.getMarkDeletedSubDirsList()) {
           ProcessedKeyInfo processed = processDeleteKey(key, path, omMetadataManager);
           subDirNames.add(processed.deleteKey);
 
@@ -154,8 +160,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
           }
         }
 
-        for (OzoneManagerProtocolProtos.KeyInfo key :
-            path.getDeletedSubFilesList()) {
+        for (OzoneManagerProtocolProtos.KeyInfo key : path.getDeletedSubFilesList()) {
           ProcessedKeyInfo processed = processDeleteKey(key, path, omMetadataManager);
           subFileNames.add(processed.deleteKey);
 
@@ -194,6 +199,13 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
         }
         if (path.hasDeletedDir()) {
           deletedDirNames.add(path.getDeletedDir());
+          BucketNameInfo bucketNameInfo = volumeBucketIdMap.get(new VolumeBucketId(path.getVolumeId(),
+              path.getBucketId()));
+          OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
+              bucketNameInfo.getVolumeName(), bucketNameInfo.getBucketName());
+          if (omBucketInfo != null && omBucketInfo.getObjectID() == path.getBucketId()) {
+            omBucketInfo.purgeSnapshotUsedNamespace(1);
+          }
           numDirsDeleted++;
         }
       }
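
The objectID comparison above is what keeps the namespace decrement safe across bucket recreation. The guard in isolation (same calls as the hunk, shown for emphasis):

    // path.getBucketId() was recorded when the directory was queued for
    // deletion; if the bucket was deleted and recreated under the same name
    // since then, the live bucket's objectID differs and its quota is left alone.
    if (omBucketInfo != null && omBucketInfo.getObjectID() == path.getBucketId()) {
      omBucketInfo.purgeSnapshotUsedNamespace(1);  // one entry per purged directory
    }
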
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index a4cc9fe2f7a..b543c3cfbf6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.ozone.om.request.key;
 
 import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.validatePreviousSnapshotId;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,15 +35,19 @@
 import org.apache.hadoop.ozone.audit.AuditLoggerType;
 import org.apache.hadoop.ozone.audit.OMSystemAction;
 import org.apache.hadoop.ozone.om.DeletingServiceMetrics;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketPurgeKeysSize;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -161,9 +167,64 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
       }
       return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, e));
     }
+    try {
+      List<OmBucketInfo> bucketInfoList = updateBucketSize(purgeKeysRequest.getBucketPurgeKeysSizeList(),
+          omMetadataManager);
+      return new OMKeyPurgeResponse(omResponse.build(),
+          keysToBePurgedList, renamedKeysToBePurged, fromSnapshotInfo, keysToUpdateList, bucketInfoList);
+    } catch (OMException oe) {
+      AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.KEY_DELETION, null, oe));
+      return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, oe));
+    }
+  }
+  }
 
-    return new OMKeyPurgeResponse(omResponse.build(),
-        keysToBePurgedList, renamedKeysToBePurged, fromSnapshotInfo, keysToUpdateList);
+  private List<OmBucketInfo> updateBucketSize(List<BucketPurgeKeysSize> bucketPurgeKeysSizeList,
+      OMMetadataManager omMetadataManager) throws OMException {
+    Map<String, Map<String, List<BucketPurgeKeysSize>>> bucketPurgeKeysSizes = new HashMap<>();
+    List<String[]> bucketKeyList = new ArrayList<>();
+    for (BucketPurgeKeysSize bucketPurgeKey : bucketPurgeKeysSizeList) {
+      String volumeName = bucketPurgeKey.getBucketNameInfo().getVolumeName();
+      String bucketName = bucketPurgeKey.getBucketNameInfo().getBucketName();
+      bucketPurgeKeysSizes.computeIfAbsent(volumeName, k -> new HashMap<>())
+          .computeIfAbsent(bucketName, k -> {
+            bucketKeyList.add(new String[]{volumeName, bucketName});
+            return new ArrayList<>();
+          }).add(bucketPurgeKey);
+    }
+    mergeOmLockDetails(omMetadataManager.getLock().acquireWriteLocks(BUCKET_LOCK, bucketKeyList));
+    boolean acquiredLock = getOmLockDetails().isLockAcquired();
+    if (!acquiredLock) {
+      throw new OMException("Failed to acquire bucket lock for purging keys.",
+          OMException.ResultCodes.KEY_DELETION_ERROR);
+    }
+    List<OmBucketInfo> bucketInfoList = new ArrayList<>();
+    try {
+      for (Map.Entry<String, Map<String, List<BucketPurgeKeysSize>>> volEntry : bucketPurgeKeysSizes.entrySet()) {
+        String volumeName = volEntry.getKey();
+        for (Map.Entry<String, List<BucketPurgeKeysSize>> bucketEntry : volEntry.getValue().entrySet()) {
+          String bucketName = bucketEntry.getKey();
+          OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
+          // Check null if bucket has been deleted.
+          if (omBucketInfo != null) {
+            boolean bucketUpdated = false;
+            for (BucketPurgeKeysSize bucketPurgeKeysSize : bucketEntry.getValue()) {
+              BucketNameInfo bucketNameInfo = bucketPurgeKeysSize.getBucketNameInfo();
+              if (bucketNameInfo.getBucketId() == omBucketInfo.getObjectID()) {
+                omBucketInfo.purgeSnapshotUsedBytes(bucketPurgeKeysSize.getPurgedBytes());
+                omBucketInfo.purgeSnapshotUsedNamespace(bucketPurgeKeysSize.getPurgedNamespace());
+                bucketUpdated = true;
+              }
+            }
+            if (bucketUpdated) {
+              bucketInfoList.add(omBucketInfo.copyObject());
+            }
+          }
+        }
+      }
+      return bucketInfoList;
+    } finally {
+      mergeOmLockDetails(omMetadataManager.getLock().releaseWriteLocks(BUCKET_LOCK, bucketKeyList));
+    }
   }
 
 }
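
Note how updateBucketSize groups the incoming entries by volume and bucket before locking, so each bucket write lock is taken once per request in a single batched acquireWriteLocks call rather than entry by entry. A condensed sketch of that grouping step (same shape as the code, OM plumbing elided):

    Map<String, Map<String, List<BucketPurgeKeysSize>>> byVolumeAndBucket = new HashMap<>();
    List<String[]> bucketKeyList = new ArrayList<>();  // deduplicated lock keys
    for (BucketPurgeKeysSize purge : bucketPurgeKeysSizeList) {
      String volume = purge.getBucketNameInfo().getVolumeName();
      String bucket = purge.getBucketNameInfo().getBucketName();
      byVolumeAndBucket.computeIfAbsent(volume, k -> new HashMap<>())
          .computeIfAbsent(bucket, k -> {
            // First sighting of this volume/bucket pair: register its lock key.
            bucketKeyList.add(new String[]{volume, bucket});
            return new ArrayList<>();
          }).add(purge);
    }
    // bucketKeyList is then handed to acquireWriteLocks(BUCKET_LOCK, ...) in one
    // call; the directory-purge test below exercises two requests locking the same
    // buckets in opposite order, the scenario batched acquisition guards against.
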
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
index 0ceb5146282..38ce0a6266c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
@@ -23,6 +23,7 @@
 
 import jakarta.annotation.Nonnull;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
@@ -44,6 +46,7 @@
  */
 @CleanupTableInfo(cleanupTables = {DELETED_TABLE, SNAPSHOT_INFO_TABLE})
 public class OMKeyPurgeResponse extends OmKeyResponse {
+  private List<OmBucketInfo> bucketInfosToBeUpdated;
   private List<String> purgeKeyList;
   private List<String> renamedList;
   private SnapshotInfo fromSnapshot;
@@ -53,12 +56,14 @@ public OMKeyPurgeResponse(@Nonnull OMResponse omResponse,
       @Nonnull List<String> keyList,
       @Nonnull List<String> renamedList,
       SnapshotInfo fromSnapshot,
-      List<SnapshotMoveKeyInfos> keysToUpdate) {
+      List<SnapshotMoveKeyInfos> keysToUpdate,
+      List<OmBucketInfo> bucketInfosToBeUpdated) {
     super(omResponse);
     this.purgeKeyList = keyList;
     this.renamedList = renamedList;
     this.fromSnapshot = fromSnapshot;
     this.keysToUpdateList = keysToUpdate;
+    this.bucketInfosToBeUpdated = bucketInfosToBeUpdated == null ? Collections.emptyList() : bucketInfosToBeUpdated;
   }
 
   /**
@@ -96,6 +101,10 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
       processKeys(batchOperation, omMetadataManager);
       processKeysToUpdate(batchOperation, omMetadataManager);
     }
+    for (OmBucketInfo bucketInfo : bucketInfosToBeUpdated) {
+      String bucketKey = omMetadataManager.getBucketKey(bucketInfo.getVolumeName(), bucketInfo.getBucketName());
+      omMetadataManager.getBucketTable().putWithBatch(batchOperation, bucketKey, bucketInfo);
+    }
   }
 
   private void processKeysToUpdate(BatchOperation batchOp,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index d7dba697b71..254184f8b82 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -19,6 +19,7 @@
 
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
+import static org.apache.hadoop.ozone.util.ProtobufUtils.computeLongSizeWithTag;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -27,6 +28,7 @@
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -48,7 +50,6 @@
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.ClientVersion;
-import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -57,6 +58,7 @@
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.PendingKeysDeletion;
+import org.apache.hadoop.ozone.om.PendingKeysDeletion.PurgedKey;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
@@ -65,6 +67,8 @@
 import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
 import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableRenameEntryFilter;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketPurgeKeysSize;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.NullableUUID;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
@@ -131,10 +135,9 @@ public AtomicLong getDeletedKeyCount() {
     return deletedKeyCount;
   }
 
-  Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksList,
+  Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(Map<String, PurgedKey> keyBlocksList,
       Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntries,
-      String snapTableKey, UUID expectedPreviousSnapshotId, Map<String, Long> keyBlockReplicatedSize)
-      throws IOException, InterruptedException {
+      String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException {
     long startTime = Time.monotonicNow();
     Pair<Pair<Integer, Long>, Boolean> purgeResult = Pair.of(Pair.of(0, 0L), false);
     if (LOG.isDebugEnabled()) {
@@ -146,16 +149,18 @@ Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksL
         logSize = keyBlocksList.size();
       }
       LOG.info("Send {} key(s) to SCM, first {} keys: {}",
-          keyBlocksList.size(), logSize, keyBlocksList.subList(0, logSize));
+          keyBlocksList.size(), logSize, keyBlocksList.entrySet().stream().limit(logSize)
+              .map(Map.Entry::getValue).collect(Collectors.toSet()));
     }
     List<DeleteBlockGroupResult> blockDeletionResults =
-        scmClient.deleteKeyBlocks(keyBlocksList);
+        scmClient.deleteKeyBlocks(keyBlocksList.values().stream()
+            .map(PurgedKey::getBlockGroup).collect(Collectors.toList()));
     LOG.info("{} BlockGroup deletion are acked by SCM in {} ms",
         keyBlocksList.size(), Time.monotonicNow() - startTime);
     if (blockDeletionResults != null) {
       long purgeStartTime = Time.monotonicNow();
-      purgeResult = submitPurgeKeysRequest(blockDeletionResults, keysToModify, renameEntries, snapTableKey,
-          expectedPreviousSnapshotId, ratisByteLimit, keyBlockReplicatedSize);
+      purgeResult = submitPurgeKeysRequest(blockDeletionResults, keyBlocksList, keysToModify, renameEntries,
+          snapTableKey, expectedPreviousSnapshotId, ratisByteLimit);
       int limit = getOzoneManager().getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK,
           OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
       LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. 
Limit per task is {}.",
@@ -165,6 +170,60 @@ Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksL
     return purgeResult;
   }
 
+  private static final class BucketPurgeSize {
+    private BucketNameInfo bucket;
+    private long purgedBytes;
+    private long purgedNamespace;
+
+    private BucketPurgeSize(String volume, String bucket, long bucketId) {
+      this.bucket = BucketNameInfo.newBuilder().setBucketId(bucketId).setVolumeName(volume)
+              .setBucketName(bucket).build();
+      this.purgedBytes = 0;
+      this.purgedNamespace = 0;
+    }
+
+    private BucketPurgeSize incrementPurgedBytes(long bytes) {
+      purgedBytes += bytes;
+      return this;
+    }
+
+    private BucketPurgeSize incrementPurgedNamespace(long namespace) {
+      purgedNamespace += namespace;
+      return this;
+    }
+
+    private BucketPurgeKeysSize toProtobuf() {
+      return BucketPurgeKeysSize.newBuilder()
+          .setBucketNameInfo(bucket)
+          .setPurgedBytes(purgedBytes)
+          .setPurgedNamespace(purgedNamespace)
+          .build();
+    }
+
+    private int getEstimatedSize() {
+      // Using -10 as the placeholder to get max size i.e. 10 bytes to store the long value in protobuf.
+      // Field number 2 in BucketPurgeKeysSize proto corresponds to purgedBytes.
+      return this.bucket.getSerializedSize() + computeLongSizeWithTag(2, -10)
+      // Field number 3 in BucketPurgeKeysSize proto corresponds to purgedNamespace.
+          + computeLongSizeWithTag(3, -10);
+    }
+  }
+
+  private int increaseBucketPurgeSize(Map<Long, BucketPurgeSize> bucketPurgeSizeMap, PurgedKey purgedKey) {
+    BucketPurgeSize bucketPurgeSize;
+    int estimatedSize = 0;
+    if (!bucketPurgeSizeMap.containsKey(purgedKey.getBucketId())) {
+      bucketPurgeSize = bucketPurgeSizeMap.computeIfAbsent(purgedKey.getBucketId(),
+          (bucketId) -> new BucketPurgeSize(purgedKey.getVolume(), purgedKey.getBucket(),
+              purgedKey.getBucketId()));
+      estimatedSize = bucketPurgeSize.getEstimatedSize();
+    } else {
+      bucketPurgeSize = bucketPurgeSizeMap.get(purgedKey.getBucketId());
+    }
+    bucketPurgeSize.incrementPurgedBytes(purgedKey.getPurgedBytes()).incrementPurgedNamespace(1);
+    return estimatedSize;
+  }
+
   /**
    * Submits PurgeKeys request for the keys whose blocks have been deleted
    * by SCM.
@@ -174,14 +233,14 @@ Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksL
   @SuppressWarnings("checkstyle:MethodLength")
   private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(
       List<DeleteBlockGroupResult> results,
+      Map<String, PurgedKey> purgedKeys,
       Map<String, RepeatedOmKeyInfo> keysToModify,
       List<String> renameEntriesToBeDeleted,
       String snapTableKey,
       UUID expectedPreviousSnapshotId,
-      int ratisLimit,
-      Map<String, Long> keyBlockReplicatedSize) throws InterruptedException {
+      int ratisLimit) {
 
-    List<String> purgeKeys = new ArrayList<>();
+    Set<String> completePurgedKeys = new HashSet<>();
 
     // Put all keys to be purged in a list
     int deletedCount = 0;
@@ -191,38 +250,50 @@ private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(
 
     // Step 1: Process DeleteBlockGroupResults
     for (DeleteBlockGroupResult result : results) {
-      String deletedKey = result.getObjectKey();
-      if (result.isSuccess()) {
-        // Add key to PurgeKeys list.
-        if (keysToModify != null && !keysToModify.containsKey(deletedKey)) {
-          // Parse Volume and BucketName
-          purgeKeys.add(deletedKey);
-          if (LOG.isDebugEnabled()) {
+      String deletedKeyGroup = result.getObjectKey();
+      PurgedKey purgedKey = purgedKeys.get(deletedKeyGroup);
+      if (purgedKey != null) {
+        String deletedKeyName = purgedKey.getDeleteKeyName();
+        if (result.isSuccess()) {
+          // Add key to PurgeKeys list.
+          if (keysToModify == null || !keysToModify.containsKey(deletedKeyName)) {
+            completePurgedKeys.add(deletedKeyName);
+            LOG.debug("Key {} set to be purged from OM DB", deletedKeyName);
+          } else {
             LOG.debug("Key {} set to be updated in OM DB, Other versions " +
-                "of the key that are reclaimable are reclaimed.", deletedKey);
+                "of the key that are reclaimable are reclaimed.", 
deletedKeyName);
           }
-        } else if (keysToModify == null) {
-          purgeKeys.add(deletedKey);
+          deletedReplSize += purgedKey.getPurgedBytes();
+          deletedCount++;
+        } else {
+          // If the block deletion failed, then the deleted keys should also not be modified and
+          // any other version of the key should also not be purged.
+          failedDeletedKeys.add(deletedKeyName);
+          purgeSuccess = false;
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+            LOG.error("Failed Block Delete corresponding to Key {} with block 
result : {}.", deletedKeyName,
+                result.getBlockResultList());
+          } else {
+            LOG.error("Failed Block Delete corresponding to Key {}.", 
deletedKeyName);
           }
         }
-        if (keyBlockReplicatedSize != null) {
-          deletedReplSize += keyBlockReplicatedSize.getOrDefault(deletedKey, 
0L);
-        }
-        deletedCount++;
       } else {
-        // If the block deletion failed, then the deleted keys should also not be modified.
-        failedDeletedKeys.add(deletedKey);
-        purgeSuccess = false;
+        LOG.error("Key {} not found in the list of keys to be purged." +
+            " Skipping purge for this entry. Result of delete blocks : {}", 
deletedKeyGroup, result.isSuccess());
       }
     }
+    // Filter out the key even if one version of the key purge has failed. This is to prevent orphan blocks, and
+    // this needs to be retried.
+    completePurgedKeys = completePurgedKeys.stream()
+        .filter(i -> !failedDeletedKeys.contains(i)).collect(Collectors.toSet());
+    // Filter out any keys that have failed and sort the purge keys based on volume and bucket.
+    List<PurgedKey> purgedKeyList = purgedKeys.values().stream()
+        .filter(purgedKey -> !failedDeletedKeys.contains(purgedKey.getDeleteKeyName()))
+        .collect(Collectors.toList());
 
-    // Step 2: Prepare keysToUpdateList
     List<OzoneManagerProtocolProtos.SnapshotMoveKeyInfos> keysToUpdateList = new ArrayList<>();
     if (keysToModify != null) {
-      for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify :
-          keysToModify.entrySet()) {
+      for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify : keysToModify.entrySet()) {
         if (failedDeletedKeys.contains(keyToModify.getKey())) {
           continue;
         }
@@ -239,7 +310,7 @@ private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(
       }
     }
 
-    if (purgeKeys.isEmpty() && keysToUpdateList.isEmpty() &&
+    if (purgedKeyList.isEmpty() && keysToUpdateList.isEmpty() &&
         (renameEntriesToBeDeleted == null || renameEntriesToBeDeleted.isEmpty())) {
       return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess);
     }
@@ -249,20 +320,33 @@ private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(
     int currSize = requestBuilder.build().getSerializedSize();
     int baseSize = currSize;
 
-    while (purgeKeyIndex < purgeKeys.size() || updateIndex < keysToUpdateList.size() ||
+    OzoneManagerProtocolProtos.DeletedKeys.Builder bucketDeleteKeys = null;
+    Map<Long, BucketPurgeSize> bucketPurgeKeysSizeMap = new HashMap<>();
+
+    Map<String, List<PurgedKey>> modifiedKeyPurgedKeys = new HashMap<>();
+    while (purgeKeyIndex < purgedKeyList.size() || updateIndex < keysToUpdateList.size() ||
         (renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size())) {
 
       // 3.1 Purge keys (one at a time)
-      if (purgeKeyIndex < purgeKeys.size()) {
-        String nextKey = purgeKeys.get(purgeKeyIndex);
-        int estimatedKeySize = ProtobufUtils.computeRepeatedStringSize(nextKey);
-
-        requestBuilder.addDeletedKeys(
-            OzoneManagerProtocolProtos.DeletedKeys.newBuilder().setVolumeName("").setBucketName("").addKeys(nextKey)
-                .build());
-        currSize += estimatedKeySize;
+      if (purgeKeyIndex < purgedKeyList.size()) {
+        PurgedKey purgedKey = purgedKeyList.get(purgeKeyIndex);
+        if (bucketDeleteKeys == null) {
+          bucketDeleteKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder().setVolumeName("").setBucketName("");
+          currSize += bucketDeleteKeys.buildPartial().getSerializedSize();
+        }
+        String deletedKey = purgedKey.getDeleteKeyName();
+        // Add to purge keys only if there are no other version of key that needs to be retained.
+        if (completePurgedKeys.contains(deletedKey)) {
+          bucketDeleteKeys.addKeys(deletedKey);
+          int estimatedKeySize = ProtobufUtils.computeRepeatedStringSize(deletedKey);
+          currSize += estimatedKeySize;
+          if (purgedKey.isCommittedKey()) {
+            currSize += increaseBucketPurgeSize(bucketPurgeKeysSizeMap, purgedKey);
+          }
+        } else if (purgedKey.isCommittedKey()) {
+          modifiedKeyPurgedKeys.computeIfAbsent(deletedKey, k -> new ArrayList<>()).add(purgedKey);
+        }
         purgeKeyIndex++;
-
       } else if (updateIndex < keysToUpdateList.size()) {
         // 3.2 Add keysToUpdate
        OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = keysToUpdateList.get(updateIndex);
@@ -270,6 +354,13 @@ private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(
         int estimatedSize = nextUpdate.getSerializedSize();
 
         requestBuilder.addKeysToUpdate(nextUpdate);
+        if (modifiedKeyPurgedKeys.containsKey(nextUpdate.getKey())) {
+          for (PurgedKey purgedKey : modifiedKeyPurgedKeys.get(nextUpdate.getKey())) {
+            if (purgedKey.isCommittedKey()) {
+              currSize += increaseBucketPurgeSize(bucketPurgeKeysSizeMap, purgedKey);
+            }
+          }
+        }
         currSize += estimatedSize;
         updateIndex++;
 
@@ -285,10 +376,17 @@ private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(
       }
 
       // Flush either when limit is hit, or at the very end if items remain
-      boolean allDone = purgeKeyIndex == purgeKeys.size() && updateIndex == keysToUpdateList.size() &&
+      boolean allDone = purgeKeyIndex == purgedKeyList.size() && updateIndex == keysToUpdateList.size() &&
           (renameEntriesToBeDeleted == null || renameIndex == renameEntriesToBeDeleted.size());
 
-      if (currSize >= ratisLimit || (allDone && hasPendingItems(requestBuilder))) {
+      if (currSize >= ratisLimit || (allDone && (hasPendingItems(requestBuilder) || bucketDeleteKeys != null))) {
+        if (bucketDeleteKeys != null) {
+          requestBuilder.addDeletedKeys(bucketDeleteKeys.build());
+          bucketDeleteKeys = null;
+        }
+        bucketPurgeKeysSizeMap.values().stream().map(BucketPurgeSize::toProtobuf)
+            .forEach(requestBuilder::addBucketPurgeKeysSize);
+        bucketPurgeKeysSizeMap.clear();
        purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder);
        requestBuilder = getPurgeKeysRequest(snapTableKey, expectedPreviousSnapshotId);
         currSize = baseSize;
@@ -474,23 +572,23 @@ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyMan
           PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null
              ? keyManager.getPendingDeletionKeys(reclaimableKeyFilter, remainNum)
              : keyManager.getPendingDeletionKeys(volume, bucket, null, reclaimableKeyFilter, remainNum);
-          List<BlockGroup> keyBlocksList = pendingKeysDeletion.getKeyBlocksList();
+          Map<String, PurgedKey> purgedKeys = pendingKeysDeletion.getPurgedKeys();
          //submit purge requests if there are renamed entries to be purged or keys to be purged.
-          if (!renamedTableEntries.isEmpty() || keyBlocksList != null && !keyBlocksList.isEmpty()) {
+          if (!renamedTableEntries.isEmpty() || purgedKeys != null && !purgedKeys.isEmpty()) {
            // Validating if the previous snapshot is still the same before purging the blocks.
            SnapshotUtils.validatePreviousSnapshotId(currentSnapshotInfo, snapshotChainManager,
                 expectedPreviousSnapshotId);
-            Pair<Pair<Integer, Long>, Boolean> purgeResult = processKeyDeletes(keyBlocksList,
+            Pair<Pair<Integer, Long>, Boolean> purgeResult = processKeyDeletes(purgedKeys,
                pendingKeysDeletion.getKeysToModify(), renamedTableEntries, snapshotTableKey,
-                expectedPreviousSnapshotId, pendingKeysDeletion.getKeyBlockReplicatedSize());
+                expectedPreviousSnapshotId);
             remainNum -= purgeResult.getKey().getKey();
             successStatus = purgeResult.getValue();
-            getMetrics().incrNumKeysProcessed(keyBlocksList.size());
+            getMetrics().incrNumKeysProcessed(purgedKeys.size());
            getMetrics().incrNumKeysSentForPurge(purgeResult.getKey().getKey());
 
            DeletionStats statsToUpdate = currentSnapshotInfo == null ? aosDeletionStats : snapshotDeletionStats;
            statsToUpdate.updateDeletionStats(purgeResult.getKey().getKey(), purgeResult.getKey().getValue(),
-                keyBlocksList.size() + pendingKeysDeletion.getNotReclaimableKeyCount(),
+                purgedKeys.size() + pendingKeysDeletion.getNotReclaimableKeyCount(),
                 pendingKeysDeletion.getNotReclaimableKeyCount()
             );
             if (successStatus) {
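
The request assembly above follows a size-limited batching pattern: items are appended while a running estimate of the serialized size is maintained, and the builder is flushed whenever the estimate crosses the Ratis byte limit or the input is exhausted. A generic, self-contained sketch of the pattern (illustrative types only, not the OM code):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class BatchingSketch {
      static void submitInBatches(List<String> items, int byteLimit) {
        List<String> batch = new ArrayList<>();
        int currSize = 0;
        for (String item : items) {
          batch.add(item);
          currSize += item.getBytes(StandardCharsets.UTF_8).length;
          if (currSize >= byteLimit) {      // flush when the estimate crosses the limit
            System.out.println("flush " + batch.size() + " item(s)");
            batch.clear();
            currSize = 0;
          }
        }
        if (!batch.isEmpty()) {             // final flush for the remainder
          System.out.println("flush " + batch.size() + " item(s)");
        }
      }
    }
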
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
index 43195d1abf1..ff3fc5f31b9 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
@@ -68,6 +68,7 @@
 import org.apache.hadoop.ozone.om.response.key.OMDirectoriesPurgeResponseWithFSO;
 import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import org.apache.hadoop.util.Time;
@@ -148,13 +149,16 @@ private OMRequest createPurgeKeysRequest(String fromSnapshot, String purgeDelete
   }
 
   private OMRequest createPurgeKeysRequest(String fromSnapshot,
-      List<PurgePathRequest> purgePathRequestList) {
+      List<PurgePathRequest> purgePathRequestList, List<BucketNameInfo> bucketInfoList) {
     OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
         OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
     purgeDirRequest.addAllDeletedPath(purgePathRequestList);
     if (fromSnapshot != null) {
       purgeDirRequest.setSnapshotTableKey(fromSnapshot);
     }
+    if (bucketInfoList != null) {
+      purgeDirRequest.addAllBucketNameInfos(bucketInfoList);
+    }
     OzoneManagerProtocolProtos.OMRequest omRequest =
         OzoneManagerProtocolProtos.OMRequest.newBuilder()
             .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
@@ -180,7 +184,9 @@ private OMRequest createPurgeKeysRequest(String fromSnapshot, String purgeDelete
     PurgePathRequest request = wrapPurgeRequest(
         volumeId, bucketId, purgeDeletedDir, subFiles, subDirs);
     purgePathRequestList.add(request);
-    return createPurgeKeysRequest(fromSnapshot, purgePathRequestList);
+    return createPurgeKeysRequest(fromSnapshot, purgePathRequestList, Collections.singletonList(
+        BucketNameInfo.newBuilder().setVolumeName(volumeName).setBucketName(bucketName)
+            .setBucketId(bucketId).setVolumeId(volumeId).buildPartial()));
   }
 
   private PurgePathRequest wrapPurgeRequest(
@@ -294,8 +300,9 @@ public void testBucketLockWithPurgeDirectory() throws Exception {
     // Add volume, bucket and key entries to OM DB.
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket2,
         omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
-    String bucketKey2 = omMetadataManager.getBucketKey(volumeName, bucket1);
+    String bucketKey2 = omMetadataManager.getBucketKey(volumeName, bucket2);
     OmBucketInfo bucketInfo2 = omMetadataManager.getBucketTable().get(bucketKey2);
+    long volumeId = omMetadataManager.getVolumeId(volumeName);
     PurgePathRequest purgePathRequest2 = createBucketDataAndGetPurgePathRequest(bucketInfo2);
     IOzoneManagerLock lock = spy(omMetadataManager.getLock());
     Set<Long> acquiredLockIds = new ConcurrentSkipListSet<>();
@@ -321,10 +328,19 @@ public void testBucketLockWithPurgeDirectory() throws Exception {
         return lockDetails;
       }).when(lock).acquireWriteLocks(eq(BUCKET_LOCK), anyCollection());
       when(omMetadataManager.getLock()).thenReturn(lock);
+      List<BucketNameInfo> bucketInfoList = Arrays.asList(
+          BucketNameInfo.newBuilder().setVolumeName(bucketInfo1.getVolumeName())
+              .setBucketName(bucketInfo1.getBucketName())
+              .setBucketId(bucketInfo1.getObjectID()).setVolumeId(volumeId).build(),
+          BucketNameInfo.newBuilder().setVolumeName(bucketInfo2.getVolumeName())
+              .setBucketName(bucketInfo2.getBucketName())
+              .setBucketId(bucketInfo2.getObjectID()).setVolumeId(volumeId).build());
       OMDirectoriesPurgeRequestWithFSO purgePathRequests1 = new OMDirectoriesPurgeRequestWithFSO(
-          preExecute(createPurgeKeysRequest(null, Arrays.asList(purgePathRequest1, purgePathRequest2))));
+          preExecute(createPurgeKeysRequest(null, Arrays.asList(purgePathRequest1, purgePathRequest2),
+              bucketInfoList)));
       OMDirectoriesPurgeRequestWithFSO purgePathRequests2 = new OMDirectoriesPurgeRequestWithFSO(
-          preExecute(createPurgeKeysRequest(null, Arrays.asList(purgePathRequest2, purgePathRequest1))));
+          preExecute(createPurgeKeysRequest(null, Arrays.asList(purgePathRequest2, purgePathRequest1),
+              bucketInfoList)));
       CompletableFuture future1 = CompletableFuture.runAsync(() ->
           purgePathRequests1.validateAndUpdateCache(ozoneManager, 100L));
       CompletableFuture future2 = CompletableFuture.runAsync(() ->
@@ -459,6 +475,7 @@ public void testValidateAndUpdateCacheCheckQuota() throws Exception {
     omBucketInfo = omMetadataManager.getBucketTable().get(
         bucketKey);
     assertEquals(0L * deletedKeyNames.size(), omBucketInfo.getUsedBytes());
+    assertEquals(1000L * deletedKeyNames.size(), omBucketInfo.getSnapshotUsedBytes());
 
     performBatchOperationCommit(omClientResponse);
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index 291c5cd11f2..aa566859cb4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -163,7 +163,7 @@ public void testValidateAndUpdateCache() throws Exception {
 
       OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
          omResponse, deleteKeysAndRenamedEntry.getKey(), deleteKeysAndRenamedEntry.getValue(), null,
-          null);
+          null, null);
       omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
 
       // Do manual commit and see whether addToBatch is successful or not.
@@ -239,7 +239,7 @@ public void testKeyPurgeInSnapshot() throws Exception {
         omMetadataManager.getStore().initBatchOperation()) {
 
      OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(omResponse, deleteKeysAndRenamedEntry.getKey(),
-          deleteKeysAndRenamedEntry.getValue(), snapInfo, null);
+          deleteKeysAndRenamedEntry.getValue(), snapInfo, null, null);
       omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
 
       // Do manual commit and see whether addToBatch is successful or not.
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index c77cf9b0177..9246df12c9c 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -50,6 +50,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +86,7 @@
 import org.apache.hadoop.ozone.om.OmTestManagers;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.PendingKeysDeletion;
+import org.apache.hadoop.ozone.om.PendingKeysDeletion.PurgedKey;
 import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -96,6 +98,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
@@ -140,8 +143,6 @@ class TestKeyDeletingService extends OzoneTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestKeyDeletingService.class);
   private static final AtomicInteger OBJECT_COUNTER = new AtomicInteger();
-  private static final long DATA_SIZE = 1000L;
-
   private OzoneConfiguration conf;
   private OzoneManagerProtocol writeClient;
   private OzoneManager om;
@@ -240,7 +241,7 @@ void checkIfDeleteServiceIsDeletingKeys()
       assertThat(getRunCount()).isGreaterThan(initialRunCount);
      assertThat(keyManager.getPendingDeletionKeys(new ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
              ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), null,
-              keyManager, om.getMetadataManager().getLock()), Integer.MAX_VALUE).getKeyBlocksList())
+              keyManager, om.getMetadataManager().getLock()), Integer.MAX_VALUE).getPurgedKeys())
           .isEmpty();
     }
 
@@ -269,7 +270,7 @@ void checkDeletionForKeysWithMultipleVersions() throws Exception {
           1000, 10000);
       assertThat(getRunCount())
           .isGreaterThan(initialRunCount);
-      assertThat(keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE).getKeyBlocksList())
+      assertThat(keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE).getPurgedKeys())
           .isEmpty();
 
       // The 1st version of the key has 1 block and the 2nd version has 2
@@ -300,21 +301,34 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception {
       // Create snapshot
       String snapName = uniqueObjectName("snap");
       writeClient.createSnapshot(volumeName, bucketName1, snapName);
-
+      keyDeletingService.suspend();
       // Delete the key
       writeClient.deleteKey(key1);
       writeClient.deleteKey(key2);
-
+      // Create a key3 in bucket1 which should be reclaimable to check quota usage.
+      OmKeyArgs key3 = createAndCommitKey(volumeName, bucketName1, uniqueObjectName(keyName), 3);
+      OmBucketInfo bucketInfo = writeClient.getBucketInfo(volumeName, bucketName1);
+      long key1Size = QuotaUtil.getReplicatedSize(key1.getDataSize(), key1.getReplicationConfig());
+      long key3Size = QuotaUtil.getReplicatedSize(key3.getDataSize(), key3.getReplicationConfig());
+
+      assertEquals(key1Size, bucketInfo.getSnapshotUsedBytes());
+      assertEquals(1, bucketInfo.getSnapshotUsedNamespace());
+      writeClient.deleteKey(key3);
+      bucketInfo = writeClient.getBucketInfo(volumeName, bucketName1);
+      assertEquals(key1Size + key3Size, bucketInfo.getSnapshotUsedBytes());
+      assertEquals(2, bucketInfo.getSnapshotUsedNamespace());
+      writeClient.getBucketInfo(volumeName, bucketName1);
+      keyDeletingService.resume();
       // Run KeyDeletingService
       GenericTestUtils.waitFor(
-          () -> getDeletedKeyCount() >= initialDeletedCount + 1,
-          1000, 10000);
+          () -> getDeletedKeyCount() >= initialDeletedCount + 2,
+          1000, 100000);
       assertThat(getRunCount())
           .isGreaterThan(initialRunCount);
      assertThat(keyManager.getPendingDeletionKeys(new ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
              ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), null,
               keyManager, om.getMetadataManager().getLock()),
-          Integer.MAX_VALUE).getKeyBlocksList())
+          Integer.MAX_VALUE).getPurgedKeys())
           .isEmpty();
 
       // deletedTable should have deleted key of the snapshot bucket
@@ -323,6 +337,9 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception {
           metadataManager.getOzoneKey(volumeName, bucketName1, keyName);
       String ozoneKey2 =
           metadataManager.getOzoneKey(volumeName, bucketName2, keyName);
+      String ozoneKey3 =
+          metadataManager.getOzoneKey(volumeName, bucketName2, key3.getKeyName());
+
 
       // key1 belongs to snapshot, so it should not be deleted when
       // KeyDeletingService runs. But key2 can be reclaimed as it doesn't
@@ -335,6 +352,13 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception {
           = metadataManager.getDeletedTable().getRangeKVs(
           null, 100, ozoneKey2);
       assertEquals(0, rangeKVs.size());
+      rangeKVs
+          = metadataManager.getDeletedTable().getRangeKVs(
+          null, 100, ozoneKey3);
+      assertEquals(0, rangeKVs.size());
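+      // Once key3 is reclaimed, only key1 (still held by the snapshot)
+      // remains counted against the snapshot quota.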
+      bucketInfo = writeClient.getBucketInfo(volumeName, bucketName1);
+      assertEquals(key1Size, bucketInfo.getSnapshotUsedBytes());
+      assertEquals(1, bucketInfo.getSnapshotUsedNamespace());
     }
 
     /*
@@ -418,8 +442,8 @@ public void 
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
       assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, 
metadataManager);
       doAnswer(i -> {
         PendingKeysDeletion pendingKeysDeletion = (PendingKeysDeletion) 
i.callRealMethod();
-        for (BlockGroup group : pendingKeysDeletion.getKeyBlocksList()) {
-          Assertions.assertNotEquals(deletePathKey[0], group.getGroupID());
+        for (PurgedKey purgedKey : 
pendingKeysDeletion.getPurgedKeys().values()) {
+          Assertions.assertNotEquals(deletePathKey[0], 
purgedKey.getBlockGroup().getGroupID());
         }
         return pendingKeysDeletion;
       }).when(km).getPendingDeletionKeys(any(), anyInt());
@@ -663,13 +687,13 @@ void testSnapshotExclusiveSize() throws Exception {
       final String testVolumeName = getTestName();
       final String testBucketName = uniqueObjectName("bucket");
       final String keyName = uniqueObjectName("key");
-
+      Map<Integer, Long> keySizeMap = new HashMap<>();
       // Create Volume and Buckets
       createVolumeAndBucket(testVolumeName, testBucketName, false);
 
       // Create 3 keys
       for (int i = 1; i <= 3; i++) {
-        createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
+        keySizeMap.put(i, createAndCommitKey(testVolumeName, testBucketName, 
keyName + i, 3).getDataSize());
       }
       assertTableRowCount(keyTable, initialKeyCount + 3, metadataManager);
 
@@ -681,7 +705,7 @@ void testSnapshotExclusiveSize() throws Exception {
 
       // Create 2 keys
       for (int i = 4; i <= 5; i++) {
-        createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
+        keySizeMap.put(i, createAndCommitKey(testVolumeName, testBucketName, 
keyName + i, 3).getDataSize());
       }
       // Delete a key, rename 2 keys. We will be using this to test
       // how we handle renamed key for exclusive size calculation.
@@ -699,7 +723,7 @@ void testSnapshotExclusiveSize() throws Exception {
 
       // Create 2 keys
       for (int i = 6; i <= 7; i++) {
-        createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
+        keySizeMap.put(i, createAndCommitKey(testVolumeName, testBucketName, 
keyName + i, 3).getDataSize());
       }
 
       deleteKey(testVolumeName, testBucketName, "renamedKey1");
@@ -734,14 +758,12 @@ void testSnapshotExclusiveSize() throws Exception {
       keyDeletingService.resume();
 
       Map<String, Long> expectedSize = new ImmutableMap.Builder<String, Long>()
-          .put(snap1, 1000L)
-          .put(snap2, 1000L)
-          .put(snap3, 2000L)
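+          // Expected exclusive sizes now derive from the actual committed
+          // key sizes instead of hard-coded constants.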
+          .put(snap1, keySizeMap.get(3))
+          .put(snap2, keySizeMap.get(4))
+          .put(snap3, keySizeMap.get(6) + keySizeMap.get(7))
           .put(snap4, 0L)
           .build();
-      System.out.println(expectedSize);
-
-      // Let KeyDeletingService to run for some iterations
+      // Let KeyDeletingService run for some iterations
       GenericTestUtils.waitFor(
           () -> (getRunCount() > prevKdsRunCount + 20),
           100, 100000);
@@ -756,7 +778,6 @@ void testSnapshotExclusiveSize() throws Exception {
 
           Long expected = expectedSize.getOrDefault(snapshotName, 
snapshotInfo.getExclusiveSize());
           assertNotNull(expected);
-          System.out.println(snapshotName);
           assertEquals(expected, snapshotInfo.getExclusiveSize());
           // Since for the test we are using RATIS/THREE
           assertEquals(expected * 3, 
snapshotInfo.getExclusiveReplicatedSize());
@@ -805,8 +826,10 @@ public void testFailingModifiedKeyPurge() throws 
IOException, InterruptedExcepti
               return 
OzoneManagerProtocolProtos.OMResponse.newBuilder().setCmdType(purgeRequest.get().getCmdType())
                   
.setStatus(OzoneManagerProtocolProtos.Status.TIMEOUT).build();
             });
-        List<BlockGroup> blockGroups = 
Collections.singletonList(BlockGroup.newBuilder().setKeyName("key1")
-            .addAllBlockIDs(Collections.singletonList(new BlockID(1, 
1))).build());
+        BlockGroup blockGroup = BlockGroup.newBuilder().setKeyName("key1/1")
+            .addAllBlockIDs(Collections.singletonList(new BlockID(1, 
1))).build();
+        Map<String, PurgedKey> blockGroups = 
Collections.singletonMap(blockGroup.getGroupID(), new PurgedKey("vol",
+            "buck", 1, blockGroup, "key1", 30, true));
         List<String> renameEntriesToBeDeleted = 
Collections.singletonList("key2");
         OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
             .setBucketName("buck")
@@ -820,7 +843,7 @@ public void testFailingModifiedKeyPurge() throws 
IOException, InterruptedExcepti
             .build();
         Map<String, RepeatedOmKeyInfo> keysToModify = 
Collections.singletonMap("key1",
             new RepeatedOmKeyInfo(Collections.singletonList(omKeyInfo), 0L));
-        keyDeletingService.processKeyDeletes(blockGroups, keysToModify, 
renameEntriesToBeDeleted, null, null, null);
+        keyDeletingService.processKeyDeletes(blockGroups, keysToModify, 
renameEntriesToBeDeleted, null, null);
         
assertTrue(purgeRequest.get().getPurgeKeysRequest().getKeysToUpdateList().isEmpty());
         assertEquals(renameEntriesToBeDeleted, 
purgeRequest.get().getPurgeKeysRequest().getRenamedKeysList());
       }
@@ -967,9 +990,11 @@ void testLastRunAnd24hMetrics() throws Exception {
       writeClient.createSnapshot(volumeName, bucketName, snap2);
 
       // Create and delete 5 more keys.
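+      // Every key is committed with the same size; remember it for the
+      // metric assertions below (data is replicated with RATIS/THREE).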
+      long dataSize = 0L;
       for (int i = 16; i <= 20; i++) {
         OmKeyArgs args = createAndCommitKey(volumeName, bucketName, 
uniqueObjectName("key"), 1);
         createdKeys.add(args);
+        dataSize = args.getDataSize();
       }
       for (int i = 15; i < 20; i++) {
         writeClient.deleteKey(createdKeys.get(i));
@@ -997,17 +1022,17 @@ void testLastRunAnd24hMetrics() throws Exception {
       GenericTestUtils.waitFor(() -> getDeletedKeyCount() == 10, 100, 10000);
       // Verify last run AOS deletion metrics.
       assertEquals(5, metrics.getAosKeysReclaimedLast());
-      assertEquals(5 * DATA_SIZE * 3, metrics.getAosReclaimedSizeLast());
+      assertEquals(5 * dataSize * 3, metrics.getAosReclaimedSizeLast());
       assertEquals(5, metrics.getAosKeysIteratedLast());
       assertEquals(0, metrics.getAosKeysNotReclaimableLast());
       // Verify last run Snapshot deletion metrics.
       assertEquals(5, metrics.getSnapKeysReclaimedLast());
-      assertEquals(5 * DATA_SIZE * 3, metrics.getSnapReclaimedSizeLast());
+      assertEquals(5 * dataSize * 3, metrics.getSnapReclaimedSizeLast());
       assertEquals(15, metrics.getSnapKeysIteratedLast());
       assertEquals(10, metrics.getSnapKeysNotReclaimableLast());
       // Verify 24h deletion metrics.
       assertEquals(10, metrics.getKeysReclaimedInInterval());
-      assertEquals(10 * DATA_SIZE * 3, metrics.getReclaimedSizeInInterval());
+      assertEquals(10 * dataSize * 3, metrics.getReclaimedSizeInInterval());
 
       // Delete snap1. Which also sets the snap2 to be deep cleaned.
       writeClient.deleteSnapshot(volumeName, bucketName, snap1);
@@ -1035,12 +1060,12 @@ void testLastRunAnd24hMetrics() throws Exception {
       assertEquals(0, metrics.getAosKeysNotReclaimableLast());
       // Verify last run Snapshot deletion metrics.
       assertEquals(10, metrics.getSnapKeysReclaimedLast());
-      assertEquals(10 * DATA_SIZE * 3, metrics.getSnapReclaimedSizeLast());
+      assertEquals(10 * dataSize * 3, metrics.getSnapReclaimedSizeLast());
       assertEquals(10, metrics.getSnapKeysIteratedLast());
       assertEquals(0, metrics.getSnapKeysNotReclaimableLast());
       // Verify 24h deletion metrics.
       assertEquals(20, metrics.getKeysReclaimedInInterval());
-      assertEquals(20 * DATA_SIZE * 3, metrics.getReclaimedSizeInInterval());
+      assertEquals(20 * dataSize * 3, metrics.getReclaimedSizeInInterval());
     }
   }
 
@@ -1245,6 +1270,7 @@ private void createVolumeAndBucket(String volumeName,
     OMRequestTestUtils.addBucketToOM(keyManager.getMetadataManager(),
         OmBucketInfo.newBuilder().setVolumeName(volumeName)
             .setBucketName(bucketName)
+            .setObjectID(OBJECT_COUNTER.incrementAndGet())
             .setIsVersionEnabled(isVersioningEnabled)
             .build());
   }
@@ -1314,10 +1340,11 @@ private OmKeyArgs createAndCommitKey(String volumeName,
 
     List<OmKeyLocationInfo> latestBlocks = keyLocationVersions
         .getBlocksLatestVersionOnly();
-
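+    // Accumulate the total block length so the committed key reports an
+    // accurate data size for quota and metric assertions.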
+    long size = 0;
     int preAllocatedSize = latestBlocks.size();
     for (OmKeyLocationInfo block : latestBlocks) {
       keyArg.addLocationInfo(block);
+      size += block.getLength();
     }
 
     LinkedList<OmKeyLocationInfo> allocated = new LinkedList<>();
@@ -1331,8 +1358,9 @@ private OmKeyArgs createAndCommitKey(String volumeName,
 
     for (OmKeyLocationInfo block : allocated) {
       keyArg.addLocationInfo(block);
+      size += block.getLength();
     }
-
+    keyArg.setDataSize(size);
     customWriteClient.commitKey(keyArg, session.getId());
     return keyArg;
   }
@@ -1352,7 +1380,7 @@ private long getRunCount() {
   private int countKeysPendingDeletion() {
     try {
       final int count = keyManager.getPendingDeletionKeys((kv) -> true, 
Integer.MAX_VALUE)
-          .getKeyBlocksList().size();
+          .getPurgedKeys().size();
       LOG.debug("KeyManager keys pending deletion: {}", count);
       return count;
     } catch (IOException e) {
@@ -1363,8 +1391,9 @@ private int countKeysPendingDeletion() {
   private long countBlocksPendingDeletion() {
     try {
       return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE)
-          .getKeyBlocksList()
+          .getPurgedKeys().values()
           .stream()
+          .map(PurgedKey::getBlockGroup)
           .map(BlockGroup::getBlockIDList)
           .mapToLong(Collection::size)
           .sum();
@@ -1374,6 +1403,6 @@ private long countBlocksPendingDeletion() {
   }
 
   private static String uniqueObjectName(String prefix) {
-    return prefix + OBJECT_COUNTER.getAndIncrement();
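+    // Zero-pad the counter so generated names sort lexicographically in
+    // creation order.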
+    return prefix + String.format("%010d", OBJECT_COUNTER.getAndIncrement());
   }
 }
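
For context on the API change running through these tests: getPendingDeletionKeys() now returns its result keyed by block-group ID as a Map<String, PurgedKey> rather than a flat BlockGroup list. Below is a minimal sketch of consuming the new shape, using only accessors that appear in the diff above (getPurgedKeys(), PurgedKey#getBlockGroup(), BlockGroup#getBlockIDList()); the filter and limit mirror the test usage, and the loop body is illustrative only:

    PendingKeysDeletion pending =
        keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE);
    for (PurgedKey purgedKey : pending.getPurgedKeys().values()) {
      // Each purged key still carries its BlockGroup, so block-level
      // handling (for example, counting blocks pending deletion) carries
      // over directly from the old list-based API.
      BlockGroup group = purgedKey.getBlockGroup();
      long blockCount = group.getBlockIDList().size();
    }

This mirrors the updated countBlocksPendingDeletion() helper above.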

