This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to 
refs/heads/HDDS-10239-container-reconciliation by this push:
     new 0baba47daa HDDS-11254. Reconcile commands should be handled by 
datanode ReplicationSupervisor (#7076)
0baba47daa is described below

commit 0baba47daaded042ba3cc4b2e7a342b5d53279dc
Author: Ethan Rose <[email protected]>
AuthorDate: Wed Aug 21 13:34:57 2024 -0400

    HDDS-11254. Reconcile commands should be handled by datanode 
ReplicationSupervisor (#7076)
---
 .../checksum/ContainerChecksumTreeManager.java     |   5 +-
 .../checksum/ContainerMerkleTreeMetrics.java       |   1 -
 .../container/checksum/ReconcileContainerTask.java |  88 ++++++++++++++
 .../ozone/container/common/interfaces/Handler.java |  12 +-
 .../common/statemachine/DatanodeStateMachine.java  |   7 +-
 .../ReconcileContainerCommandHandler.java          |  61 +++-------
 .../ozone/container/keyvalue/KeyValueHandler.java  |  10 +-
 .../container/ozoneimpl/ContainerController.java   |   7 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   9 +-
 .../commands/ReconcileContainerCommand.java        |  34 ++++--
 .../checksum/TestReconcileContainerTask.java       | 126 +++++++++++++++++++++
 .../ozone/container/common/ContainerTestUtils.java |  48 ++++++++
 .../container/common/TestBlockDeletingService.java |  34 ++----
 .../TestSchemaOneBackwardsCompatibility.java       |  11 +-
 .../TestSchemaTwoBackwardsCompatibility.java       |   4 +-
 .../common/impl/TestContainerPersistence.java      |   3 +-
 .../container/common/impl/TestHddsDispatcher.java  |  44 +------
 .../container/common/interfaces/TestHandler.java   |  18 +--
 .../common/statemachine/TestStateContext.java      |   2 +-
 .../TestReconcileContainerCommandHandler.java      |  84 ++++----------
 .../states/endpoint/TestHeartbeatEndpointTask.java |   4 +-
 .../container/keyvalue/TestKeyValueHandler.java    |  28 ++---
 .../TestKeyValueHandlerWithUnhealthyContainer.java |   3 +-
 .../replication/TestGrpcReplicationService.java    |   8 +-
 .../ReconcileContainerEventHandler.java            |   5 +-
 .../container/metrics/TestContainerMetrics.java    |  15 +--
 .../container/server/TestContainerServer.java      |  15 +--
 .../server/TestSecureContainerServer.java          |  16 +--
 .../ozone/debug/container/ContainerCommands.java   |   5 +-
 .../ozone/freon/ClosedContainerReplicator.java     |   5 +-
 30 files changed, 416 insertions(+), 296 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
index 7042531f57..3e18e16c40 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -60,10 +60,13 @@ public class ContainerChecksumTreeManager {
   public ContainerChecksumTreeManager(ConfigurationSource conf) {
     fileLock = SimpleStriped.readWriteLock(
         
conf.getObject(DatanodeConfiguration.class).getContainerChecksumLockStripes(), 
true);
-    // TODO: TO unregister metrics on stop.
     metrics = ContainerMerkleTreeMetrics.create();
   }
 
+  public void stop() {
+    ContainerMerkleTreeMetrics.unregister();
+  }
+
   /**
    * Writes the specified container merkle tree to the specified container's 
checksum file.
    * The data merkle tree within the file is replaced with the {@code tree} 
parameter, but all other content of the
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
index 3d76288616..c1bab5aa48 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
@@ -32,7 +32,6 @@ public class ContainerMerkleTreeMetrics {
 
   public static ContainerMerkleTreeMetrics create() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
-    // TODO: Remove when checksum manager is moved from KeyValueHandler.
     MetricsSource source = ms.getSource(METRICS_SOURCE_NAME);
     if (source != null) {
       ms.unregisterSource(METRICS_SOURCE_NAME);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java
new file mode 100644
index 0000000000..ac42efd45a
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Used to execute a container reconciliation task that has been queued from 
the ReplicationSupervisor.
+ */
+public class ReconcileContainerTask extends AbstractReplicationTask {
+  private final ReconcileContainerCommand command;
+  private final DNContainerOperationClient dnClient;
+  private final ContainerController controller;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReconcileContainerTask.class);
+
+  public ReconcileContainerTask(ContainerController controller,
+      DNContainerOperationClient dnClient, ReconcileContainerCommand command) {
+    super(command.getContainerID(), command.getDeadline(), command.getTerm());
+    this.command = command;
+    this.controller = controller;
+    this.dnClient = dnClient;
+  }
+
+  @Override
+  public void runTask() {
+    long start = Time.monotonicNow();
+
+    LOG.info("{}", this);
+
+    try {
+      controller.reconcileContainer(dnClient, command.getContainerID(), 
command.getPeerDatanodes());
+      setStatus(Status.DONE);
+      long elapsed = Time.monotonicNow() - start;
+      LOG.info("{} completed in {} ms", this, elapsed);
+    } catch (Exception e) {
+      long elapsed = Time.monotonicNow() - start;
+      setStatus(Status.FAILED);
+      LOG.warn("{} failed in {} ms", this, elapsed, e);
+    }
+  }
+
+  @Override
+  protected Object getCommandForDebug() {
+    return command.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ReconcileContainerTask that = (ReconcileContainerTask) o;
+    return Objects.equals(command, that.command);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getContainerId());
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 01435d8002..1579f4af8e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.ozone.container.common.interfaces;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -30,6 +30,8 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -70,16 +72,17 @@ public abstract class Handler {
     this.icrSender = icrSender;
   }
 
+  @SuppressWarnings("checkstyle:ParameterNumber")
   public static Handler getHandlerForContainerType(
       final ContainerType containerType, final ConfigurationSource config,
       final String datanodeId, final ContainerSet contSet,
       final VolumeSet volumeSet, final ContainerMetrics metrics,
-      IncrementalReportSender<Container> icrSender) {
+      IncrementalReportSender<Container> icrSender, 
ContainerChecksumTreeManager checksumManager) {
     switch (containerType) {
     case KeyValueContainer:
       return new KeyValueHandler(config,
           datanodeId, contSet, volumeSet, metrics,
-          icrSender);
+          icrSender, checksumManager);
     default:
       throw new IllegalArgumentException("Handler for ContainerType: " +
           containerType + "doesn't exist.");
@@ -199,7 +202,8 @@ public abstract class Handler {
    * @param container container to be reconciled.
    * @param peers The other datanodes with a copy of this container whose data 
should be checked.
    */
-  public abstract void reconcileContainer(Container<?> container, 
List<DatanodeDetails> peers) throws IOException;
+  public abstract void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
+      Set<DatanodeDetails> peers) throws IOException;
 
   /**
    * Deletes the given files associated with a block of the container.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a8deb9823d..888d1fd018 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -45,6 +45,7 @@ import 
org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.NettyMetrics;
 import org.apache.hadoop.ozone.HddsDatanodeStopService;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
@@ -225,6 +226,10 @@ public class DatanodeStateMachine implements Closeable {
         new ReconstructECContainersCommandHandler(conf, supervisor,
             ecReconstructionCoordinator);
 
+    // TODO HDDS-11218 combine the clients used for reconstruction and 
reconciliation so they share the same cache of
+    //  datanode clients.
+    DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, 
certClient, secretKeyClient);
+
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d")
         .build();
@@ -253,7 +258,7 @@ public class DatanodeStateMachine implements Closeable {
             supervisor::nodeStateUpdated))
         .addHandler(new FinalizeNewLayoutVersionCommandHandler())
         .addHandler(new RefreshVolumeUsageCommandHandler())
-        .addHandler(new ReconcileContainerCommandHandler(threadNamePrefix))
+        .addHandler(new ReconcileContainerCommandHandler(supervisor, dnClient))
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
index 9a4110c7df..99185a7e10 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
@@ -18,66 +18,38 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.checksum.ReconcileContainerTask;
 import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handles commands from SCM to reconcile a container replica on this datanode 
with the replicas on its peers.
  */
 public class ReconcileContainerCommandHandler implements CommandHandler {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ReconcileContainerCommandHandler.class);
-
+  private final ReplicationSupervisor supervisor;
   private final AtomicLong invocationCount;
-  private final AtomicInteger queuedCount;
-  private final ExecutorService executor;
-  private long totalTime;
+  private final DNContainerOperationClient dnClient;
 
-  public ReconcileContainerCommandHandler(String threadNamePrefix) {
-    invocationCount = new AtomicLong(0);
-    queuedCount = new AtomicInteger(0);
-    // TODO Allow configurable thread pool size with a default value when the 
implementation is ready.
-    executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d")
-        .build());
-    totalTime = 0;
+  public ReconcileContainerCommandHandler(ReplicationSupervisor supervisor, 
DNContainerOperationClient dnClient) {
+    this.supervisor = supervisor;
+    this.dnClient = dnClient;
+    this.invocationCount = new AtomicLong(0);
   }
 
   @Override
   public void handle(SCMCommand command, OzoneContainer container, 
StateContext context,
       SCMConnectionManager connectionManager) {
-    queuedCount.incrementAndGet();
-    CompletableFuture.runAsync(() -> {
-      invocationCount.incrementAndGet();
-      long startTime = Time.monotonicNow();
-      ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) 
command;
-      LOG.info("Processing reconcile container command for container {} with 
peers {}",
-          reconcileCommand.getContainerID(), 
reconcileCommand.getPeerDatanodes());
-      try {
-        
container.getController().reconcileContainer(reconcileCommand.getContainerID(),
-            reconcileCommand.getPeerDatanodes());
-      } catch (IOException ex) {
-        LOG.error("Failed to reconcile container {}.", 
reconcileCommand.getContainerID(), ex);
-      } finally {
-        long endTime = Time.monotonicNow();
-        totalTime += endTime - startTime;
-      }
-    }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
+    invocationCount.incrementAndGet();
+    ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) 
command;
+    supervisor.addTask(new ReconcileContainerTask(container.getController(), 
dnClient, reconcileCommand));
   }
 
   @Override
@@ -90,21 +62,20 @@ public class ReconcileContainerCommandHandler implements 
CommandHandler {
     return (int)invocationCount.get();
   }
 
+  // Uses ReplicationSupervisor for these metrics.
+
   @Override
   public long getAverageRunTime() {
-    if (invocationCount.get() > 0) {
-      return totalTime / invocationCount.get();
-    }
     return 0;
   }
 
   @Override
   public long getTotalRunTime() {
-    return totalTime;
+    return 0;
   }
 
   @Override
   public int getQueuedCount() {
-    return queuedCount.get();
+    return 0;
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 9b888a9c1e..4b635194cd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Function;
 
@@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -159,14 +161,15 @@ public class KeyValueHandler extends Handler {
                          ContainerSet contSet,
                          VolumeSet volSet,
                          ContainerMetrics metrics,
-                         IncrementalReportSender<Container> icrSender) {
+                         IncrementalReportSender<Container> icrSender,
+                         ContainerChecksumTreeManager checksumManager) {
     super(config, datanodeId, contSet, volSet, metrics, icrSender);
     blockManager = new BlockManagerImpl(config);
     validateChunkChecksumData = conf.getObject(
         DatanodeConfiguration.class).isChunkDataValidationCheck();
     chunkManager = ChunkManagerFactory.createChunkManager(config, blockManager,
         volSet);
-    checksumManager = new ContainerChecksumTreeManager(config);
+    this.checksumManager = checksumManager;
     try {
       volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
     } catch (Exception e) {
@@ -1303,7 +1306,8 @@ public class KeyValueHandler extends Handler {
   }
 
   @Override
-  public void reconcileContainer(Container<?> container, List<DatanodeDetails> 
peers) throws IOException {
+  public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
+                                 Set<DatanodeDetails> peers) throws 
IOException {
     // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
     ContainerData data = container.getContainerData();
     long id = data.getContainerID();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 47b503ee05..6a1ceef0c0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -25,6 +25,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -39,7 +40,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.time.Instant;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -215,12 +215,13 @@ public class ContainerController {
     }
   }
 
-  public void reconcileContainer(long containerID, List<DatanodeDetails> 
peers) throws IOException {
+  public void reconcileContainer(DNContainerOperationClient dnClient, long 
containerID, Set<DatanodeDetails> peers)
+      throws IOException {
     Container<?> container = containerSet.getContainer(containerID);
     if (container == null) {
       LOG.warn("Container {} to reconcile not found on this datanode.", 
containerID);
     } else {
-      getHandler(container).reconcileContainer(container, peers);
+      getHandler(container).reconcileContainer(dnClient, container, peers);
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 6a6ac8bb35..f8034244c1 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -34,7 +34,6 @@ import 
org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
-import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeMetrics;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
@@ -188,12 +187,13 @@ public class OzoneContainer {
       }
     };
 
+    checksumTreeManager = new ContainerChecksumTreeManager(config);
     for (ContainerType containerType : ContainerType.values()) {
       handlers.put(containerType,
           Handler.getHandlerForContainerType(
               containerType, conf,
               context.getParent().getDatanodeDetails().getUuidString(),
-              containerSet, volumeSet, metrics, icrSender));
+              containerSet, volumeSet, metrics, icrSender, 
checksumTreeManager));
     }
 
     SecurityConfig secConf = new SecurityConfig(conf);
@@ -226,8 +226,6 @@ public class OzoneContainer {
     Duration blockDeletingSvcInterval = conf.getObject(
         DatanodeConfiguration.class).getBlockDeletionInterval();
 
-    checksumTreeManager = new ContainerChecksumTreeManager(config);
-
     long blockDeletingServiceTimeout = config
         .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
             OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
@@ -500,8 +498,7 @@ public class OzoneContainer {
     blockDeletingService.shutdown();
     recoveringContainerScrubbingService.shutdown();
     ContainerMetrics.remove();
-    // TODO: To properly shut down ContainerMerkleTreeMetrics
-    ContainerMerkleTreeMetrics.unregister();
+    checksumTreeManager.stop();
   }
 
   public void handleVolumeFailures() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
index cdd4522cc6..3d24756a40 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
@@ -25,18 +25,20 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto;
 
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
 
 /**
  * Asks datanodes to reconcile the specified container with other container 
replicas.
  */
 public class ReconcileContainerCommand extends 
SCMCommand<ReconcileContainerCommandProto> {
 
-  private final List<DatanodeDetails> peerDatanodes;
+  private final Set<DatanodeDetails> peerDatanodes;
 
-  public ReconcileContainerCommand(long containerID, List<DatanodeDetails> 
peerDatanodes) {
+  public ReconcileContainerCommand(long containerID, Set<DatanodeDetails> 
peerDatanodes) {
     // Container ID serves as command ID, since only one reconciliation should 
be in progress at a time.
     super(containerID);
     this.peerDatanodes = peerDatanodes;
@@ -58,7 +60,7 @@ public class ReconcileContainerCommand extends 
SCMCommand<ReconcileContainerComm
     return builder.build();
   }
 
-  public List<DatanodeDetails> getPeerDatanodes() {
+  public Set<DatanodeDetails> getPeerDatanodes() {
     return peerDatanodes;
   }
 
@@ -70,11 +72,11 @@ public class ReconcileContainerCommand extends 
SCMCommand<ReconcileContainerComm
     Preconditions.checkNotNull(protoMessage);
 
     List<HddsProtos.DatanodeDetailsProto> peers = protoMessage.getPeersList();
-    List<DatanodeDetails> peerNodes = !peers.isEmpty()
+    Set<DatanodeDetails> peerNodes = !peers.isEmpty()
         ? peers.stream()
         .map(DatanodeDetails::getFromProtoBuf)
-        .collect(Collectors.toList())
-        : emptyList();
+        .collect(Collectors.toSet())
+        : emptySet();
 
     return new ReconcileContainerCommand(protoMessage.getContainerID(), 
peerNodes);
   }
@@ -85,4 +87,22 @@ public class ReconcileContainerCommand extends 
SCMCommand<ReconcileContainerComm
         ": containerId=" + getContainerID() +
         ", peerNodes=" + peerDatanodes;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ReconcileContainerCommand that = (ReconcileContainerCommand) o;
+    return getContainerID() == that.getContainerID() &&
+        Objects.equals(peerDatanodes, that.peerDatanodes);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getContainerID(), peerDatanodes);
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java
new file mode 100644
index 0000000000..04d08347ed
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+class TestReconcileContainerTask {
+  private DNContainerOperationClient mockClient;
+  private ContainerController mockController;
+
+  @BeforeEach
+  public void init() {
+    mockClient = mock(DNContainerOperationClient.class);
+    mockController = mock(ContainerController.class);
+  }
+
+  @Test
+  public void testFailedTaskStatus() throws Exception {
+    doThrow(IOException.class).when(mockController).reconcileContainer(any(), 
anyLong(), any());
+    ReconcileContainerTask task = new ReconcileContainerTask(mockController, 
mockClient,
+        new ReconcileContainerCommand(1, Collections.emptySet()));
+
+    assertEquals(AbstractReplicationTask.Status.QUEUED, task.getStatus());
+    task.runTask();
+    assertEquals(AbstractReplicationTask.Status.FAILED, task.getStatus());
+  }
+
+  @Test
+  public void testSuccessfulTaskStatus() {
+    ReconcileContainerTask task = new ReconcileContainerTask(mockController, 
mockClient,
+        new ReconcileContainerCommand(1, Collections.emptySet()));
+
+    assertEquals(AbstractReplicationTask.Status.QUEUED, task.getStatus());
+    task.runTask();
+    assertEquals(AbstractReplicationTask.Status.DONE, task.getStatus());
+  }
+
+  @Test
+  public void testEqualityWhenContainerIDsMatch() {
+    final long containerID = 1;
+    final UUID dnID1 = UUID.randomUUID();
+
+    Set<DatanodeDetails> peerSet1 = new HashSet<>();
+    peerSet1.add(buildDn(dnID1));
+    Set<DatanodeDetails> peerSet1Other = new HashSet<>();
+    peerSet1Other.add(buildDn(dnID1));
+    Set<DatanodeDetails> peerSet2 = new HashSet<>();
+    peerSet2.add(buildDn());
+
+    ReconcileContainerTask peerSet1Task = new 
ReconcileContainerTask(mockController, mockClient,
+        new ReconcileContainerCommand(containerID, peerSet1));
+    ReconcileContainerTask otherPeerSet1Task = new 
ReconcileContainerTask(mockController, mockClient,
+        new ReconcileContainerCommand(containerID, peerSet1Other));
+    ReconcileContainerTask peerSet2Task = new 
ReconcileContainerTask(mockController, mockClient,
+        new ReconcileContainerCommand(containerID, peerSet2));
+
+    // Same container ID and peers.
+    assertEquals(peerSet1Task, otherPeerSet1Task);
+    // Same container ID, different peers.
+    assertNotEquals(peerSet1Task, peerSet2Task);
+  }
+
+  @Test
+  public void testEqualityWhenContainerIDsDifferent() {
+    Set<DatanodeDetails> peerSet = new HashSet<>();
+    peerSet.add(buildDn());
+
+    ReconcileContainerTask id1Task = new 
ReconcileContainerTask(mockController, mockClient,
+        new ReconcileContainerCommand(1, peerSet));
+    ReconcileContainerTask id2Task = new 
ReconcileContainerTask(mockController, mockClient,
+        new ReconcileContainerCommand(2, peerSet));
+    ReconcileContainerTask id2NoPeersTask = new 
ReconcileContainerTask(mockController, mockClient,
+        new ReconcileContainerCommand(2, Collections.emptySet()));
+
+    // Different container ID, same peers.
+    assertNotEquals(id1Task, id2Task);
+    // Different container ID, different peers.
+    assertNotEquals(id1Task, id2NoPeersTask);
+  }
+
+  private DatanodeDetails buildDn(UUID id) {
+    return DatanodeDetails.newBuilder()
+        .setUuid(id)
+        .build();
+  }
+
+  private DatanodeDetails buildDn() {
+    return DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID())
+        .build();
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index b5b578554b..d4c00df38b 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.common;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -27,6 +28,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -36,9 +38,12 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -54,8 +59,10 @@ import 
org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -73,6 +80,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -179,6 +187,46 @@ public final class ContainerTestUtils {
     return new KeyValueContainer(kvData, new OzoneConfiguration());
   }
 
+  /**
+   * Constructs an instance of KeyValueHandler that can be used for testing.
+   * This instance can be used for tests that do not need an ICR sender or 
{@link ContainerChecksumTreeManager}.
+   */
+  public static KeyValueHandler getKeyValueHandler(ConfigurationSource config,
+      String datanodeId, ContainerSet contSet, VolumeSet volSet, 
ContainerMetrics metrics) {
+    return new KeyValueHandler(config, datanodeId, contSet, volSet, metrics, c 
-> { },
+        new ContainerChecksumTreeManager(config));
+  }
+
+  /**
+   * Constructs an instance of KeyValueHandler that can be used for testing.
+   * This instance can be used for tests that do not need an ICR sender, 
metrics, or a
+   * {@link ContainerChecksumTreeManager}.
+   */
+  public static KeyValueHandler getKeyValueHandler(ConfigurationSource config,
+      String datanodeId, ContainerSet contSet, VolumeSet volSet) {
+    return getKeyValueHandler(config, datanodeId, contSet, volSet, 
ContainerMetrics.create(config));
+  }
+
+  public static HddsDispatcher getHddsDispatcher(OzoneConfiguration conf,
+                                                 ContainerSet contSet,
+                                                 VolumeSet volSet,
+                                                 StateContext context) {
+    return getHddsDispatcher(conf, contSet, volSet, context, null);
+  }
+
+  public static HddsDispatcher getHddsDispatcher(OzoneConfiguration conf,
+                                                 ContainerSet contSet,
+                                                 VolumeSet volSet,
+                                                 StateContext context, 
TokenVerifier verifier) {
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    Map<ContainerType, Handler> handlers = Maps.newHashMap();
+    handlers.put(ContainerType.KeyValueContainer, 
ContainerTestUtils.getKeyValueHandler(conf,
+        context.getParent().getDatanodeDetails().getUuidString(), contSet, 
volSet, metrics));
+    assertEquals(1, ContainerType.values().length, "Tests only cover 
KeyValueContainer type");
+    return new HddsDispatcher(
+        conf, contSet, volSet, handlers, context, metrics, verifier);
+  }
+
   public static void enableSchemaV3(OzoneConfiguration conf) {
     DatanodeConfiguration dc = conf.getObject(DatanodeConfiguration.class);
     dc.setContainerSchemaV3Enabled(true);
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index ab313d0ce6..fd0c81fb02 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -481,9 +481,7 @@ public class TestBlockDeletingService {
     // runs so we can trigger it manually.
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet, metrics);
     OzoneContainer ozoneContainer =
         mockDependencies(containerSet, keyValueHandler);
     BlockDeletingService svc = new BlockDeletingService(ozoneContainer,
@@ -550,9 +548,7 @@ public class TestBlockDeletingService {
     createToDeleteBlocks(containerSet, 1, 3, 1);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet, metrics);
     BlockDeletingServiceTestImpl svc =
         getBlockDeletingService(containerSet, conf, keyValueHandler);
     svc.start();
@@ -684,9 +680,7 @@ public class TestBlockDeletingService {
 
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet, metrics);
     BlockDeletingServiceTestImpl svc =
         getBlockDeletingService(containerSet, conf, keyValueHandler);
     svc.start();
@@ -792,9 +786,7 @@ public class TestBlockDeletingService {
     createToDeleteBlocks(containerSet, 1, 100, 1);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet, metrics);
     BlockDeletingServiceTestImpl service =
         getBlockDeletingService(containerSet, conf, keyValueHandler);
     service.start();
@@ -822,9 +814,7 @@ public class TestBlockDeletingService {
     createToDeleteBlocks(containerSet, 1, 3, 1);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet, metrics);
     // set timeout value as 1ns to trigger timeout behavior
     long timeout  = 1;
     OzoneContainer ozoneContainer =
@@ -929,9 +919,7 @@ public class TestBlockDeletingService {
         chunksPerBlock);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet, metrics);
     BlockDeletingServiceTestImpl service =
         getBlockDeletingService(containerSet, conf, keyValueHandler);
     service.start();
@@ -988,9 +976,7 @@ public class TestBlockDeletingService {
     createToDeleteBlocks(containerSet, containerCount, blocksPerContainer,
         chunksPerBlock);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            ContainerMetrics.create(conf), c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet);
     BlockDeletingServiceTestImpl service =
         getBlockDeletingService(containerSet, conf, keyValueHandler);
     service.start();
@@ -1047,9 +1033,7 @@ public class TestBlockDeletingService {
     ContainerSet containerSet = new ContainerSet(1000);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet, metrics);
     int containerCount = 5;
     int blocksPerContainer = 3;
     createToDeleteBlocks(containerSet, containerCount,
@@ -1119,7 +1103,7 @@ public class TestBlockDeletingService {
     ContainerSet containerSet = new ContainerSet(1000);
     KeyValueContainerData contData = createToDeleteBlocks(containerSet, 
numBlocks, 4);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, 
ContainerMetrics.create(conf), c -> { });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet);
     BlockDeletingServiceTestImpl svc =
         getBlockDeletingService(containerSet, conf, keyValueHandler);
     svc.start();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index ad5ca48218..7774604127 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
@@ -279,11 +278,8 @@ public class TestSchemaOneBackwardsCompatibility {
     ContainerSet containerSet = makeContainerSet();
     VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet);
     long initialTotalSpace = newKvData().getBytesUsed();
     long blockSpace = initialTotalSpace / TestDB.KEY_COUNT;
 
@@ -352,11 +348,8 @@ public class TestSchemaOneBackwardsCompatibility {
     ContainerSet containerSet = makeContainerSet();
     VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
-        new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
-            metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, 
containerSet, volumeSet);
     KeyValueContainerData cData = newKvData();
     try (DBHandle refCountedDB = BlockUtils.getDB(cData, conf)) {
       // Read blocks that were already deleted before the upgrade.
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java
index 0c4612b79f..da0d2384ab 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
@@ -135,8 +134,7 @@ public class TestSchemaTwoBackwardsCompatibility {
     chunkManager = new FilePerBlockStrategy(true, blockManager, volumeSet);
 
     containerSet = new ContainerSet(1000);
-    keyValueHandler = new KeyValueHandler(conf, datanodeUuid,
-        containerSet, volumeSet, ContainerMetrics.create(conf), c -> { });
+    keyValueHandler = ContainerTestUtils.getKeyValueHandler(conf, 
datanodeUuid, containerSet, volumeSet);
     ozoneContainer = mock(OzoneContainer.class);
     when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
     when(ozoneContainer.getWriteChannel()).thenReturn(null);
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 3ff8f9e625..993179d1b7 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -311,7 +312,7 @@ public class TestContainerPersistence {
 
     KeyValueHandler kvHandler = new KeyValueHandler(conf,
         datanodeId, containerSet, volumeSet, metrics,
-        c -> icrReceived.incrementAndGet());
+        c -> icrReceived.incrementAndGet(), new 
ContainerChecksumTreeManager(conf));
 
     Exception exception = assertThrows(
         StorageContainerException.class,
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 1cbd6ee470..05bebdd1b9 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.container.common.impl;
 
-import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.StorageUnit;
@@ -34,7 +33,6 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -48,8 +46,6 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
@@ -78,7 +74,6 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -113,10 +108,6 @@ public class TestHddsDispatcher {
   @TempDir
   private File testDir;
 
-  public static final IncrementalReportSender<Container> NO_OP_ICR_SENDER =
-      c -> {
-      };
-
   @ContainerLayoutTestInfo.ContainerTest
   public void testContainerCloseActionWhenFull(
       ContainerLayoutVersion layout) throws IOException {
@@ -143,16 +134,7 @@ public class TestHddsDispatcher {
       container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
           scmId.toString());
       containerSet.addContainer(container);
-      ContainerMetrics metrics = ContainerMetrics.create(conf);
-      Map<ContainerType, Handler> handlers = Maps.newHashMap();
-      for (ContainerType containerType : ContainerType.values()) {
-        handlers.put(containerType,
-            Handler.getHandlerForContainerType(containerType, conf,
-                context.getParent().getDatanodeDetails().getUuidString(),
-                containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
-      }
-      HddsDispatcher hddsDispatcher = new HddsDispatcher(
-          conf, containerSet, volumeSet, handlers, context, metrics, null);
+      HddsDispatcher hddsDispatcher = 
ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context);
       hddsDispatcher.setClusterId(scmId.toString());
       ContainerCommandResponseProto responseOne = hddsDispatcher
           .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
@@ -279,16 +261,7 @@ public class TestHddsDispatcher {
       container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
           scmId.toString());
       containerSet.addContainer(container);
-      ContainerMetrics metrics = ContainerMetrics.create(conf);
-      Map<ContainerType, Handler> handlers = Maps.newHashMap();
-      for (ContainerType containerType : ContainerType.values()) {
-        handlers.put(containerType,
-            Handler.getHandlerForContainerType(containerType, conf,
-                context.getParent().getDatanodeDetails().getUuidString(),
-                containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
-      }
-      HddsDispatcher hddsDispatcher = new HddsDispatcher(
-          conf, containerSet, volumeSet, handlers, context, metrics, null);
+      HddsDispatcher hddsDispatcher = 
ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context);
       hddsDispatcher.setClusterId(scmId.toString());
       containerData.getVolume().getVolumeInfo()
           .ifPresent(volumeInfo -> volumeInfo.incrementUsedSpace(50));
@@ -528,17 +501,8 @@ public class TestHddsDispatcher {
       }
     });
     StateContext context = ContainerTestUtils.getMockContext(dd, conf);
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
-    Map<ContainerType, Handler> handlers = Maps.newHashMap();
-    for (ContainerType containerType : ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(containerType, conf,
-              context.getParent().getDatanodeDetails().getUuidString(),
-              containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
-    }
-
-    final HddsDispatcher hddsDispatcher = new HddsDispatcher(conf,
-        containerSet, volumeSet, handlers, context, metrics, tokenVerifier);
+    final HddsDispatcher hddsDispatcher =
+        ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, 
context, tokenVerifier);
     hddsDispatcher.setClusterId(scmId.toString());
     return hddsDispatcher;
   }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
index 8f2ad307e8..27257d5a0e 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.ozone.container.common.interfaces;
 
-import java.util.Map;
-
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -27,13 +25,11 @@ import 
org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
-import org.apache.hadoop.ozone.container.common.impl.TestHddsDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 
-import com.google.common.collect.Maps;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -62,19 +58,7 @@ public class TestHandler {
     DatanodeDetails datanodeDetails = mock(DatanodeDetails.class);
     StateContext context = ContainerTestUtils.getMockContext(
         datanodeDetails, conf);
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
-    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
-    for (ContainerProtos.ContainerType containerType :
-        ContainerProtos.ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(
-              containerType, conf,
-              context.getParent().getDatanodeDetails().getUuidString(),
-              containerSet, volumeSet, metrics,
-              TestHddsDispatcher.NO_OP_ICR_SENDER));
-    }
-    this.dispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, null, metrics, null);
+    this.dispatcher = ContainerTestUtils.getHddsDispatcher(conf, containerSet, 
volumeSet, context);
   }
 
   @AfterEach
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 6933400fba..fd4614335e 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -705,7 +705,7 @@ public class TestStateContext {
     ctx.addCommand(ReplicateContainerCommand.forTest(3));
     ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
     ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId()));
-    ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptyList()));
+    ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptySet()));
 
     Map<SCMCommandProto.Type, Integer> summary = ctx.getCommandQueueSummary();
     assertEquals(3,
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
index d6be667f41..f27ed097d2 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
@@ -24,6 +24,9 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.checksum.ReconcileContainerTask;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -39,16 +42,14 @@ import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
-import org.apache.ozone.test.GenericTestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import static java.util.Collections.singletonMap;
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -56,7 +57,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.GB;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -78,7 +80,14 @@ public class TestReconcileContainerCommandHandler {
 
     OzoneConfiguration conf = new OzoneConfiguration();
     DatanodeDetails dnDetails = randomDatanodeDetails();
-    subject = new ReconcileContainerCommandHandler("");
+
+    ReplicationSupervisor mockSupervisor = mock(ReplicationSupervisor.class);
+    doAnswer(invocation -> {
+      ((ReconcileContainerTask)invocation.getArguments()[0]).runTask();
+      return null;
+    }).when(mockSupervisor).addTask(any());
+
+    subject = new ReconcileContainerCommandHandler(mockSupervisor, 
mock(DNContainerOperationClient.class));
     context = ContainerTestUtils.getMockContext(dnDetails, conf);
 
     containerSet = new ContainerSet(1000);
@@ -91,7 +100,7 @@ public class TestReconcileContainerCommandHandler {
     assertEquals(NUM_CONTAINERS, containerSet.containerCount());
 
     Handler containerHandler = new KeyValueHandler(new OzoneConfiguration(), 
dnDetails.getUuidString(), containerSet,
-        mock(VolumeSet.class), mock(ContainerMetrics.class), icrSender);
+        mock(VolumeSet.class), mock(ContainerMetrics.class), icrSender, new 
ContainerChecksumTreeManager(conf));
     ContainerController controller = new ContainerController(containerSet,
         singletonMap(ContainerProtos.ContainerType.KeyValueContainer, 
containerHandler));
     ozoneContainer = mock(OzoneContainer.class);
@@ -114,80 +123,37 @@ public class TestReconcileContainerCommandHandler {
     init(layout, icrSender);
 
     for (int id = 1; id <= NUM_CONTAINERS; id++) {
-      ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, 
Collections.emptyList());
+      ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, 
Collections.emptySet());
       subject.handle(cmd, ozoneContainer, context, null);
     }
 
     // An unknown container should not trigger a container report being sent.
     ReconcileContainerCommand unknownContainerCmd = new 
ReconcileContainerCommand(NUM_CONTAINERS + 1,
-        Collections.emptyList());
+        Collections.emptySet());
     subject.handle(unknownContainerCmd, ozoneContainer, context, null);
 
-    waitForAllCommandsToFinish();
+    // Since the replication supervisor is mocked in this test, reports are 
processed immediately.
     verifyAllContainerReports(containerReportsSent);
   }
 
+  /**
+   * Most metrics are handled by the ReplicationSupervisor. Only check the 
metrics tracked by this handler itself here.
+   */
   @ContainerLayoutTestInfo.ContainerTest
   public void testReconcileContainerCommandMetrics(ContainerLayoutVersion 
layout) throws Exception {
-    // Used to block ICR sending so that queue metrics can be checked before 
the reconcile task completes.
-    CountDownLatch icrLatch = new CountDownLatch(1);
-    // Wait this long before completing the task.
-    // This provides a lower bound on execution time.
-    final long minExecTimeMillis = 500;
-    // This is the lower bound on execution time of all the commands combined.
-    final long expectedTotalMinExecTimeMillis = minExecTimeMillis * 
NUM_CONTAINERS;
+    init(layout, c -> { });
 
-    IncrementalReportSender<Container> icrSender = c -> {
-      try {
-        // Block the caller until the latch is counted down.
-        // Caller can check queue metrics in the meantime.
-        LOG.info("ICR sender waiting for latch");
-        assertTrue(icrLatch.await(30, TimeUnit.SECONDS));
-        LOG.info("ICR sender proceeding after latch");
-
-        Thread.sleep(minExecTimeMillis);
-      } catch (Exception ex) {
-        LOG.error("ICR sender failed", ex);
-      }
-    };
-
-    init(layout, icrSender);
+    assertEquals(0, subject.getInvocationCount());
 
-    // All commands submitted will be blocked until the latch is counted down.
+    // The mocked supervisor runs each command synchronously when it is submitted.
     for (int id = 1; id <= NUM_CONTAINERS; id++) {
-      ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, 
Collections.emptyList());
+      ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, 
Collections.emptySet());
       subject.handle(cmd, ozoneContainer, context, null);
     }
-    assertEquals(NUM_CONTAINERS, subject.getQueuedCount());
-    assertEquals(0, subject.getTotalRunTime());
-    assertEquals(0, subject.getAverageRunTime());
-
-    // This will resume handling of the tasks.
-    icrLatch.countDown();
-    waitForAllCommandsToFinish();
-
     assertEquals(NUM_CONTAINERS, subject.getInvocationCount());
-    long totalRunTime = subject.getTotalRunTime();
-    assertTrue(totalRunTime >= expectedTotalMinExecTimeMillis,
-        "Total run time " + totalRunTime + "ms was not larger than the minimum 
total exec time " +
-            expectedTotalMinExecTimeMillis + "ms");
-    long avgRunTime = subject.getAverageRunTime();
-    assertTrue(avgRunTime >= minExecTimeMillis,
-        "Average run time " + avgRunTime + "ms was not larger than the minimum 
per task exec time " +
-            minExecTimeMillis + "ms");
-  }
-
-  private void waitForAllCommandsToFinish() throws Exception {
-    // Queue count should be decremented only after the task completes, so the 
other metrics should be consistent when
-    // it reaches zero.
-    GenericTestUtils.waitFor(() -> {
-      int qCount = subject.getQueuedCount();
-      LOG.info("Waiting for queued command count to reach 0. Currently at " + 
qCount);
-      return qCount == 0;
-    }, 500, 3000);
   }
 
-  private void verifyAllContainerReports(Map<ContainerID, 
ContainerReplicaProto> reportsSent) throws Exception {
+  private void verifyAllContainerReports(Map<ContainerID, 
ContainerReplicaProto> reportsSent) {
     assertEquals(NUM_CONTAINERS, reportsSent.size());
 
     for (Map.Entry<ContainerID, ContainerReplicaProto> entry: 
reportsSent.entrySet()) {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 6245489f13..49b109b913 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -33,9 +33,11 @@ import static org.mockito.Mockito.when;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.UUID;
 
 import com.google.protobuf.Proto2Utils;
@@ -118,7 +120,7 @@ public class TestHeartbeatEndpointTask {
     StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
         mock(StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
 
-    List<DatanodeDetails> peerDNs = new ArrayList<>();
+    Set<DatanodeDetails> peerDNs = new HashSet<>();
     peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
     peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
     ReconcileContainerCommand cmd = new ReconcileContainerCommand(1, peerDNs);
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 4527ee6d51..8e5f7f01e7 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -41,6 +41,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -48,6 +50,7 @@ import 
org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -275,16 +278,11 @@ public class TestKeyValueHandler {
         null, StorageVolume.VolumeType.DATA_VOLUME, null);
     try {
       ContainerSet cset = new ContainerSet(1000);
-      int[] interval = new int[1];
-      interval[0] = 2;
-      ContainerMetrics metrics = new ContainerMetrics(interval);
       DatanodeDetails datanodeDetails = mock(DatanodeDetails.class);
       StateContext context = ContainerTestUtils.getMockContext(
           datanodeDetails, conf);
-      KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
-          context.getParent().getDatanodeDetails().getUuidString(), cset,
-          volumeSet, metrics, c -> {
-      });
+      KeyValueHandler keyValueHandler = 
ContainerTestUtils.getKeyValueHandler(conf,
+          context.getParent().getDatanodeDetails().getUuidString(), cset, 
volumeSet);
       assertEquals("org.apache.hadoop.ozone.container.common" +
           ".volume.CapacityVolumeChoosingPolicy",
           keyValueHandler.getVolumeChoosingPolicyForTesting()
@@ -294,8 +292,8 @@ public class TestKeyValueHandler {
       conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
           "org.apache.hadoop.ozone.container.common.impl.HddsDispatcher");
       RuntimeException exception = assertThrows(RuntimeException.class,
-          () -> new KeyValueHandler(conf, 
context.getParent().getDatanodeDetails().getUuidString(), cset, volumeSet,
-              metrics, c -> { }));
+          () -> ContainerTestUtils.getKeyValueHandler(conf, 
context.getParent().getDatanodeDetails().getUuidString(),
+              cset, volumeSet));
 
       assertThat(exception).hasMessageEndingWith(
           "class org.apache.hadoop.ozone.container.common.impl.HddsDispatcher 
" +
@@ -385,7 +383,7 @@ public class TestKeyValueHandler {
 
       final KeyValueHandler kvHandler = new KeyValueHandler(conf,
           datanodeId, containerSet, volumeSet, metrics,
-          c -> icrReceived.incrementAndGet());
+          c -> icrReceived.incrementAndGet(), new 
ContainerChecksumTreeManager(conf));
       kvHandler.setClusterID(clusterId);
 
       final ContainerCommandRequestProto createContainer =
@@ -459,8 +457,7 @@ public class TestKeyValueHandler {
 
     // Allows checking the invocation count of the lambda.
     AtomicInteger icrCount = new AtomicInteger(0);
-    KeyValueHandler keyValueHandler = new KeyValueHandler(conf, 
randomDatanodeDetails().getUuidString(), containerSet,
-        mock(MutableVolumeSet.class), mock(ContainerMetrics.class), c -> {
+    IncrementalReportSender<Container> icrSender = c -> {
       // Check that the ICR contains expected info about the container.
       ContainerReplicaProto report = c.getContainerReport();
       long reportedID = report.getContainerID();
@@ -470,11 +467,14 @@ public class TestKeyValueHandler {
       Assertions.assertNotEquals(0, reportDataChecksum,
           "Container report should have populated the checksum field with a 
non-zero value.");
       icrCount.incrementAndGet();
-    });
+    };
+
+    KeyValueHandler keyValueHandler = new KeyValueHandler(conf, 
randomDatanodeDetails().getUuidString(), containerSet,
+        mock(MutableVolumeSet.class), mock(ContainerMetrics.class), icrSender, 
new ContainerChecksumTreeManager(conf));
 
     Assertions.assertEquals(0, icrCount.get());
     // This should trigger container report validation in the ICR handler 
above.
-    keyValueHandler.reconcileContainer(container, Collections.emptyList());
+    keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), 
container, Collections.emptySet());
     Assertions.assertEquals(1, icrCount.get());
   }
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index f0c8a2077e..8207993218 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -254,7 +255,7 @@ public class TestKeyValueHandlerWithUnhealthyContainer {
         stateMachine.getDatanodeDetails().getUuidString(),
         mock(ContainerSet.class),
         mock(MutableVolumeSet.class),
-        mock(ContainerMetrics.class), mockIcrSender);
+        mock(ContainerMetrics.class), mockIcrSender, 
mock(ContainerChecksumTreeManager.class));
   }
 
   private KeyValueContainer getMockUnhealthyContainer() {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
index 03901b99be..b8c43460ba 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
@@ -25,7 +25,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContai
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -35,7 +35,6 @@ import 
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.junit.jupiter.api.AfterEach;
@@ -129,11 +128,8 @@ class TestGrpcReplicationService {
     when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(
         new HddsVolume.Builder(testDir).conf(conf).build()));
 
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
     Handler containerHandler =
-        new KeyValueHandler(conf, datanode.getUuidString(), containerSet,
-            volumeSet, metrics, c -> {
-        });
+        ContainerTestUtils.getKeyValueHandler(conf, datanode.getUuidString(), 
containerSet, volumeSet);
 
     containerController = new ContainerController(containerSet,
         Collections.singletonMap(
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
index f13b37f3ee..8e8b5bf71c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
@@ -32,7 +32,6 @@ import 
org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -78,9 +77,9 @@ public class ReconcileContainerEventHandler implements 
EventHandler<ContainerID>
       LOG.info("Reconcile container event triggered for container {} with 
peers {}", containerID, allReplicaNodes);
 
       for (DatanodeDetails replica : allReplicaNodes) {
-        List<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
+        Set<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
             .filter(other -> !other.equals(replica))
-            .collect(Collectors.toList());
+            .collect(Collectors.toSet());
         ReconcileContainerCommand command = new 
ReconcileContainerCommand(containerID.getId(), otherReplicas);
         command.setTerm(scmContext.getTermOfLeader());
         publisher.fireEvent(DATANODE_COMMAND, new 
CommandForDatanode<>(replica.getUuid(), command));
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 068cb01a96..2d1b1a4bb0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
@@ -48,7 +47,6 @@ import 
org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -148,18 +146,7 @@ public class TestContainerMetrics {
     ContainerSet containerSet = new ContainerSet(1000);
     StateContext context = ContainerTestUtils.getMockContext(
         dd, CONF);
-    ContainerMetrics metrics = ContainerMetrics.create(CONF);
-    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
-    for (ContainerProtos.ContainerType containerType :
-        ContainerProtos.ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(containerType, CONF,
-              context.getParent().getDatanodeDetails().getUuidString(),
-              containerSet, volumeSet, metrics,
-              c -> { }));
-    }
-    HddsDispatcher dispatcher = new HddsDispatcher(CONF, containerSet,
-        volumeSet, handlers, context, metrics, null);
+    HddsDispatcher dispatcher = ContainerTestUtils.getHddsDispatcher(CONF, 
containerSet, volumeSet, context);
     StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
         .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
     dispatcher.setClusterId(UUID.randomUUID().toString());
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 630c4d3149..ab95467a71 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -200,19 +199,7 @@ public class TestContainerServer {
     StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
         .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
     StateContext context = ContainerTestUtils.getMockContext(dd, conf);
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
-    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
-    for (ContainerProtos.ContainerType containerType :
-        ContainerProtos.ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(containerType, conf,
-              dd.getUuid().toString(),
-              containerSet, volumeSet, metrics,
-              c -> {
-              }));
-    }
-    HddsDispatcher hddsDispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, context, metrics, null);
+    HddsDispatcher hddsDispatcher = ContainerTestUtils.getHddsDispatcher(conf, 
containerSet, volumeSet, context);
     hddsDispatcher.setClusterId(scmId.toString());
     return hddsDispatcher;
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 8044685bb7..47be4daf90 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -25,7 +25,6 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -57,11 +56,9 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -183,18 +180,7 @@ public class TestSecureContainerServer {
     StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
         .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
     StateContext context = ContainerTestUtils.getMockContext(dd, conf);
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
-    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
-    for (ContainerProtos.ContainerType containerType :
-        ContainerProtos.ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(containerType, conf,
-              dd.getUuid().toString(),
-              containerSet, volumeSet, metrics,
-              c -> { }));
-    }
-    HddsDispatcher hddsDispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, context, metrics,
+    HddsDispatcher hddsDispatcher = ContainerTestUtils.getHddsDispatcher(conf, 
containerSet, volumeSet, context,
         TokenVerifier.create(new SecurityConfig(conf), secretKeyClient));
     hddsDispatcher.setClusterId(scmId.toString());
     return hddsDispatcher;
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
index 5592926bf8..a0aba2a1b1 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -151,7 +152,9 @@ public class ContainerCommands implements Callable<Void>, 
SubcommandWithParent {
               volumeSet,
               metrics,
               containerReplicaProto -> {
-              });
+              },
+              // Since this is an Ozone debug CLI, this instance is not part 
of a running datanode.
+              new ContainerChecksumTreeManager(conf));
       handler.setClusterID(clusterId);
       handlers.put(containerType, handler);
     }
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index d471c13462..0c525457aa 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -193,7 +194,9 @@ public class ClosedContainerReplicator extends 
BaseFreonGenerator implements
               volumeSet,
               metrics,
               containerReplicaProto -> {
-              });
+              },
+              // Since this is a Freon tool, this instance is not part of a 
running datanode.
+              new ContainerChecksumTreeManager(conf));
       handler.setClusterID(UUID.randomUUID().toString());
       handlers.put(containerType, handler);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to