smengcl commented on code in PR #4486:
URL: https://github.com/apache/ozone/pull/4486#discussion_r1151212322


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {

Review Comment:
   Can use `protected` here
   
   ```suggestion
     protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
                                     KeyManager manager,
                                     long startTime,
                                     String snapTableKey)
         throws IOException {
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (results != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(results, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        //  only one code path here. Purge keys should go through an
+        //  OMRequest model.
+        delCount = deleteAllKeys(results, manager);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+            delCount, Time.monotonicNow() - startTime);

Review Comment:
   ```suggestion
           LOG.debug("Blocks for {} (out of {}) keys are deleted in {} ms",
               delCount, results.size(), Time.monotonicNow() - startTime);
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (results != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(results, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        //  only one code path here. Purge keys should go through an
+        //  OMRequest model.
+        delCount = deleteAllKeys(results, manager);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+            delCount, Time.monotonicNow() - startTime);
+      }
+    }
+    return delCount;
+  }
+
+  /**
+   * Deletes all the keys that SCM has acknowledged and queued for delete.
+   *
+   * @param results DeleteBlockGroups returned by SCM.
+   * @throws IOException      on Error
+   */
+  private int deleteAllKeys(List<DeleteBlockGroupResult> results,
+      KeyManager manager) throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        manager.getMetadataManager().getDeletedTable();
+    DBStore store = manager.getMetadataManager().getStore();
+
+    // Put all keys to delete in a single transaction and call for delete.
+    int deletedCount = 0;
+    try (BatchOperation writeBatch = store.initBatchOperation()) {
+      for (DeleteBlockGroupResult result : results) {
+        if (result.isSuccess()) {
+          // Purge key from OM DB.
+          deletedTable.deleteWithBatch(writeBatch,
+              result.getObjectKey());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+          }
+          deletedCount++;
+        }
+      }
+      // Write a single transaction for delete.
+      store.commitBatchOperation(writeBatch);
+    }
+    return deletedCount;
+  }
+
+  /**
+   * Submits PurgeKeys request for the keys whose blocks have been deleted
+   * by SCM.
+   * @param results DeleteBlockGroups returned by SCM.
+   */
+  public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      String snapTableKey) {
+    Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
+        new HashMap<>();
+
+    // Put all keys to be purged in a list
+    int deletedCount = 0;
+    for (DeleteBlockGroupResult result : results) {
+      if (result.isSuccess()) {
+        // Add key to PurgeKeys list.
+        String deletedKey = result.getObjectKey();
+        // Parse Volume and BucketName
+        addToMap(purgeKeysMapPerBucket, deletedKey);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+        }
+        deletedCount++;
+      }
+    }
+
+
+    OzoneManagerProtocolProtos.PurgeKeysRequest.Builder purgeKeysRequest =
+        OzoneManagerProtocolProtos.PurgeKeysRequest.newBuilder();
+
+    // Add keys to PurgeKeysRequest bucket wise.
+    for (Map.Entry<Pair<String, String>, List<String>> entry :
+        purgeKeysMapPerBucket.entrySet()) {
+      Pair<String, String> volumeBucketPair = entry.getKey();
+      OzoneManagerProtocolProtos.DeletedKeys deletedKeysInBucket =
+          OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
+          .setVolumeName(volumeBucketPair.getLeft())
+          .setBucketName(volumeBucketPair.getRight())
+          .addAllKeys(entry.getValue())
+          .build();
+      purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
+    }
+
+    if (snapTableKey != null) {
+      purgeKeysRequest.setFromSnapshot(snapTableKey);
+    }
+
+    OzoneManagerProtocolProtos.OMRequest omRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
+        .setPurgeKeysRequest(purgeKeysRequest)
+        .setClientId(clientId.toString())
+        .build();

Review Comment:
   ```suggestion
       OMRequest omRequest =
           OMRequest.newBuilder()
           .setCmdType(Type.PurgeKeys)
           .setPurgeKeysRequest(purgeKeysRequest)
           .setClientId(clientId.toString())
           .build();
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (results != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(results, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        //  only one code path here. Purge keys should go through an
+        //  OMRequest model.
+        delCount = deleteAllKeys(results, manager);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+            delCount, Time.monotonicNow() - startTime);
+      }
+    }
+    return delCount;
+  }
+
+  /**
+   * Deletes all the keys that SCM has acknowledged and queued for delete.
+   *
+   * @param results DeleteBlockGroups returned by SCM.
+   * @throws IOException      on Error
+   */
+  private int deleteAllKeys(List<DeleteBlockGroupResult> results,
+      KeyManager manager) throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        manager.getMetadataManager().getDeletedTable();
+    DBStore store = manager.getMetadataManager().getStore();
+
+    // Put all keys to delete in a single transaction and call for delete.
+    int deletedCount = 0;
+    try (BatchOperation writeBatch = store.initBatchOperation()) {
+      for (DeleteBlockGroupResult result : results) {
+        if (result.isSuccess()) {
+          // Purge key from OM DB.
+          deletedTable.deleteWithBatch(writeBatch,
+              result.getObjectKey());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+          }
+          deletedCount++;
+        }
+      }
+      // Write a single transaction for delete.
+      store.commitBatchOperation(writeBatch);
+    }
+    return deletedCount;
+  }
+
+  /**
+   * Submits PurgeKeys request for the keys whose blocks have been deleted
+   * by SCM.
+   * @param results DeleteBlockGroups returned by SCM.
+   */
+  public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      String snapTableKey) {
+    Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
+        new HashMap<>();
+
+    // Put all keys to be purged in a list
+    int deletedCount = 0;
+    for (DeleteBlockGroupResult result : results) {
+      if (result.isSuccess()) {
+        // Add key to PurgeKeys list.
+        String deletedKey = result.getObjectKey();
+        // Parse Volume and BucketName
+        addToMap(purgeKeysMapPerBucket, deletedKey);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+        }
+        deletedCount++;
+      }
+    }
+
+
+    OzoneManagerProtocolProtos.PurgeKeysRequest.Builder purgeKeysRequest =
+        OzoneManagerProtocolProtos.PurgeKeysRequest.newBuilder();
+
+    // Add keys to PurgeKeysRequest bucket wise.
+    for (Map.Entry<Pair<String, String>, List<String>> entry :
+        purgeKeysMapPerBucket.entrySet()) {
+      Pair<String, String> volumeBucketPair = entry.getKey();
+      OzoneManagerProtocolProtos.DeletedKeys deletedKeysInBucket =
+          OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
+          .setVolumeName(volumeBucketPair.getLeft())
+          .setBucketName(volumeBucketPair.getRight())
+          .addAllKeys(entry.getValue())
+          .build();
+      purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
+    }
+
+    if (snapTableKey != null) {
+      purgeKeysRequest.setFromSnapshot(snapTableKey);
+    }
+
+    OzoneManagerProtocolProtos.OMRequest omRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
+        .setPurgeKeysRequest(purgeKeysRequest)
+        .setClientId(clientId.toString())
+        .build();
+
+    // Submit PurgeKeys request to OM
+    try {
+      RaftClientRequest raftClientRequest =
+          createRaftClientRequestForPurge(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("PurgeKey request failed. Will retry at next run.");
+      return 0;
+    }
+
+    return deletedCount;
+  }
+
+  private RaftClientRequest createRaftClientRequestForPurge(
+      OzoneManagerProtocolProtos.OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(clientId)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  /**
+   * Parse Volume and Bucket Name from ObjectKey and add it to given map of
+   * keys to be purged per bucket.
+   */
+  private void addToMap(Map<Pair<String, String>, List<String>> map,
+                        String objectKey) {
+    // Parse volume and bucket name
+    String[] split = objectKey.split(OM_KEY_PREFIX);
+    Preconditions.assertTrue(split.length > 3, "Volume and/or Bucket Name " +
+        "missing from Key Name.");
+    Pair<String, String> volumeBucketPair = Pair.of(split[1], split[2]);
+    if (!map.containsKey(volumeBucketPair)) {
+      map.put(volumeBucketPair, new ArrayList<>());
+    }
+    map.get(volumeBucketPair).add(objectKey);
+  }
+
+  public boolean isRatisEnabled() {
+    if (ozoneManager == null) {
+      return false;
+    }
+    return ozoneManager.isRatisEnabled();
+  }
+
+

Review Comment:
   ```suggestion
   
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -234,13 +238,26 @@ public BackgroundTaskResult call() throws Exception {
               if (!(toReclaim.getKeyInfosCount() ==
                   repeatedOmKeyInfo.getOmKeyInfoList().size())) {
                 toReclaimList.add(toReclaim.build());
+                toNextDBList.add(toNextDb.build());
+              } else {
+                // The key can be reclaimed here.
+                List<BlockGroup> blocksForKeyDelete = omSnapshot
+                    .getMetadataManager()
+                    .getBlocksForKeyDelete(deletedKey);
+                if (blocksForKeyDelete != null) {
+                  keysToPurge.addAll(blocksForKeyDelete);
+                }
               }
-              toNextDBList.add(toNextDb.build());
               deletionCount++;
             }
             // Submit Move request to OM.
             submitSnapshotMoveDeletedKeys(snapInfo, toReclaimList,
                 toNextDBList);
+
+            // Delete keys From deletedTable
+            long startTime = Time.monotonicNow();
+            processKeyDeletes(keysToPurge, omSnapshot.getKeyManager(),
+                startTime, snapInfo.getTableKey());

Review Comment:
   Feels like we can put the `startTime` inside `processKeyDeletes()` and remove it from the params list.
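   
   For example (just a sketch, with the `protected` and log-message suggestions from above folded in; each caller would then drop the `startTime` argument):
   
   ```java
     protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
         KeyManager manager, String snapTableKey) throws IOException {
       // Capture the start time here instead of threading it through callers.
       long startTime = Time.monotonicNow();
       int delCount = 0;
       List<DeleteBlockGroupResult> blockDeletionResults =
           scmClient.deleteKeyBlocks(keyBlocksList);
       if (blockDeletionResults != null) {
         if (isRatisEnabled()) {
           delCount = submitPurgeKeysRequest(blockDeletionResults, snapTableKey);
         } else {
           delCount = deleteAllKeys(blockDeletionResults, manager);
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Blocks for {} (out of {}) keys are deleted in {} ms",
               delCount, blockDeletionResults.size(),
               Time.monotonicNow() - startTime);
         }
       }
       return delCount;
     }
   ```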



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);

Review Comment:
   ```suggestion
       List<DeleteBlockGroupResult> blockDeletionResults =
           scmClient.deleteKeyBlocks(keyBlocksList);
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (results != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(results, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        //  only one code path here. Purge keys should go through an
+        //  OMRequest model.
+        delCount = deleteAllKeys(results, manager);

Review Comment:
   I'm not even sure OM can still boot without Ratis anymore. So this code path might never be used.
   
   Need to double check.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (results != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(results, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        //  only one code path here. Purge keys should go through an
+        //  OMRequest model.
+        delCount = deleteAllKeys(results, manager);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+            delCount, Time.monotonicNow() - startTime);
+      }
+    }
+    return delCount;
+  }
+
+  /**
+   * Deletes all the keys that SCM has acknowledged and queued for delete.
+   *
+   * @param results DeleteBlockGroups returned by SCM.
+   * @throws IOException      on Error
+   */
+  private int deleteAllKeys(List<DeleteBlockGroupResult> results,
+      KeyManager manager) throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        manager.getMetadataManager().getDeletedTable();
+    DBStore store = manager.getMetadataManager().getStore();
+
+    // Put all keys to delete in a single transaction and call for delete.
+    int deletedCount = 0;
+    try (BatchOperation writeBatch = store.initBatchOperation()) {
+      for (DeleteBlockGroupResult result : results) {
+        if (result.isSuccess()) {
+          // Purge key from OM DB.
+          deletedTable.deleteWithBatch(writeBatch,
+              result.getObjectKey());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+          }
+          deletedCount++;
+        }
+      }
+      // Write a single transaction for delete.
+      store.commitBatchOperation(writeBatch);
+    }
+    return deletedCount;
+  }
+
+  /**
+   * Submits PurgeKeys request for the keys whose blocks have been deleted
+   * by SCM.
+   * @param results DeleteBlockGroups returned by SCM.
+   */
+  public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      String snapTableKey) {

Review Comment:
   Can be `private`
   
   ```suggestion
     private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
                                        String snapTableKey) {
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+

Review Comment:
   nit: extra line
   ```suggestion
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (results != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(results, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        //  only one code path here. Purge keys should go through an
+        //  OMRequest model.
+        delCount = deleteAllKeys(results, manager);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+            delCount, Time.monotonicNow() - startTime);
+      }
+    }
+    return delCount;
+  }
+
+  /**
+   * Deletes all the keys that SCM has acknowledged and queued for delete.
+   *
+   * @param results DeleteBlockGroups returned by SCM.
+   * @throws IOException      on Error
+   */
+  private int deleteAllKeys(List<DeleteBlockGroupResult> results,
+      KeyManager manager) throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        manager.getMetadataManager().getDeletedTable();
+    DBStore store = manager.getMetadataManager().getStore();
+
+    // Put all keys to delete in a single transaction and call for delete.
+    int deletedCount = 0;
+    try (BatchOperation writeBatch = store.initBatchOperation()) {
+      for (DeleteBlockGroupResult result : results) {
+        if (result.isSuccess()) {
+          // Purge key from OM DB.
+          deletedTable.deleteWithBatch(writeBatch,
+              result.getObjectKey());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+          }
+          deletedCount++;
+        }
+      }
+      // Write a single transaction for delete.
+      store.commitBatchOperation(writeBatch);
+    }
+    return deletedCount;
+  }
+
+  /**
+   * Submits PurgeKeys request for the keys whose blocks have been deleted
+   * by SCM.
+   * @param results DeleteBlockGroups returned by SCM.
+   */
+  public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      String snapTableKey) {
+    Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
+        new HashMap<>();
+
+    // Put all keys to be purged in a list
+    int deletedCount = 0;
+    for (DeleteBlockGroupResult result : results) {
+      if (result.isSuccess()) {
+        // Add key to PurgeKeys list.
+        String deletedKey = result.getObjectKey();
+        // Parse Volume and BucketName
+        addToMap(purgeKeysMapPerBucket, deletedKey);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+        }
+        deletedCount++;
+      }
+    }
+
+

Review Comment:
   nit: extra blank line
   ```suggestion
   
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (results != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(results, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        //  only one code path here. Purge keys should go through an
+        //  OMRequest model.
+        delCount = deleteAllKeys(results, manager);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+            delCount, Time.monotonicNow() - startTime);
+      }
+    }
+    return delCount;
+  }
+
+  /**
+   * Deletes all the keys that SCM has acknowledged and queued for delete.
+   *
+   * @param results DeleteBlockGroups returned by SCM.
+   * @throws IOException      on Error
+   */
+  private int deleteAllKeys(List<DeleteBlockGroupResult> results,
+      KeyManager manager) throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        manager.getMetadataManager().getDeletedTable();
+    DBStore store = manager.getMetadataManager().getStore();
+
+    // Put all keys to delete in a single transaction and call for delete.
+    int deletedCount = 0;
+    try (BatchOperation writeBatch = store.initBatchOperation()) {
+      for (DeleteBlockGroupResult result : results) {
+        if (result.isSuccess()) {
+          // Purge key from OM DB.
+          deletedTable.deleteWithBatch(writeBatch,
+              result.getObjectKey());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+          }
+          deletedCount++;
+        }
+      }
+      // Write a single transaction for delete.
+      store.commitBatchOperation(writeBatch);
+    }
+    return deletedCount;
+  }
+
+  /**
+   * Submits PurgeKeys request for the keys whose blocks have been deleted
+   * by SCM.
+   * @param results DeleteBlockGroups returned by SCM.
+   */
+  public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      String snapTableKey) {
+    Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
+        new HashMap<>();
+
+    // Put all keys to be purged in a list
+    int deletedCount = 0;
+    for (DeleteBlockGroupResult result : results) {
+      if (result.isSuccess()) {
+        // Add key to PurgeKeys list.
+        String deletedKey = result.getObjectKey();
+        // Parse Volume and BucketName
+        addToMap(purgeKeysMapPerBucket, deletedKey);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+        }
+        deletedCount++;
+      }
+    }
+
+
+    OzoneManagerProtocolProtos.PurgeKeysRequest.Builder purgeKeysRequest =
+        OzoneManagerProtocolProtos.PurgeKeysRequest.newBuilder();

Review Comment:
   ```suggestion
       PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder();
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract's KeyDeletingService.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+
+  }
+
+  public int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager, long startTime, String snapTableKey)
+      throws IOException {
+    int delCount = 0;
+    List<DeleteBlockGroupResult> results =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (results != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(results, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        //  only one code path here. Purge keys should go through an
+        //  OMRequest model.
+        delCount = deleteAllKeys(results, manager);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+            delCount, Time.monotonicNow() - startTime);
+      }
+    }
+    return delCount;
+  }
+
+  /**
+   * Deletes all the keys that SCM has acknowledged and queued for delete.
+   *
+   * @param results DeleteBlockGroups returned by SCM.
+   * @throws IOException      on Error
+   */
+  private int deleteAllKeys(List<DeleteBlockGroupResult> results,
+      KeyManager manager) throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        manager.getMetadataManager().getDeletedTable();
+    DBStore store = manager.getMetadataManager().getStore();
+
+    // Put all keys to delete in a single transaction and call for delete.
+    int deletedCount = 0;
+    try (BatchOperation writeBatch = store.initBatchOperation()) {
+      for (DeleteBlockGroupResult result : results) {
+        if (result.isSuccess()) {
+          // Purge key from OM DB.
+          deletedTable.deleteWithBatch(writeBatch,
+              result.getObjectKey());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+          }
+          deletedCount++;
+        }
+      }
+      // Write a single transaction for delete.
+      store.commitBatchOperation(writeBatch);
+    }
+    return deletedCount;
+  }
+
+  /**
+   * Submits PurgeKeys request for the keys whose blocks have been deleted
+   * by SCM.
+   * @param results DeleteBlockGroups returned by SCM.
+   */
+  public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      String snapTableKey) {
+    Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
+        new HashMap<>();
+
+    // Put all keys to be purged in a list
+    int deletedCount = 0;
+    for (DeleteBlockGroupResult result : results) {
+      if (result.isSuccess()) {
+        // Add key to PurgeKeys list.
+        String deletedKey = result.getObjectKey();
+        // Parse Volume and BucketName
+        addToMap(purgeKeysMapPerBucket, deletedKey);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+        }
+        deletedCount++;
+      }
+    }
+
+
+    OzoneManagerProtocolProtos.PurgeKeysRequest.Builder purgeKeysRequest =
+        OzoneManagerProtocolProtos.PurgeKeysRequest.newBuilder();
+
+    // Add keys to PurgeKeysRequest bucket wise.
+    for (Map.Entry<Pair<String, String>, List<String>> entry :
+        purgeKeysMapPerBucket.entrySet()) {
+      Pair<String, String> volumeBucketPair = entry.getKey();
+      OzoneManagerProtocolProtos.DeletedKeys deletedKeysInBucket =
+          OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
+          .setVolumeName(volumeBucketPair.getLeft())
+          .setBucketName(volumeBucketPair.getRight())
+          .addAllKeys(entry.getValue())
+          .build();

Review Comment:
   ```suggestion
         DeletedKeys deletedKeysInBucket =
             DeletedKeys.newBuilder()
             .setVolumeName(volumeBucketPair.getLeft())
             .setBucketName(volumeBucketPair.getRight())
             .addAllKeys(entry.getValue())
             .build();
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+    // Add keys to PurgeKeysRequest bucket wise.
+    for (Map.Entry<Pair<String, String>, List<String>> entry :
+        purgeKeysMapPerBucket.entrySet()) {
+      Pair<String, String> volumeBucketPair = entry.getKey();
+      OzoneManagerProtocolProtos.DeletedKeys deletedKeysInBucket =
+          OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
+          .setVolumeName(volumeBucketPair.getLeft())
+          .setBucketName(volumeBucketPair.getRight())
+          .addAllKeys(entry.getValue())
+          .build();
+      purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
+    }
+
+    if (snapTableKey != null) {
+      purgeKeysRequest.setFromSnapshot(snapTableKey);
+    }
+
+    OzoneManagerProtocolProtos.OMRequest omRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
+        .setPurgeKeysRequest(purgeKeysRequest)
+        .setClientId(clientId.toString())
+        .build();
+
+    // Submit PurgeKeys request to OM
+    try {
+      RaftClientRequest raftClientRequest =
+          createRaftClientRequestForPurge(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("PurgeKey request failed. Will retry at next run.");
+      return 0;
+    }
+
+    return deletedCount;
+  }
+
+  private RaftClientRequest createRaftClientRequestForPurge(
+      OzoneManagerProtocolProtos.OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(clientId)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  /**
+   * Parse Volume and Bucket Name from ObjectKey and add it to given map of
+   * keys to be purged per bucket.
+   */
+  private void addToMap(Map<Pair<String, String>, List<String>> map,
+                        String objectKey) {
+    // Parse volume and bucket name
+    String[] split = objectKey.split(OM_KEY_PREFIX);
+    Preconditions.assertTrue(split.length > 3, "Volume and/or Bucket Name " +
+        "missing from Key Name.");
+    Pair<String, String> volumeBucketPair = Pair.of(split[1], split[2]);
+    if (!map.containsKey(volumeBucketPair)) {
+      map.put(volumeBucketPair, new ArrayList<>());
+    }
+    map.get(volumeBucketPair).add(objectKey);
+  }
+
+  public boolean isRatisEnabled() {
+    if (ozoneManager == null) {
+      return false;
+    }
+    return ozoneManager.isRatisEnabled();
+  }
+
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    return null;
+  }
+

Review Comment:
   I don't think this abstract class has to implement `getTasks()`
   
   ```suggestion
   ```
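
   If it's removed here, each concrete service supplies its own queue. A minimal sketch of what the override might look like in a subclass (the task class name is illustrative, modeled on the existing KeyDeletingService):

   ```
     @Override
     public BackgroundTaskQueue getTasks() {
       BackgroundTaskQueue queue = new BackgroundTaskQueue();
       // Each concrete service enqueues its own background task here.
       queue.add(new KeyDeletingTask());
       return queue;
     }
   ```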



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+    OzoneManagerProtocolProtos.PurgeKeysRequest.Builder purgeKeysRequest =
+        OzoneManagerProtocolProtos.PurgeKeysRequest.newBuilder();
+
+    // Add keys to PurgeKeysRequest bucket wise.
+    for (Map.Entry<Pair<String, String>, List<String>> entry :
+        purgeKeysMapPerBucket.entrySet()) {
+      Pair<String, String> volumeBucketPair = entry.getKey();
+      OzoneManagerProtocolProtos.DeletedKeys deletedKeysInBucket =
+          OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
+          .setVolumeName(volumeBucketPair.getLeft())
+          .setBucketName(volumeBucketPair.getRight())
+          .addAllKeys(entry.getValue())
+          .build();
+      purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
+    }
+
+    if (snapTableKey != null) {
+      purgeKeysRequest.setFromSnapshot(snapTableKey);
+    }

Review Comment:
   Let's move this `set` **immediately** after `purgeKeysRequest` initialization above.
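
   Something like this (sketch, reusing the lines from the diff):

   ```
       OzoneManagerProtocolProtos.PurgeKeysRequest.Builder purgeKeysRequest =
           OzoneManagerProtocolProtos.PurgeKeysRequest.newBuilder();
       if (snapTableKey != null) {
         purgeKeysRequest.setFromSnapshot(snapTableKey);
       }
       // ... then build and add the per-bucket DeletedKeys as before.
   ```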



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java:
##########
@@ -76,37 +56,21 @@ public class KeyDeletingService extends BackgroundService {
   // times.
   private static final int KEY_DELETING_CORE_POOL_SIZE = 1;
 
-  private final OzoneManager ozoneManager;
-  private final ScmBlockLocationProtocol scmClient;
   private final KeyManager manager;
   private static ClientId clientId = ClientId.randomId();
   private final int keyLimitPerTask;
   private final AtomicLong deletedKeyCount;
-  private final AtomicLong runCount;
 
   public KeyDeletingService(OzoneManager ozoneManager,
       ScmBlockLocationProtocol scmClient,
       KeyManager manager, long serviceInterval,
       long serviceTimeout, ConfigurationSource conf) {
     super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,

Review Comment:
   ```suggestion
       super(KeyDeletingService.class.getSimpleName(),
           serviceInterval, TimeUnit.MILLISECONDS,
   ```
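
   Using `getSimpleName()` also keeps the service name in sync if the class is ever renamed.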



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -234,13 +238,26 @@ public BackgroundTaskResult call() throws Exception {
               if (!(toReclaim.getKeyInfosCount() ==
                   repeatedOmKeyInfo.getOmKeyInfoList().size())) {
                 toReclaimList.add(toReclaim.build());

Review Comment:
   Can the keys here in `toReclaimList` be reclaimed in-place as well, similar to the logic below?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -195,12 +198,13 @@ public BackgroundTaskResult call() throws Exception {
               RepeatedOmKeyInfo>> deletedIterator = snapshotDeletedTable
               .iterator()) {
 
+            List<BlockGroup> keysToPurge = new ArrayList<>();
             String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
             iterator.seek(snapshotBucketKey);
 
             int deletionCount = 0;
             while (deletedIterator.hasNext() &&
-                deletionCount <= keyLimitPerSnapshot) {
+                deletionCount < keyLimitPerSnapshot) {

Review Comment:
   good catch :D
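   (With `<=`, the loop would process up to `keyLimitPerSnapshot + 1` keys per run, assuming `deletionCount` is incremented once per processed key.)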



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java:
##########
@@ -0,0 +1,254 @@
+    OzoneManagerProtocolProtos.OMRequest omRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
+        .setPurgeKeysRequest(purgeKeysRequest)
+        .setClientId(clientId.toString())
+        .build();

Review Comment:
   And import these at the beginning just like existing KDS does:
   
   ```
   import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
   import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
   import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
   import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
   ```
   
   It makes the code more readable.
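   (These message types are generated by protoc as static nested classes of the `OzoneManagerProtocolProtos` outer class, which is why they can be imported directly like this.)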



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


