This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7196dbbd22 HDDS-8995. Refactor the datanode BlockDeletingService 
(#5047)
7196dbbd22 is described below

commit 7196dbbd22e0f4ea2018e2451289798a42ddb79b
Author: hao guo <[email protected]>
AuthorDate: Fri Aug 25 14:19:54 2023 +0800

    HDDS-8995. Refactor the datanode BlockDeletingService (#5047)
---
 .../helpers/BlockDeletingServiceMetrics.java       |   2 +-
 .../container/common/helpers/ContainerUtils.java   |  15 +
 .../common/impl/BlockDeletingService.java          | 290 +++++++++
 .../RandomContainerDeletionChoosingPolicy.java     |   3 +-
 ...TopNOrderedContainerDeletionChoosingPolicy.java |  15 +-
 .../ContainerDeletionChoosingPolicy.java           |   2 +-
 .../ContainerDeletionChoosingPolicyTemplate.java   |  22 +-
 .../background/BlockDeletingService.java           | 672 ---------------------
 .../statemachine/background/BlockDeletingTask.java | 484 +++++++++++++++
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   2 +-
 .../container/common/TestBlockDeletingService.java |   2 +-
 .../impl/TestContainerDeletionChoosingPolicy.java  |   7 +-
 .../testutils/BlockDeletingServiceTestImpl.java    |   2 +-
 13 files changed, 817 insertions(+), 701 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
index ba3b817519..f8125cfa82 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
 
 /**
  * Metrics related to Block Deleting Service running on Datanode.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index ff974b9cf4..3c3371ebf3 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -326,4 +327,18 @@ public final class ContainerUtils {
           tarName);
     }
   }
+
+  public static long getPendingDeletionBlocks(ContainerData containerData) {
+    if (containerData.getContainerType()
+        .equals(ContainerProtos.ContainerType.KeyValueContainer)) {
+      return ((KeyValueContainerData) containerData)
+          .getNumPendingDeletionBlocks();
+    } else {
+      // If another ContainerType is available later, implement it
+      throw new IllegalArgumentException(
+          "getPendingDeletionBlocks for ContainerType: " +
+              containerData.getContainerType() +
+              " not support.");
+    }
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/BlockDeletingService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/BlockDeletingService.java
new file mode 100644
index 0000000000..87550d8656
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/BlockDeletingService.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import 
org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import 
org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingTask;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * A per-datanode container block deleting service takes in charge
+ * of deleting staled ozone blocks.
+ */
+public class BlockDeletingService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockDeletingService.class);
+
+  private final OzoneContainer ozoneContainer;
+  private final ContainerDeletionChoosingPolicy containerDeletionPolicy;
+  private final ConfigurationSource conf;
+
+  private final int blockLimitPerInterval;
+
+  private final BlockDeletingServiceMetrics metrics;
+
+  // Task priority is useful when a to-delete block has weight.
+  private static final int TASK_PRIORITY_DEFAULT = 1;
+
+  public BlockDeletingService(OzoneContainer ozoneContainer,
+                              long serviceInterval, long serviceTimeout,
+                              TimeUnit timeUnit, int workerSize,
+                              ConfigurationSource conf) {
+    super("BlockDeletingService", serviceInterval, timeUnit,
+        workerSize, serviceTimeout);
+    this.ozoneContainer = ozoneContainer;
+    try {
+      containerDeletionPolicy = conf.getClass(
+          ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
+          TopNOrderedContainerDeletionChoosingPolicy.class,
+          ContainerDeletionChoosingPolicy.class).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    this.conf = conf;
+    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+    this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
+    metrics = BlockDeletingServiceMetrics.create();
+  }
+
+  /**
+   * Pair of container data and the number of blocks to delete.
+   */
+  public static class ContainerBlockInfo {
+    private final ContainerData containerData;
+    private final Long numBlocksToDelete;
+
+    public ContainerBlockInfo(ContainerData containerData, Long blocks) {
+      this.containerData = containerData;
+      this.numBlocksToDelete = blocks;
+    }
+
+    public ContainerData getContainerData() {
+      return containerData;
+    }
+
+    public Long getNumBlocksToDelete() {
+      return numBlocksToDelete;
+    }
+
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+    try {
+      // We at most list a number of containers a time,
+      // in case there are too many containers and start too many workers.
+      // We must ensure there is no empty container in this result.
+      // The chosen result depends on what container deletion policy is
+      // configured.
+      List<ContainerBlockInfo> containers =
+          chooseContainerForBlockDeletion(blockLimitPerInterval,
+              containerDeletionPolicy);
+
+      BackgroundTask
+          containerBlockInfos = null;
+      long totalBlocks = 0;
+      for (ContainerBlockInfo containerBlockInfo : containers) {
+        BlockDeletingTaskBuilder builder =
+            new BlockDeletingTaskBuilder();
+        builder.setBlockDeletingService(this)
+            .setContainerBlockInfo(containerBlockInfo)
+            .setPriority(TASK_PRIORITY_DEFAULT);
+        containerBlockInfos = builder.build();
+        queue.add(containerBlockInfos);
+        totalBlocks += containerBlockInfo.getNumBlocksToDelete();
+      }
+      metrics.incrTotalBlockChosenCount(totalBlocks);
+      metrics.incrTotalContainerChosenCount(containers.size());
+      if (containers.size() > 0) {
+        LOG.debug("Queued {} blocks from {} containers for deletion",
+            totalBlocks, containers.size());
+      }
+    } catch (StorageContainerException e) {
+      LOG.warn("Failed to initiate block deleting tasks, "
+          + "caused by unable to get containers info. "
+          + "Retry in next interval. ", e);
+    } catch (Exception e) {
+      // In case listContainer call throws any uncaught RuntimeException.
+      LOG.error("Unexpected error occurs during deleting blocks.", e);
+    }
+    return queue;
+  }
+
+  public List<ContainerBlockInfo> chooseContainerForBlockDeletion(
+      int blockLimit, ContainerDeletionChoosingPolicy deletionPolicy)
+      throws StorageContainerException {
+
+    AtomicLong totalPendingBlockCount = new AtomicLong(0L);
+    Map<Long, ContainerData> containerDataMap =
+        ozoneContainer.getContainerSet().getContainerMap().entrySet().stream()
+            .filter(e -> (checkPendingDeletionBlocks(
+                e.getValue().getContainerData())))
+            .filter(e -> isDeletionAllowed(e.getValue().getContainerData(),
+                deletionPolicy)).collect(Collectors
+            .toMap(Map.Entry::getKey, e -> {
+              ContainerData containerData =
+                  e.getValue().getContainerData();
+              totalPendingBlockCount
+                  .addAndGet(
+                      ContainerUtils.getPendingDeletionBlocks(containerData));
+              return containerData;
+            }));
+
+    metrics.setTotalPendingBlockCount(totalPendingBlockCount.get());
+    return deletionPolicy
+        .chooseContainerForBlockDeletion(blockLimit, containerDataMap);
+  }
+
+  private boolean checkPendingDeletionBlocks(ContainerData containerData) {
+    return ContainerUtils.getPendingDeletionBlocks(containerData) > 0;
+  }
+
+  private boolean isDeletionAllowed(ContainerData containerData,
+      ContainerDeletionChoosingPolicy deletionPolicy) {
+    if (!deletionPolicy
+        .isValidContainerType(containerData.getContainerType())) {
+      return false;
+    } else if (!containerData.isClosed()) {
+      return false;
+    } else {
+      if (ozoneContainer.getWriteChannel() instanceof XceiverServerRatis) {
+        XceiverServerRatis ratisServer =
+            (XceiverServerRatis) ozoneContainer.getWriteChannel();
+        final String originPipelineId = containerData.getOriginPipelineId();
+        if (originPipelineId == null || originPipelineId.isEmpty()) {
+          // In case the pipelineID is empty, just mark it for deletion.
+          // TODO: currently EC container goes through this path.
+          return true;
+        }
+        UUID pipelineUUID;
+        try {
+          pipelineUUID = UUID.fromString(originPipelineId);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("Invalid pipelineID {} for container {}",
+              originPipelineId, containerData.getContainerID());
+          return false;
+        }
+        PipelineID pipelineID = PipelineID.valueOf(pipelineUUID);
+        // in case the ratis group does not exist, just mark it for deletion.
+        if (!ratisServer.isExist(pipelineID.getProtobuf())) {
+          return true;
+        }
+        try {
+          long minReplicatedIndex =
+              ratisServer.getMinReplicatedIndex(pipelineID);
+          long containerBCSID = containerData.getBlockCommitSequenceId();
+          if (minReplicatedIndex < containerBCSID) {
+            LOG.warn("Close Container log Index {} is not replicated across 
all"
+                    + " the servers in the pipeline {} as the min replicated "
+                    + "index is {}. Deletion is not allowed in this container "
+                    + "yet.", containerBCSID,
+                containerData.getOriginPipelineId(), minReplicatedIndex);
+            return false;
+          } else {
+            return true;
+          }
+        } catch (IOException ioe) {
+          // in case of any exception check again whether the pipeline exist
+          // and in case the pipeline got destroyed, just mark it for deletion
+          if (!ratisServer.isExist(pipelineID.getProtobuf())) {
+            return true;
+          } else {
+            LOG.info(ioe.getMessage());
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+  }
+
+  public OzoneContainer getOzoneContainer() {
+    return ozoneContainer;
+  }
+
+  public ConfigurationSource getConf() {
+    return conf;
+  }
+
+  public BlockDeletingServiceMetrics getMetrics() {
+    return metrics;
+  }
+
+  private static class BlockDeletingTaskBuilder {
+    private BlockDeletingService blockDeletingService;
+    private BlockDeletingService.ContainerBlockInfo containerBlockInfo;
+    private int priority;
+
+    public BlockDeletingTaskBuilder setBlockDeletingService(
+        BlockDeletingService blockDeletingService) {
+      this.blockDeletingService = blockDeletingService;
+      return this;
+    }
+
+    public BlockDeletingTaskBuilder setContainerBlockInfo(
+        ContainerBlockInfo containerBlockInfo) {
+      this.containerBlockInfo = containerBlockInfo;
+      return this;
+    }
+
+    public BlockDeletingTaskBuilder setPriority(int priority) {
+      this.priority = priority;
+      return this;
+    }
+
+    public BackgroundTask build() {
+      ContainerProtos.ContainerType containerType =
+          containerBlockInfo.getContainerData().getContainerType();
+      if (containerType
+          .equals(ContainerProtos.ContainerType.KeyValueContainer)) {
+        return
+            new BlockDeletingTask(blockDeletingService, containerBlockInfo,
+                priority);
+      } else {
+        // If another ContainerType is available later, implement it
+        throw new IllegalArgumentException(
+            "BlockDeletingTask for ContainerType: " + containerType +
+                "doesn't exist.");
+      }
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
index bd8db67690..90ff474827 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.container.common.impl;
 
 import 
org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicyTemplate;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 import java.util.Collections;
 import java.util.List;
@@ -31,7 +30,7 @@ public class RandomContainerDeletionChoosingPolicy
 
   @Override
   protected void orderByDescendingPriority(
-      List<KeyValueContainerData> candidateContainers) {
+      List<ContainerData> candidateContainers) {
     Collections.shuffle(candidateContainers);
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
index e45dbbd3c9..4889d5597c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.ozone.container.common.impl;
 
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import 
org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicyTemplate;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 import java.util.Comparator;
 import java.util.List;
@@ -30,16 +30,15 @@ import java.util.List;
 public class TopNOrderedContainerDeletionChoosingPolicy
     extends ContainerDeletionChoosingPolicyTemplate {
   /** customized comparator used to compare differentiate container data. **/
-  private static final Comparator<KeyValueContainerData>
-        KEY_VALUE_CONTAINER_DATA_COMPARATOR = (KeyValueContainerData c1,
-                                               KeyValueContainerData c2) ->
-              Long.compare(c2.getNumPendingDeletionBlocks(),
-                  c1.getNumPendingDeletionBlocks());
+  private static final Comparator<ContainerData> CONTAINER_DATA_COMPARATOR =
+      (ContainerData c1, ContainerData c2) -> Long.compare(
+          ContainerUtils.getPendingDeletionBlocks(c2),
+          ContainerUtils.getPendingDeletionBlocks(c1));
 
   @Override
   protected void orderByDescendingPriority(
-      List<KeyValueContainerData> candidateContainers) {
+      List<ContainerData> candidateContainers) {
     // get top N list ordered by pending deletion blocks' number
-    candidateContainers.sort(KEY_VALUE_CONTAINER_DATA_COMPARATOR);
+    candidateContainers.sort(CONTAINER_DATA_COMPARATOR);
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
index 884c6a1627..81f4b66d92 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
@@ -21,7 +21,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;
+import 
org.apache.hadoop.ozone.container.common.impl.BlockDeletingService.ContainerBlockInfo;
 
 import java.util.List;
 import java.util.Map;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicyTemplate.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicyTemplate.java
index d35cead5e4..c584ba7903 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicyTemplate.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicyTemplate.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.ozone.container.common.interfaces;
 
 import com.google.common.base.Preconditions;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;
+import 
org.apache.hadoop.ozone.container.common.impl.BlockDeletingService.ContainerBlockInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,10 +49,10 @@ public abstract class 
ContainerDeletionChoosingPolicyTemplate
 
     int originalBlockCount = blockCount;
     List<ContainerBlockInfo> result = new ArrayList<>();
-    List<KeyValueContainerData> orderedList = new LinkedList<>();
+    List<ContainerData> orderedList = new LinkedList<>();
 
     for (ContainerData entry: candidateContainers.values()) {
-      orderedList.add((KeyValueContainerData) entry);
+      orderedList.add(entry);
     }
 
     orderByDescendingPriority(orderedList);
@@ -64,16 +64,18 @@ public abstract class 
ContainerDeletionChoosingPolicyTemplate
     // container but with container we also return an integer so that total
     // blocks don't exceed the number of blocks to be deleted in an interval.
 
-    for (KeyValueContainerData entry : orderedList) {
-      if (entry.getNumPendingDeletionBlocks() > 0) {
-        long numBlocksToDelete = Math.min(blockCount,
-            entry.getNumPendingDeletionBlocks());
+    for (ContainerData entry : orderedList) {
+      long pendingDeletionBlocks =
+          ContainerUtils.getPendingDeletionBlocks(entry);
+
+      if (pendingDeletionBlocks > 0) {
+        long numBlocksToDelete = Math.min(blockCount, pendingDeletionBlocks);
         blockCount -= numBlocksToDelete;
         result.add(new ContainerBlockInfo(entry, numBlocksToDelete));
         if (LOG.isDebugEnabled()) {
           LOG.debug("Select container {} for block deletion, "
               + "pending deletion blocks num: {}.", entry.getContainerID(),
-              entry.getNumPendingDeletionBlocks());
+              pendingDeletionBlocks);
         }
         if (blockCount == 0) {
           break;
@@ -92,5 +94,5 @@ public abstract class ContainerDeletionChoosingPolicyTemplate
    * @param candidateContainers candidate containers to be ordered
    */
   protected abstract void orderByDescendingPriority(
-      List<KeyValueContainerData> candidateContainers);
+      List<ContainerData> candidateContainers);
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
deleted file mode 100644
index c7bdff0256..0000000000
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ /dev/null
@@ -1,672 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-import java.util.LinkedList;
-import java.util.Objects;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
-import org.apache.hadoop.hdds.utils.BackgroundService;
-import org.apache.hadoop.hdds.utils.BackgroundTask;
-import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
-import 
org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import 
org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import 
org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import org.apache.hadoop.ozone.container.metadata.DeleteTransactionStore;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
-
-import com.google.common.collect.Lists;
-
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
-
-import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A per-datanode container block deleting service takes in charge
- * of deleting staled ozone blocks.
- */
-
-public class BlockDeletingService extends BackgroundService {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BlockDeletingService.class);
-
-  private OzoneContainer ozoneContainer;
-  private ContainerDeletionChoosingPolicy containerDeletionPolicy;
-  private final ConfigurationSource conf;
-
-  private final int blockLimitPerInterval;
-
-  private final BlockDeletingServiceMetrics metrics;
-
-  // Task priority is useful when a to-delete block has weight.
-  private static final int TASK_PRIORITY_DEFAULT = 1;
-
-  public BlockDeletingService(OzoneContainer ozoneContainer,
-                              long serviceInterval, long serviceTimeout,
-                              TimeUnit timeUnit, int workerSize,
-                              ConfigurationSource conf) {
-    super("BlockDeletingService", serviceInterval, timeUnit,
-        workerSize, serviceTimeout);
-    this.ozoneContainer = ozoneContainer;
-    try {
-      containerDeletionPolicy = conf.getClass(
-          ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
-          TopNOrderedContainerDeletionChoosingPolicy.class,
-          ContainerDeletionChoosingPolicy.class).newInstance();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    this.conf = conf;
-    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
-    this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
-    metrics = BlockDeletingServiceMetrics.create();
-  }
-
-  /**
-   * Pair of container data and the number of blocks to delete.
-   */
-  public static class ContainerBlockInfo {
-    private final ContainerData containerData;
-    private final Long numBlocksToDelete;
-
-    public ContainerBlockInfo(ContainerData containerData, Long blocks) {
-      this.containerData = containerData;
-      this.numBlocksToDelete = blocks;
-    }
-
-    public ContainerData getContainerData() {
-      return containerData;
-    }
-
-    public Long getBlocks() {
-      return numBlocksToDelete;
-    }
-
-  }
-
-
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    List<ContainerBlockInfo> containers = Lists.newArrayList();
-    try {
-      // We at most list a number of containers a time,
-      // in case there are too many containers and start too many workers.
-      // We must ensure there is no empty container in this result.
-      // The chosen result depends on what container deletion policy is
-      // configured.
-      containers = chooseContainerForBlockDeletion(blockLimitPerInterval,
-          containerDeletionPolicy);
-
-      BlockDeletingTask containerBlockInfos = null;
-      long totalBlocks = 0;
-      for (ContainerBlockInfo containerBlockInfo : containers) {
-        containerBlockInfos =
-            new BlockDeletingTask(containerBlockInfo.containerData,
-                TASK_PRIORITY_DEFAULT, containerBlockInfo.numBlocksToDelete);
-        queue.add(containerBlockInfos);
-        totalBlocks += containerBlockInfo.numBlocksToDelete;
-      }
-      metrics.incrTotalBlockChosenCount(totalBlocks);
-      metrics.incrTotalContainerChosenCount(containers.size());
-      if (containers.size() > 0) {
-        LOG.debug("Queued {} blocks from {} containers for deletion",
-            totalBlocks, containers.size());
-      }
-    } catch (StorageContainerException e) {
-      LOG.warn("Failed to initiate block deleting tasks, "
-          + "caused by unable to get containers info. "
-          + "Retry in next interval. ", e);
-    } catch (Exception e) {
-      // In case listContainer call throws any uncaught RuntimeException.
-      LOG.error("Unexpected error occurs during deleting blocks.", e);
-    }
-    return queue;
-  }
-
-  public List<ContainerBlockInfo> chooseContainerForBlockDeletion(
-      int blockLimit, ContainerDeletionChoosingPolicy deletionPolicy)
-      throws StorageContainerException {
-    Map<Long, ContainerData> containerDataMap =
-        ozoneContainer.getContainerSet().getContainerMap().entrySet().stream()
-            .filter(e -> ((KeyValueContainerData) e.getValue()
-                .getContainerData()).getNumPendingDeletionBlocks() > 0)
-            .filter(e -> isDeletionAllowed(e.getValue().getContainerData(),
-                deletionPolicy)).collect(Collectors
-            .toMap(Map.Entry::getKey, e -> e.getValue().getContainerData()));
-
-    long totalPendingBlockCount =
-        containerDataMap.values().stream().mapToLong(
-            containerData -> ((KeyValueContainerData) containerData)
-            .getNumPendingDeletionBlocks())
-        .sum();
-    metrics.setTotalPendingBlockCount(totalPendingBlockCount);
-    return deletionPolicy
-        .chooseContainerForBlockDeletion(blockLimit, containerDataMap);
-  }
-
-  private boolean isDeletionAllowed(ContainerData containerData,
-      ContainerDeletionChoosingPolicy deletionPolicy) {
-    if (!deletionPolicy
-        .isValidContainerType(containerData.getContainerType())) {
-      return false;
-    } else if (!containerData.isClosed()) {
-      return false;
-    } else {
-      if (ozoneContainer.getWriteChannel() instanceof XceiverServerRatis) {
-        XceiverServerRatis ratisServer =
-            (XceiverServerRatis) ozoneContainer.getWriteChannel();
-        final String originPipelineId = containerData.getOriginPipelineId();
-        if (originPipelineId == null || originPipelineId.isEmpty()) {
-          // In case the pipelineID is empty, just mark it for deletion.
-          // TODO: currently EC container goes through this path.
-          return true;
-        }
-        UUID pipelineUUID;
-        try {
-          pipelineUUID = UUID.fromString(originPipelineId);
-        } catch (IllegalArgumentException e) {
-          LOG.warn("Invalid pipelineID {} for container {}",
-              originPipelineId, containerData.getContainerID());
-          return false;
-        }
-        PipelineID pipelineID = PipelineID.valueOf(pipelineUUID);
-        // in case the ratis group does not exist, just mark it for deletion.
-        if (!ratisServer.isExist(pipelineID.getProtobuf())) {
-          return true;
-        }
-        try {
-          long minReplicatedIndex =
-              ratisServer.getMinReplicatedIndex(pipelineID);
-          long containerBCSID = containerData.getBlockCommitSequenceId();
-          if (minReplicatedIndex < containerBCSID) {
-            LOG.warn("Close Container log Index {} is not replicated across all"
-                    + " the servers in the pipeline {} as the min replicated "
-                    + "index is {}. Deletion is not allowed in this container "
-                    + "yet.", containerBCSID,
-                containerData.getOriginPipelineId(), minReplicatedIndex);
-            return false;
-          } else {
-            return true;
-          }
-        } catch (IOException ioe) {
-          // in case of any exception check again whether the pipeline exist
-          // and in case the pipeline got destroyed, just mark it for deletion
-          if (!ratisServer.isExist(pipelineID.getProtobuf())) {
-            return true;
-          } else {
-            LOG.info(ioe.getMessage());
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-  }
-
-  private static class ContainerBackgroundTaskResult
-      implements BackgroundTaskResult {
-    private List<String> deletedBlockIds;
-
-    ContainerBackgroundTaskResult() {
-      deletedBlockIds = new LinkedList<>();
-    }
-
-    public void addBlockId(String blockId) {
-      deletedBlockIds.add(blockId);
-    }
-
-    public void addAll(List<String> blockIds) {
-      deletedBlockIds.addAll(blockIds);
-    }
-
-    public List<String> getDeletedBlocks() {
-      return deletedBlockIds;
-    }
-
-    @Override
-    public int getSize() {
-      return deletedBlockIds.size();
-    }
-  }
-
-  private class BlockDeletingTask implements BackgroundTask {
-
-    private final int priority;
-    private final KeyValueContainerData containerData;
-    private final long blocksToDelete;
-
-    BlockDeletingTask(ContainerData containerName, int priority,
-        long blocksToDelete) {
-      this.priority = priority;
-      this.containerData = (KeyValueContainerData) containerName;
-      this.blocksToDelete = blocksToDelete;
-    }
-
-    @Override
-    public BackgroundTaskResult call() throws Exception {
-      ContainerBackgroundTaskResult crr;
-      final Container container = ozoneContainer.getContainerSet()
-          .getContainer(containerData.getContainerID());
-      container.writeLock();
-      File dataDir = new File(containerData.getChunksPath());
-      long startTime = Time.monotonicNow();
-      // Scan container's db and get list of under deletion blocks
-      try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
-        if (containerData.hasSchema(SCHEMA_V1)) {
-          crr = deleteViaSchema1(meta, container, dataDir, startTime);
-        } else if (containerData.hasSchema(SCHEMA_V2)) {
-          crr = deleteViaSchema2(meta, container, dataDir, startTime);
-        } else if (containerData.hasSchema(SCHEMA_V3)) {
-          crr = deleteViaSchema3(meta, container, dataDir, startTime);
-        } else {
-          throw new UnsupportedOperationException(
-              "Only schema version 1,2,3 are supported.");
-        }
-        return crr;
-      } finally {
-        container.writeUnlock();
-      }
-    }
-
-    public boolean checkDataDir(File dataDir) {
-      boolean b = true;
-      if (!dataDir.exists() || !dataDir.isDirectory()) {
-        LOG.error("Invalid container data dir {} : "
-            + "does not exist or not a directory", dataDir.getAbsolutePath());
-        b = false;
-      }
-      return b;
-    }
-
-    public ContainerBackgroundTaskResult deleteViaSchema1(
-        DBHandle meta, Container container, File dataDir,
-        long startTime) throws IOException {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
-      if (!checkDataDir(dataDir)) {
-        return crr;
-      }
-      try {
-        Table<String, BlockData> blockDataTable =
-            meta.getStore().getBlockDataTable();
-
-        // # of blocks to delete is throttled
-        KeyPrefixFilter filter = containerData.getDeletingBlockKeyFilter();
-        List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
-            blockDataTable
-                .getSequentialRangeKVs(containerData.startKeyEmpty(),
-                    (int) blocksToDelete, containerData.containerPrefix(),
-                    filter);
-        if (toDeleteBlocks.isEmpty()) {
-          LOG.debug("No under deletion block found in container : {}",
-              containerData.getContainerID());
-          return crr;
-        }
-
-        List<String> succeedBlocks = new LinkedList<>();
-        LOG.debug("Container : {}, To-Delete blocks : {}",
-            containerData.getContainerID(), toDeleteBlocks.size());
-
-        Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
-            .getHandler(container.getContainerType()));
-
-        long releasedBytes = 0;
-        for (Table.KeyValue<String, BlockData> entry: toDeleteBlocks) {
-          String blockName = entry.getKey();
-          LOG.debug("Deleting block {}", blockName);
-          if (entry.getValue() == null) {
-            LOG.warn("Missing delete block(Container = " +
-                container.getContainerData().getContainerID() + ", Block = " +
-                blockName);
-            continue;
-          }
-          try {
-            handler.deleteBlock(container, entry.getValue());
-            releasedBytes += KeyValueContainerUtil.getBlockLength(
-                entry.getValue());
-            succeedBlocks.add(blockName);
-          } catch (InvalidProtocolBufferException e) {
-            LOG.error("Failed to parse block info for block {}", blockName, e);
-          } catch (IOException e) {
-            LOG.error("Failed to delete files for block {}", blockName, e);
-          }
-        }
-
-        // Once chunks in the blocks are deleted... remove the blockID from
-        // blockDataTable.
-        try (BatchOperation batch = meta.getStore().getBatchHandler()
-            .initBatchOperation()) {
-          for (String entry : succeedBlocks) {
-            blockDataTable.deleteWithBatch(batch, entry);
-          }
-
-          // Handler.deleteBlock calls deleteChunk to delete all the chunks
-          // in the block. The ContainerData stats (DB and in-memory) are not
-          // updated with decremented used bytes during deleteChunk. This is
-          // done here so that all the DB update for block delete can be
-          // batched together while committing to DB.
-          int deletedBlocksCount = succeedBlocks.size();
-          containerData.updateAndCommitDBCounters(meta, batch,
-              deletedBlocksCount, releasedBytes);
-          // Once DB update is persisted, check if there are any blocks
-          // remaining in the DB. This will determine whether the container
-          // can be deleted by SCM.
-          if (!container.hasBlocks()) {
-            containerData.markAsEmpty();
-          }
-
-          // update count of pending deletion blocks, block count and used
-          // bytes in in-memory container status.
-          containerData.decrPendingDeletionBlocks(deletedBlocksCount);
-          containerData.decrBlockCount(deletedBlocksCount);
-          containerData.decrBytesUsed(releasedBytes);
-          containerData.getVolume().decrementUsedSpace(releasedBytes);
-          metrics.incrSuccessCount(deletedBlocksCount);
-          metrics.incrSuccessBytes(releasedBytes);
-        }
-
-        if (!succeedBlocks.isEmpty()) {
-          LOG.debug("Container: {}, deleted blocks: {}, space reclaimed: {}, " +
-                  "task elapsed time: {}ms", containerData.getContainerID(),
-              succeedBlocks.size(), releasedBytes,
-              Time.monotonicNow() - startTime);
-        }
-        crr.addAll(succeedBlocks);
-        return crr;
-      } catch (IOException exception) {
-        LOG.warn("Deletion operation was not successful for container: " +
-            container.getContainerData().getContainerID(), exception);
-        metrics.incrFailureCount();
-        throw exception;
-      }
-    }
-
-    public ContainerBackgroundTaskResult deleteViaSchema2(
-        DBHandle meta, Container container, File dataDir,
-        long startTime) throws IOException {
-      Deleter schema2Deleter = (table, batch, tid) -> {
-        Table<Long, DeletedBlocksTransaction> delTxTable =
-            (Table<Long, DeletedBlocksTransaction>) table;
-        delTxTable.deleteWithBatch(batch, tid);
-      };
-      Table<Long, DeletedBlocksTransaction> deleteTxns =
-          ((DeleteTransactionStore<Long>) meta.getStore())
-              .getDeleteTransactionTable();
-      try (TableIterator<Long,
-          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
-          iterator = deleteTxns.iterator()) {
-        return deleteViaTransactionStore(
-            iterator, meta,
-            container, dataDir, startTime, schema2Deleter);
-      }
-    }
-
-    public ContainerBackgroundTaskResult deleteViaSchema3(
-        DBHandle meta, Container container, File dataDir,
-        long startTime) throws IOException {
-      Deleter schema3Deleter = (table, batch, tid) -> {
-        Table<String, DeletedBlocksTransaction> delTxTable =
-            (Table<String, DeletedBlocksTransaction>) table;
-        delTxTable.deleteWithBatch(batch,
-            containerData.getDeleteTxnKey(tid));
-      };
-      Table<String, DeletedBlocksTransaction> deleteTxns =
-          ((DeleteTransactionStore<String>) meta.getStore())
-              .getDeleteTransactionTable();
-      try (TableIterator<String,
-          ? extends Table.KeyValue<String, DeletedBlocksTransaction>>
-          iterator = deleteTxns.iterator(containerData.containerPrefix())) {
-        return deleteViaTransactionStore(
-            iterator, meta,
-            container, dataDir, startTime, schema3Deleter);
-      }
-    }
-
-    private ContainerBackgroundTaskResult deleteViaTransactionStore(
-        TableIterator<?, ? extends Table.KeyValue<?, DeletedBlocksTransaction>>
-            iter, DBHandle meta, Container container, File dataDir,
-        long startTime, Deleter deleter) throws IOException {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
-      if (!checkDataDir(dataDir)) {
-        return crr;
-      }
-      try {
-        Table<String, BlockData> blockDataTable =
-            meta.getStore().getBlockDataTable();
-        DeleteTransactionStore<?> txnStore =
-            (DeleteTransactionStore<?>) meta.getStore();
-        Table<?, DeletedBlocksTransaction> deleteTxns =
-            txnStore.getDeleteTransactionTable();
-        List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
-        int numBlocks = 0;
-        while (iter.hasNext() && (numBlocks < blocksToDelete)) {
-          DeletedBlocksTransaction delTx = iter.next().getValue();
-          numBlocks += delTx.getLocalIDList().size();
-          delBlocks.add(delTx);
-        }
-        if (delBlocks.isEmpty()) {
-          LOG.info("No transaction found in container {} with pending delete " +
-                  "block count {}",
-              containerData.getContainerID(),
-              containerData.getNumPendingDeletionBlocks());
-          // If the container was queued for delete, it had a positive
-          // pending delete block count. After checking the DB there were
-          // actually no delete transactions for the container, so reset the
-          // pending delete block count to the correct value of zero.
-          containerData.resetPendingDeleteBlockCount(meta);
-          return crr;
-        }
-
-        LOG.debug("Container : {}, To-Delete blocks : {}",
-            containerData.getContainerID(), delBlocks.size());
-
-        Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
-            .getHandler(container.getContainerType()));
-
-        DeleteTransactionStats deleteBlocksResult =
-            deleteTransactions(delBlocks, handler, blockDataTable, container);
-        int deletedBlocksProcessed = deleteBlocksResult.getBlocksProcessed();
-        int deletedBlocksCount = deleteBlocksResult.getBlocksDeleted();
-        long releasedBytes = deleteBlocksResult.getBytesReleased();
-
-        // Once blocks are deleted... remove the blockID from blockDataTable
-        // and also remove the transactions from txnTable.
-        try (BatchOperation batch = meta.getStore().getBatchHandler()
-            .initBatchOperation()) {
-          for (DeletedBlocksTransaction delTx : delBlocks) {
-            deleter.apply(deleteTxns, batch, delTx.getTxID());
-            for (Long blk : delTx.getLocalIDList()) {
-              blockDataTable.deleteWithBatch(batch,
-                  containerData.getBlockKey(blk));
-            }
-          }
-
-          // Handler.deleteBlock calls deleteChunk to delete all the chunks
-          // in the block. The ContainerData stats (DB and in-memory) are not
-          // updated with decremented used bytes during deleteChunk. This is
-          // done here so that all the DB updates for block delete can be
-          // batched together while committing to DB.
-          containerData.updateAndCommitDBCounters(meta, batch,
-              deletedBlocksCount, releasedBytes);
-          // Once DB update is persisted, check if there are any blocks
-          // remaining in the DB. This will determine whether the container
-          // can be deleted by SCM.
-          if (!container.hasBlocks()) {
-            containerData.markAsEmpty();
-          }
-
-          // update count of pending deletion blocks, block count and used
-          // bytes in in-memory container status and used space in volume.
-          containerData.decrPendingDeletionBlocks(deletedBlocksProcessed);
-          containerData.decrBlockCount(deletedBlocksCount);
-          containerData.decrBytesUsed(releasedBytes);
-          containerData.getVolume().decrementUsedSpace(releasedBytes);
-          metrics.incrSuccessCount(deletedBlocksCount);
-          metrics.incrSuccessBytes(releasedBytes);
-        }
-
-        LOG.debug("Container: {}, deleted blocks: {}, space reclaimed: {}, " +
-                "task elapsed time: {}ms", containerData.getContainerID(),
-            deletedBlocksCount, releasedBytes, Time.monotonicNow() - startTime);
-
-        return crr;
-      } catch (IOException exception) {
-        LOG.warn("Deletion operation was not successful for container: " +
-            container.getContainerData().getContainerID(), exception);
-        metrics.incrFailureCount();
-        throw exception;
-      }
-    }
-
-    /**
-     * Delete the chunks for the given blocks.
-     * Return the deletedBlocks count and number of bytes released.
-     */
-    private DeleteTransactionStats deleteTransactions(
-        List<DeletedBlocksTransaction> delBlocks, Handler handler,
-        Table<String, BlockData> blockDataTable, Container container)
-        throws IOException {
-      int blocksProcessed = 0;
-      int blocksDeleted = 0;
-      long bytesReleased = 0;
-      for (DeletedBlocksTransaction entry : delBlocks) {
-        for (Long blkLong : entry.getLocalIDList()) {
-          String blk = containerData.getBlockKey(blkLong);
-          BlockData blkInfo = blockDataTable.get(blk);
-          LOG.debug("Deleting block {}", blkLong);
-          if (blkInfo == null) {
-            try {
-              handler.deleteUnreferenced(container, blkLong);
-            } catch (IOException e) {
-              LOG.error("Failed to delete files for unreferenced block {} of" +
-                      " container {}", blkLong,
-                  container.getContainerData().getContainerID(), e);
-            } finally {
-              blocksProcessed++;
-            }
-            continue;
-          }
-
-          boolean deleted = false;
-          try {
-            handler.deleteBlock(container, blkInfo);
-            blocksDeleted++;
-            deleted = true;
-          } catch (IOException e) {
-            // TODO: if deletion of certain block retries exceed the certain
-            //  number of times, service should skip deleting it,
-            //  otherwise invalid numPendingDeletionBlocks could accumulate
-            //  beyond the limit and the following deletion will stop.
-            LOG.error("Failed to delete files for block {}", blkLong, e);
-          } finally {
-            blocksProcessed++;
-          }
-
-          if (deleted) {
-            try {
-              bytesReleased += KeyValueContainerUtil.getBlockLength(blkInfo);
-            } catch (IOException e) {
-              // TODO: handle the bytesReleased correctly for the unexpected
-              //  exception.
-              LOG.error("Failed to get block length for block {}", blkLong, e);
-            }
-          }
-        }
-      }
-      return new DeleteTransactionStats(blocksProcessed,
-          blocksDeleted, bytesReleased);
-    }
-
-    @Override
-    public int getPriority() {
-      return priority;
-    }
-  }
-
-  private interface Deleter {
-    void apply(Table<?, DeletedBlocksTransaction> deleteTxnsTable,
-        BatchOperation batch, long txnID) throws IOException;
-  }
-
-  public BlockDeletingServiceMetrics getMetrics() {
-    return metrics;
-  }
-
-  /**
-   * The wrapper class of the result of deleting transactions.
-   */
-  private static class DeleteTransactionStats {
-
-    private final int blocksProcessed;
-    private final int blocksDeleted;
-    private final long bytesReleased;
-
-    DeleteTransactionStats(int proceeded, int deleted, long released) {
-      blocksProcessed =  proceeded;
-      blocksDeleted = deleted;
-      bytesReleased = released;
-    }
-
-    public int getBlocksProcessed() {
-      return blocksProcessed;
-    }
-
-    public int getBlocksDeleted() {
-      return blocksDeleted;
-    }
-
-    public long getBytesReleased() {
-      return bytesReleased;
-    }
-  }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
new file mode 100644
index 0000000000..f2cca28bd8
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
+
+import java.io.File;
+import java.io.IOException;
+
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.metadata.DeleteTransactionStore;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlockDeletingTask for KeyValueContainer.
+ */
+public class BlockDeletingTask implements BackgroundTask {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockDeletingTask.class);
+
+  private final BlockDeletingServiceMetrics metrics;
+  private final int priority;
+  private final KeyValueContainerData containerData;
+  private final long blocksToDelete;
+  private final OzoneContainer ozoneContainer;
+  private final ConfigurationSource conf;
+
+  public BlockDeletingTask(
+      BlockDeletingService blockDeletingService,
+      BlockDeletingService.ContainerBlockInfo containerBlockInfo,
+      int priority) {
+    this.ozoneContainer = blockDeletingService.getOzoneContainer();
+    this.metrics = blockDeletingService.getMetrics();
+    this.conf = blockDeletingService.getConf();
+    this.priority = priority;
+    this.containerData =
+        (KeyValueContainerData) containerBlockInfo.getContainerData();
+    this.blocksToDelete = containerBlockInfo.getNumBlocksToDelete();
+  }
+
+  private static class ContainerBackgroundTaskResult
+      implements BackgroundTaskResult {
+    private List<String> deletedBlockIds;
+
+    ContainerBackgroundTaskResult() {
+      deletedBlockIds = new LinkedList<>();
+    }
+
+    public void addBlockId(String blockId) {
+      deletedBlockIds.add(blockId);
+    }
+
+    public void addAll(List<String> blockIds) {
+      deletedBlockIds.addAll(blockIds);
+    }
+
+    public List<String> getDeletedBlocks() {
+      return deletedBlockIds;
+    }
+
+    @Override
+    public int getSize() {
+      return deletedBlockIds.size();
+    }
+  }
+
+
+  @Override
+  public BackgroundTaskResult call() throws Exception {
+    ContainerBackgroundTaskResult crr;
+    final Container container = ozoneContainer.getContainerSet()
+        .getContainer(containerData.getContainerID());
+    container.writeLock();
+    File dataDir = new File(containerData.getChunksPath());
+    long startTime = Time.monotonicNow();
+    // Scan container's db and get list of under deletion blocks
+    try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
+      if (containerData.hasSchema(SCHEMA_V1)) {
+        crr = deleteViaSchema1(meta, container, dataDir, startTime);
+      } else if (containerData.hasSchema(SCHEMA_V2)) {
+        crr = deleteViaSchema2(meta, container, dataDir, startTime);
+      } else if (containerData.hasSchema(SCHEMA_V3)) {
+        crr = deleteViaSchema3(meta, container, dataDir, startTime);
+      } else {
+        throw new UnsupportedOperationException(
+            "Only schema version 1,2,3 are supported.");
+      }
+      return crr;
+    } finally {
+      container.writeUnlock();
+    }
+  }
+
+  public boolean checkDataDir(File dataDir) {
+    boolean b = true;
+    if (!dataDir.exists() || !dataDir.isDirectory()) {
+      LOG.error("Invalid container data dir {} : "
+          + "does not exist or not a directory", dataDir.getAbsolutePath());
+      b = false;
+    }
+    return b;
+  }
+
+  public ContainerBackgroundTaskResult deleteViaSchema1(
+      DBHandle meta, Container container, File dataDir,
+      long startTime) throws IOException {
+    ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+    if (!checkDataDir(dataDir)) {
+      return crr;
+    }
+    try {
+      Table<String, BlockData> blockDataTable =
+          meta.getStore().getBlockDataTable();
+
+      // # of blocks to delete is throttled
+      KeyPrefixFilter filter = containerData.getDeletingBlockKeyFilter();
+      List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
+          blockDataTable
+              .getSequentialRangeKVs(containerData.startKeyEmpty(),
+                  (int) blocksToDelete, containerData.containerPrefix(),
+                  filter);
+      if (toDeleteBlocks.isEmpty()) {
+        LOG.debug("No under deletion block found in container : {}",
+            containerData.getContainerID());
+        return crr;
+      }
+
+      List<String> succeedBlocks = new LinkedList<>();
+      LOG.debug("Container : {}, To-Delete blocks : {}",
+          containerData.getContainerID(), toDeleteBlocks.size());
+
+      Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
+          .getHandler(container.getContainerType()));
+
+      long releasedBytes = 0;
+      for (Table.KeyValue<String, BlockData> entry : toDeleteBlocks) {
+        String blockName = entry.getKey();
+        LOG.debug("Deleting block {}", blockName);
+        if (entry.getValue() == null) {
+          LOG.warn("Missing delete block(Container = " +
+              container.getContainerData().getContainerID() +
+              ", Block = " + blockName);
+          continue;
+        }
+        try {
+          handler.deleteBlock(container, entry.getValue());
+          releasedBytes += KeyValueContainerUtil.getBlockLength(
+              entry.getValue());
+          succeedBlocks.add(blockName);
+        } catch (InvalidProtocolBufferException e) {
+          LOG.error("Failed to parse block info for block {}", blockName, e);
+        } catch (IOException e) {
+          LOG.error("Failed to delete files for block {}", blockName, e);
+        }
+      }
+
+      // Once chunks in the blocks are deleted... remove the blockID from
+      // blockDataTable.
+      try (BatchOperation batch = meta.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        for (String entry : succeedBlocks) {
+          blockDataTable.deleteWithBatch(batch, entry);
+        }
+
+        // Handler.deleteBlock calls deleteChunk to delete all the chunks
+        // in the block. The ContainerData stats (DB and in-memory) are not
+        // updated with decremented used bytes during deleteChunk. This is
+        // done here so that all the DB update for block delete can be
+        // batched together while committing to DB.
+        int deletedBlocksCount = succeedBlocks.size();
+        containerData.updateAndCommitDBCounters(meta, batch,
+            deletedBlocksCount, releasedBytes);
+        // Once DB update is persisted, check if there are any blocks
+        // remaining in the DB. This will determine whether the container
+        // can be deleted by SCM.
+        if (!container.hasBlocks()) {
+          containerData.markAsEmpty();
+        }
+
+        // update count of pending deletion blocks, block count and used
+        // bytes in in-memory container status.
+        containerData.decrPendingDeletionBlocks(deletedBlocksCount);
+        containerData.decrBlockCount(deletedBlocksCount);
+        containerData.decrBytesUsed(releasedBytes);
+        containerData.getVolume().decrementUsedSpace(releasedBytes);
+        metrics.incrSuccessCount(deletedBlocksCount);
+        metrics.incrSuccessBytes(releasedBytes);
+      }
+
+      if (!succeedBlocks.isEmpty()) {
+        LOG.debug("Container: {}, deleted blocks: {}, space reclaimed: {}, " +
+                "task elapsed time: {}ms", containerData.getContainerID(),
+            succeedBlocks.size(), releasedBytes,
+            Time.monotonicNow() - startTime);
+      }
+      crr.addAll(succeedBlocks);
+      return crr;
+    } catch (IOException exception) {
+      LOG.warn("Deletion operation was not successful for container: " +
+          container.getContainerData().getContainerID(), exception);
+      metrics.incrFailureCount();
+      throw exception;
+    }
+  }
+
+  /**
+   * Deletes blocks for a container using metadata schema version two,
+   * where delete transactions are stored in a table keyed by the raw
+   * {@code Long} transaction id.
+   *
+   * @param meta      DB handle for the container's metadata store
+   * @param container the container whose blocks are being deleted
+   * @param dataDir   the container's chunk data directory
+   * @param startTime task start time in monotonic millis, used for logging
+   * @return result describing the blocks handled by this run
+   * @throws IOException if chunk deletion or the batched DB update fails
+   */
+  public ContainerBackgroundTaskResult deleteViaSchema2(
+      DBHandle meta, Container container, File dataDir,
+      long startTime) throws IOException {
+    // Schema-two strategy: remove a finished transaction from the table
+    // by its Long transaction id inside the shared batch.
+    Deleter schema2Deleter = (table, batch, tid) -> {
+      Table<Long, DeletedBlocksTransaction> delTxTable =
+          (Table<Long, DeletedBlocksTransaction>) table;
+      delTxTable.deleteWithBatch(batch, tid);
+    };
+    Table<Long, DeletedBlocksTransaction> deleteTxns =
+        ((DeleteTransactionStore<Long>) meta.getStore())
+            .getDeleteTransactionTable();
+    // NOTE(review): iterates the whole table without a key prefix —
+    // presumably schema two keeps a separate store per container; confirm.
+    try (TableIterator<Long,
+        ? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
+             iterator = deleteTxns.iterator()) {
+      return deleteViaTransactionStore(
+          iterator, meta,
+          container, dataDir, startTime, schema2Deleter);
+    }
+  }
+
+  /**
+   * Deletes blocks for a container using metadata schema version three,
+   * where delete transactions live in a String-keyed table shared across
+   * containers; keys are built via
+   * {@code containerData.getDeleteTxnKey(tid)} and iteration is scoped to
+   * this container by {@code containerData.containerPrefix()}.
+   *
+   * @param meta      DB handle for the (shared) metadata store
+   * @param container the container whose blocks are being deleted
+   * @param dataDir   the container's chunk data directory
+   * @param startTime task start time in monotonic millis, used for logging
+   * @return result describing the blocks handled by this run
+   * @throws IOException if chunk deletion or the batched DB update fails
+   */
+  public ContainerBackgroundTaskResult deleteViaSchema3(
+      DBHandle meta, Container container, File dataDir,
+      long startTime) throws IOException {
+    // Schema-three strategy: the table key embeds the container prefix, so
+    // rebuild it from the transaction id before deleting in the batch.
+    Deleter schema3Deleter = (table, batch, tid) -> {
+      Table<String, DeletedBlocksTransaction> delTxTable =
+          (Table<String, DeletedBlocksTransaction>) table;
+      delTxTable.deleteWithBatch(batch,
+          containerData.getDeleteTxnKey(tid));
+    };
+    Table<String, DeletedBlocksTransaction> deleteTxns =
+        ((DeleteTransactionStore<String>) meta.getStore())
+            .getDeleteTransactionTable();
+    // Prefix-scoped iterator: only this container's transactions are seen.
+    try (TableIterator<String,
+        ? extends Table.KeyValue<String, DeletedBlocksTransaction>>
+             iterator = deleteTxns.iterator(containerData.containerPrefix())) {
+      return deleteViaTransactionStore(
+          iterator, meta,
+          container, dataDir, startTime, schema3Deleter);
+    }
+  }
+
+  /**
+   * Shared deletion path for schema two/three: drains up to
+   * {@code blocksToDelete} blocks' worth of pending delete transactions
+   * from the iterator, deletes their chunks, then removes the block
+   * entries and transactions from the DB in one batch and updates the
+   * in-memory container/volume counters and metrics.
+   *
+   * @param iter      iterator over this container's pending delete
+   *                  transactions (already scoped by the caller)
+   * @param meta      DB handle for the metadata store
+   * @param container the container being processed
+   * @param dataDir   the container's chunk data directory
+   * @param startTime task start time in monotonic millis, for logging
+   * @param deleter   schema-specific strategy for removing a transaction
+   *                  from its table within the batch
+   * @return result for this run (empty on bad data dir or no transactions)
+   * @throws IOException if deletion or the batched commit fails; the
+   *                     failure metric is incremented before rethrowing
+   */
+  private ContainerBackgroundTaskResult deleteViaTransactionStore(
+      TableIterator<?, ? extends Table.KeyValue<?, DeletedBlocksTransaction>>
+          iter, DBHandle meta, Container container, File dataDir,
+      long startTime, Deleter deleter) throws IOException {
+    ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+    if (!checkDataDir(dataDir)) {
+      return crr;
+    }
+    try {
+      Table<String, BlockData> blockDataTable =
+          meta.getStore().getBlockDataTable();
+      DeleteTransactionStore<?> txnStore =
+          (DeleteTransactionStore<?>) meta.getStore();
+      Table<?, DeletedBlocksTransaction> deleteTxns =
+          txnStore.getDeleteTransactionTable();
+      List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
+      int numBlocks = 0;
+      // Whole transactions are taken, so the final batch may overshoot
+      // blocksToDelete by the size of the last transaction.
+      while (iter.hasNext() && (numBlocks < blocksToDelete)) {
+        DeletedBlocksTransaction delTx = iter.next().getValue();
+        numBlocks += delTx.getLocalIDList().size();
+        delBlocks.add(delTx);
+      }
+      if (delBlocks.isEmpty()) {
+        LOG.info("No transaction found in container {} with pending delete " +
+                "block count {}",
+            containerData.getContainerID(),
+            containerData.getNumPendingDeletionBlocks());
+        // If the container was queued for delete, it had a positive
+        // pending delete block count. After checking the DB there were
+        // actually no delete transactions for the container, so reset the
+        // pending delete block count to the correct value of zero.
+        containerData.resetPendingDeleteBlockCount(meta);
+        return crr;
+      }
+
+      LOG.debug("Container : {}, To-Delete blocks : {}",
+          containerData.getContainerID(), delBlocks.size());
+
+      Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
+          .getHandler(container.getContainerType()));
+
+      // Chunk deletion happens outside the DB batch; per-block failures are
+      // logged inside deleteTransactions and reflected in the stats.
+      DeleteTransactionStats deleteBlocksResult =
+          deleteTransactions(delBlocks, handler, blockDataTable, container);
+      int deletedBlocksProcessed = deleteBlocksResult.getBlocksProcessed();
+      int deletedBlocksCount = deleteBlocksResult.getBlocksDeleted();
+      long releasedBytes = deleteBlocksResult.getBytesReleased();
+
+      // Once blocks are deleted... remove the blockID from blockDataTable
+      // and also remove the transactions from txnTable.
+      try (BatchOperation batch = meta.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        for (DeletedBlocksTransaction delTx : delBlocks) {
+          deleter.apply(deleteTxns, batch, delTx.getTxID());
+          for (Long blk : delTx.getLocalIDList()) {
+            blockDataTable.deleteWithBatch(batch,
+                containerData.getBlockKey(blk));
+          }
+        }
+
+        // Handler.deleteBlock calls deleteChunk to delete all the chunks
+        // in the block. The ContainerData stats (DB and in-memory) are not
+        // updated with decremented used bytes during deleteChunk. This is
+        // done here so that all the DB updates for block delete can be
+        // batched together while committing to DB.
+        containerData.updateAndCommitDBCounters(meta, batch,
+            deletedBlocksCount, releasedBytes);
+        // Once DB update is persisted, check if there are any blocks
+        // remaining in the DB. This will determine whether the container
+        // can be deleted by SCM.
+        if (!container.hasBlocks()) {
+          containerData.markAsEmpty();
+        }
+
+        // update count of pending deletion blocks, block count and used
+        // bytes in in-memory container status and used space in volume.
+        // Note: pending count drops by blocks *processed* (including
+        // unreferenced ones), while block count drops only by blocks
+        // actually deleted.
+        containerData.decrPendingDeletionBlocks(deletedBlocksProcessed);
+        containerData.decrBlockCount(deletedBlocksCount);
+        containerData.decrBytesUsed(releasedBytes);
+        containerData.getVolume().decrementUsedSpace(releasedBytes);
+        metrics.incrSuccessCount(deletedBlocksCount);
+        metrics.incrSuccessBytes(releasedBytes);
+      }
+
+      LOG.debug("Container: {}, deleted blocks: {}, space reclaimed: {}, " +
+              "task elapsed time: {}ms", containerData.getContainerID(),
+          deletedBlocksCount, releasedBytes, Time.monotonicNow() - startTime);
+
+      return crr;
+    } catch (IOException exception) {
+      LOG.warn("Deletion operation was not successful for container: " +
+          container.getContainerData().getContainerID(), exception);
+      metrics.incrFailureCount();
+      throw exception;
+    }
+  }
+
+  /**
+   * Deletes the chunks for every block referenced by the given delete
+   * transactions, without touching the DB tables (the caller removes the
+   * block/transaction entries in a batch afterwards).
+   *
+   * <p>A block absent from {@code blockDataTable} is treated as
+   * unreferenced and its files are cleaned up via
+   * {@code handler.deleteUnreferenced}; it counts as processed but not
+   * deleted. Per-block failures are logged and skipped so one bad block
+   * does not abort the batch.
+   *
+   * @param delBlocks      transactions whose blocks should be deleted
+   * @param handler        container-type handler performing the deletes
+   * @param blockDataTable table mapping block keys to block metadata
+   * @param container      the container being processed
+   * @return counts of blocks processed/deleted and bytes released
+   * @throws IOException if reading block metadata from the table fails
+   */
+  private DeleteTransactionStats deleteTransactions(
+      List<DeletedBlocksTransaction> delBlocks, Handler handler,
+      Table<String, BlockData> blockDataTable, Container container)
+      throws IOException {
+    int blocksProcessed = 0;
+    int blocksDeleted = 0;
+    long bytesReleased = 0;
+    for (DeletedBlocksTransaction entry : delBlocks) {
+      for (Long blkLong : entry.getLocalIDList()) {
+        String blk = containerData.getBlockKey(blkLong);
+        BlockData blkInfo = blockDataTable.get(blk);
+        LOG.debug("Deleting block {}", blkLong);
+        if (blkInfo == null) {
+          // No metadata entry: best-effort cleanup of orphaned files only.
+          try {
+            handler.deleteUnreferenced(container, blkLong);
+          } catch (IOException e) {
+            LOG.error("Failed to delete files for unreferenced block {} of" +
+                    " container {}", blkLong,
+                container.getContainerData().getContainerID(), e);
+          } finally {
+            blocksProcessed++;
+          }
+          continue;
+        }
+
+        boolean deleted = false;
+        try {
+          handler.deleteBlock(container, blkInfo);
+          blocksDeleted++;
+          deleted = true;
+        } catch (IOException e) {
+          // TODO: if deletion of certain block retries exceed the certain
+          //  number of times, service should skip deleting it,
+          //  otherwise invalid numPendingDeletionBlocks could accumulate
+          //  beyond the limit and the following deletion will stop.
+          LOG.error("Failed to delete files for block {}", blkLong, e);
+        } finally {
+          blocksProcessed++;
+        }
+
+        // Only count bytes for blocks whose chunks were actually removed.
+        if (deleted) {
+          try {
+            bytesReleased += KeyValueContainerUtil.getBlockLength(blkInfo);
+          } catch (IOException e) {
+            // TODO: handle the bytesReleased correctly for the unexpected
+            //  exception.
+            LOG.error("Failed to get block length for block {}", blkLong, e);
+          }
+        }
+      }
+    }
+    return new DeleteTransactionStats(blocksProcessed,
+        blocksDeleted, bytesReleased);
+  }
+
+  /** Returns the scheduling priority assigned to this deletion task. */
+  @Override
+  public int getPriority() {
+    return this.priority;
+  }
+
+  /**
+   * Strategy for removing a completed delete transaction from the
+   * schema-specific transaction table as part of the shared batch commit.
+   * Implemented as a lambda per schema version.
+   */
+  @FunctionalInterface
+  private interface Deleter {
+    void apply(Table<?, DeletedBlocksTransaction> deleteTxnsTable,
+               BatchOperation batch, long txnID) throws IOException;
+  }
+
+  /**
+   * Immutable holder for the outcome of processing a batch of delete
+   * transactions: how many blocks were attempted, how many were actually
+   * deleted, and how many bytes were reclaimed.
+   */
+  private static class DeleteTransactionStats {
+
+    private final int blocksProcessed;
+    private final int blocksDeleted;
+    private final long bytesReleased;
+
+    DeleteTransactionStats(int processed, int deleted, long released) {
+      this.blocksProcessed = processed;
+      this.blocksDeleted = deleted;
+      this.bytesReleased = released;
+    }
+
+    public int getBlocksProcessed() {
+      return this.blocksProcessed;
+    }
+
+    public int getBlocksDeleted() {
+      return this.blocksDeleted;
+    }
+
+    public long getBytesReleased() {
+      return this.bytesReleased;
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 29ac6ea240..50c406d35e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
@@ -53,7 +54,6 @@ import 
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import 
org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
-import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
 import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 2e49cd8882..bed5ac40f1 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -62,7 +62,7 @@ import 
org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
-import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
index cb86bf38e7..67338f860d 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
@@ -38,8 +38,7 @@ import 
org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoo
 import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
-import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;
+import 
org.apache.hadoop.ozone.container.common.impl.BlockDeletingService.ContainerBlockInfo;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
@@ -120,7 +119,7 @@ public class TestContainerDeletionChoosingPolicy {
 
     long totPendingBlocks = 0;
     for (ContainerBlockInfo pr : result0) {
-      totPendingBlocks += pr.getBlocks();
+      totPendingBlocks += pr.getNumBlocksToDelete();
     }
     Assert.assertTrue(totPendingBlocks >= blockLimitPerInterval);
 
@@ -192,7 +191,7 @@ public class TestContainerDeletionChoosingPolicy {
         .chooseContainerForBlockDeletion(blockLimitPerInterval, 
deletionPolicy);
     long totPendingBlocks = 0;
     for (ContainerBlockInfo pr : result0) {
-      totPendingBlocks += pr.getBlocks();
+      totPendingBlocks += pr.getNumBlocksToDelete();
     }
     Assert.assertTrue(totPendingBlocks >= blockLimitPerInterval);
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
index ee2bcfee6d..fdf4dd0d57 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 
 import com.google.common.annotations.VisibleForTesting;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to