This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f093f71314 HDDS-8158. Replication Manager: Make all handlers send 
commands immediately instead of returning commands (#4399)
f093f71314 is described below

commit f093f713148866cea3c57d26d21bd0dd348ac9cb
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Mar 16 12:03:01 2023 +0000

    HDDS-8158. Replication Manager: Make all handlers send commands immediately 
instead of returning commands (#4399)
---
 .../replication/ECOverReplicationHandler.java      | 56 +++++++------
 .../replication/ECUnderReplicationHandler.java     | 94 +++++++++++-----------
 .../replication/MisReplicationHandler.java         | 36 ++++-----
 .../replication/OverReplicatedProcessor.java       |  7 +-
 .../replication/RatisOverReplicationHandler.java   | 60 +++++++-------
 .../replication/RatisUnderReplicationHandler.java  | 45 +++++------
 .../container/replication/ReplicationManager.java  | 27 ++++---
 .../replication/UnderReplicatedProcessor.java      |  7 +-
 .../replication/UnhealthyReplicationHandler.java   | 11 +--
 .../replication/UnhealthyReplicationProcessor.java | 15 +---
 .../container/replication/ReplicationTestUtil.java | 79 ++++++++++++++++++
 .../replication/TestECMisReplicationHandler.java   | 11 ++-
 .../replication/TestECOverReplicationHandler.java  | 86 ++++++++++++--------
 .../replication/TestECUnderReplicationHandler.java | 88 ++++++++++----------
 .../replication/TestMisReplicationHandler.java     | 29 +++----
 .../replication/TestOverReplicatedProcessor.java   | 25 ++----
 .../TestRatisMisReplicationHandler.java            | 10 +--
 .../TestRatisOverReplicationHandler.java           | 37 ++++++---
 .../TestRatisUnderReplicationHandler.java          | 31 +++----
 .../replication/TestReplicationManager.java        | 42 ++++++----
 .../replication/TestUnderReplicatedProcessor.java  | 62 +-------------
 21 files changed, 438 insertions(+), 420 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 4bd49144d6..4db51d5263 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -17,30 +17,25 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
-import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.emptySet;
-
 /**
  * Handles the EC Over replication processing and forming the respective SCM
  * commands.
@@ -49,12 +44,12 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
   public static final Logger LOG =
       LoggerFactory.getLogger(ECOverReplicationHandler.class);
 
-  private final NodeManager nodeManager;
+  private final ReplicationManager replicationManager;
 
   public ECOverReplicationHandler(PlacementPolicy placementPolicy,
-      NodeManager nodeManager) {
+      ReplicationManager replicationManager) {
     super(placementPolicy);
-    this.nodeManager = nodeManager;
+    this.replicationManager = replicationManager;
 
   }
 
@@ -67,13 +62,13 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
    * @param result - Health check result.
    * @param remainingMaintenanceRedundancy - represents that how many nodes go
    *                                      into maintenance.
-   * @return Returns the key value pair of destination dn where the command 
gets
-   * executed and the command itself.
+   * @return The number of commands sent.
    */
   @Override
-  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
+  public int processAndSendCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
-      ContainerHealthResult result, int remainingMaintenanceRedundancy) {
+      ContainerHealthResult result, int remainingMaintenanceRedundancy)
+      throws NotLeaderException {
     ContainerInfo container = result.getContainerInfo();
 
     // We are going to check for over replication, so we should filter out any
@@ -90,10 +85,14 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
     // second lookup of the NodeStatus
     Set<ContainerReplica> healthyReplicas = replicas.stream()
         .filter(r -> {
-          NodeStatus ns = ReplicationManager.getNodeStatus(
-              r.getDatanodeDetails(), nodeManager);
-          return ns.isHealthy() && ns.getOperationalState() ==
-              HddsProtos.NodeOperationalState.IN_SERVICE;
+          try {
+            NodeStatus ns = replicationManager.getNodeStatus(
+                r.getDatanodeDetails());
+            return ns.isHealthy() && ns.getOperationalState() ==
+                HddsProtos.NodeOperationalState.IN_SERVICE;
+          } catch (NodeNotFoundException e) {
+            return false;
+          }
         })
         .collect(Collectors.toSet());
 
@@ -104,13 +103,13 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       LOG.info("The container {} state changed and it is no longer over"
               + " replication. Replica count: {}, healthy replica count: {}",
           container.getContainerID(), replicas.size(), healthyReplicas.size());
-      return emptySet();
+      return 0;
     }
 
     if (!replicaCount.isOverReplicated(true)) {
       LOG.info("The container {} with replicas {} will be corrected " +
           "by the pending delete", container.getContainerID(), replicas);
-      return emptySet();
+      return 0;
     }
 
     List<Integer> overReplicatedIndexes =
@@ -120,7 +119,7 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       LOG.warn("The container {} with replicas {} was found over replicated "
           + "by EcContainerReplicaCount, but there are no over replicated "
           + "indexes returned", container.getContainerID(), replicas);
-      return emptySet();
+      return 0;
     }
 
     final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
@@ -143,10 +142,10 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       LOG.warn("The container {} is over replicated, but no replicas were "
           + "selected to remove by the placement policy. Replicas: {}",
           container, replicas);
-      return emptySet();
+      return 0;
     }
 
-    final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
+    int commandsSent = 0;
     // As a sanity check, sum up the current counts of each replica index. When
     // processing replicasToRemove, ensure that removing the replica would not
     // drop the count of that index to zero.
@@ -164,16 +163,15 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
         continue;
       }
       replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
-      DeleteContainerCommand deleteCommand =
-          new DeleteContainerCommand(container.getContainerID(), true);
-      deleteCommand.setReplicaIndex(r.getReplicaIndex());
-      commands.add(Pair.of(r.getDatanodeDetails(), deleteCommand));
+      replicationManager.sendDeleteCommand(container, r.getReplicaIndex(),
+          r.getDatanodeDetails(), true);
+      commandsSent++;
     }
 
-    if (commands.size() == 0) {
+    if (commandsSent == 0) {
       LOG.warn("With the current state of available replicas {}, no" +
           " commands were created to remove excess replicas.", replicas);
     }
-    return commands;
+    return commandsSent;
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 965cafe83d..bf1aa2d3f8 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -35,21 +35,19 @@ import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.emptySet;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 
 /**
@@ -105,14 +103,10 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
    * @param result - Health check result.
    * @param remainingMaintenanceRedundancy - represents that how many nodes go
    *                                      into maintenance.
-   * @return Returns the key value pair of destination dn where the command 
gets
-   * executed and the command itself. If an empty list is returned, it 
indicates
-   * the container is no longer unhealthy and can be removed from the unhealthy
-   * queue. Any exception indicates that the container is still unhealthy and
-   * should be retried later.
+   * @return The number of commands sent.
    */
   @Override
-  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
+  public int processAndSendCommands(
       final Set<ContainerReplica> replicas,
       final List<ContainerReplicaOp> pendingOps,
       final ContainerHealthResult result,
@@ -126,13 +120,13 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     if (replicaCount.isSufficientlyReplicated()) {
       LOG.info("The container {} state changed and it's not in under"
               + " replication any more.", container.getContainerID());
-      return emptySet();
+      return 0;
     }
     if (replicaCount.isSufficientlyReplicated(true)) {
       LOG.info("The container {} with replicas {} will be sufficiently " +
           "replicated after pending replicas are created",
           container.getContainerID(), replicaCount.getReplicas());
-      return emptySet();
+      return 0;
     }
 
     // don't place reconstructed replicas on exclude nodes, since they already
@@ -149,7 +143,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
         .collect(Collectors.toList()));
 
     final ContainerID id = container.containerID();
-    final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
+    int commandsSent = 0;
     try {
       final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
       for (ContainerReplicaOp op : pendingOps) {
@@ -169,12 +163,12 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
               .collect(Collectors.toList());
 
       try {
-        processMissingIndexes(replicaCount, sources, availableSourceNodes,
-            excludedNodes, commands);
-        processDecommissioningIndexes(replicaCount, replicas,
-            availableSourceNodes, excludedNodes, commands);
-        processMaintenanceOnlyIndexes(replicaCount, replicas, excludedNodes,
-            commands);
+        commandsSent += processMissingIndexes(replicaCount, sources,
+            availableSourceNodes, excludedNodes);
+        commandsSent += processDecommissioningIndexes(replicaCount, replicas,
+            availableSourceNodes, excludedNodes);
+        commandsSent +=  processMaintenanceOnlyIndexes(replicaCount, replicas,
+            excludedNodes);
         // TODO - we should be able to catch SCMException here and check the
         //        result code but the RackAware topology never sets the code.
       } catch (CannotFindTargetsException e) {
@@ -194,10 +188,10 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
         // container will be re-processed in a further RM pass.
         LOG.debug("Unable to located new target nodes for container {}",
             container, e);
-        if (commands.size() > 0) {
+        if (commandsSent > 0) {
           LOG.debug("Some commands have already been created, so returning " +
               "with them only");
-          return commands;
+          return commandsSent;
         }
         if (replicaCount.isOverReplicated()) {
           LOG.debug("Container {} is both under and over replicated. Cannot " +
@@ -214,11 +208,11 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
           id, ex);
       throw ex;
     }
-    if (commands.size() == 0) {
+    if (commandsSent == 0) {
       LOG.warn("Container {} is under replicated, but no commands were " +
           "created to correct it", id);
     }
-    return commands;
+    return commandsSent;
   }
 
   private Map<Integer, Pair<ContainerReplica, NodeStatus>> filterSources(
@@ -280,22 +274,23 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
   /**
   * Processes container indexes that are missing (unavailable) and need to be
   * restored via EC reconstruction.
+   * @return number of commands sent
    * @throws IOException
    */
-  private void processMissingIndexes(
+  private int processMissingIndexes(
       ECContainerReplicaCount replicaCount, Map<Integer,
       Pair<ContainerReplica, NodeStatus>> sources,
       List<DatanodeDetails> availableSourceNodes,
-      List<DatanodeDetails> excludedNodes,
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
+      List<DatanodeDetails> excludedNodes) throws IOException {
     ContainerInfo container = replicaCount.getContainer();
     ECReplicationConfig repConfig =
         (ECReplicationConfig)container.getReplicationConfig();
     List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
     if (missingIndexes.size() == 0) {
-      return;
+      return 0;
     }
 
+    int commandsSent = 0;
     if (sources.size() >= repConfig.getData()) {
       final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
           excludedNodes, container, missingIndexes.size());
@@ -320,8 +315,9 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
                 sourceDatanodesWithIndex, selectedDatanodes,
                 int2byte(missingIndexes),
                 repConfig);
-        // Keeping the first target node as coordinator.
-        commands.add(Pair.of(selectedDatanodes.get(0), reconstructionCommand));
+        replicationManager.sendDatanodeCommand(reconstructionCommand,
+            container, selectedDatanodes.get(0));
+        commandsSent++;
       }
     } else {
       LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
@@ -330,21 +326,23 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
               + " {}. Available sources are: {}", container.containerID(),
           repConfig.getData(), sources.size(), sources);
     }
+    return commandsSent;
   }
 
   /**
   * Processes replicas that are on decommissioning nodes and need
   * additional copies.
+   * @return number of commands sent
    * @throws IOException
    */
-  private void processDecommissioningIndexes(
+  private int processDecommissioningIndexes(
       ECContainerReplicaCount replicaCount,
       Set<ContainerReplica> replicas,
       List<DatanodeDetails> availableSourceNodes,
-      List<DatanodeDetails> excludedNodes,
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
+      List<DatanodeDetails> excludedNodes) throws IOException {
     ContainerInfo container = replicaCount.getContainer();
     Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
+    int commandsSent = 0;
     if (decomIndexes.size() > 0) {
       final List<DatanodeDetails> selectedDatanodes =
           getTargetDatanodes(excludedNodes, container, decomIndexes.size());
@@ -361,12 +359,13 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
                   replicas, selectedDatanodes, excludedNodes, decomIndexes);
               break;
             }
-            createReplicateCommand(commands, container, iterator, replica);
+            createReplicateCommand(container, iterator, replica);
+            commandsSent++;
           }
         }
       }
     }
-
+    return commandsSent;
   }
 
   /**
@@ -375,16 +374,15 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
    * @param replicaCount
    * @param replicas set of container replicas
    * @param excludedNodes nodes that should not be targets for new copies
-   * @param commands
   * @return number of commands sent
    * @throws IOException
    */
-  private void processMaintenanceOnlyIndexes(
+  private int processMaintenanceOnlyIndexes(
       ECContainerReplicaCount replicaCount, Set<ContainerReplica> replicas,
-      List<DatanodeDetails> excludedNodes,
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
+      List<DatanodeDetails> excludedNodes) throws IOException {
     Set<Integer> maintIndexes = replicaCount.maintenanceOnlyIndexes(true);
     if (maintIndexes.isEmpty()) {
-      return;
+      return 0;
     }
 
     ContainerInfo container = replicaCount.getContainer();
@@ -392,13 +390,14 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     int additionalMaintenanceCopiesNeeded =
         replicaCount.additionalMaintenanceCopiesNeeded(true);
     if (additionalMaintenanceCopiesNeeded == 0) {
-      return;
+      return 0;
     }
     List<DatanodeDetails> targets = getTargetDatanodes(excludedNodes, 
container,
         additionalMaintenanceCopiesNeeded);
     excludedNodes.addAll(targets);
 
     Iterator<DatanodeDetails> iterator = targets.iterator();
+    int commandsSent = 0;
     // copy replica from source maintenance DN to a target DN
     for (ContainerReplica replica : replicas) {
       if (maintIndexes.contains(replica.getReplicaIndex()) &&
@@ -410,25 +409,27 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
               replicas, targets, excludedNodes, maintIndexes);
           break;
         }
-        createReplicateCommand(commands, container, iterator, replica);
+        createReplicateCommand(container, iterator, replica);
+        commandsSent++;
         additionalMaintenanceCopiesNeeded -= 1;
       }
     }
+    return commandsSent;
   }
 
   private void createReplicateCommand(
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands,
       ContainerInfo container, Iterator<DatanodeDetails> iterator,
-      ContainerReplica replica) throws AllSourcesOverloadedException {
+      ContainerReplica replica)
+      throws AllSourcesOverloadedException, NotLeaderException {
     final boolean push = replicationManager.getConfig().isPush();
     DatanodeDetails source = replica.getDatanodeDetails();
     DatanodeDetails target = iterator.next();
     final long containerID = container.getContainerID();
 
     if (push) {
-      commands.add(replicationManager.createThrottledReplicationCommand(
-          containerID, Collections.singletonList(source), target,
-          replica.getReplicaIndex()));
+      replicationManager.sendThrottledReplicationCommand(
+          container, Collections.singletonList(source), target,
+          replica.getReplicaIndex());
     } else {
       ReplicateContainerCommand replicateCommand =
           ReplicateContainerCommand.fromSources(containerID,
@@ -436,7 +437,8 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
       // For EC containers, we need to track the replica index which is
       // to be replicated, so add it to the command.
       replicateCommand.setReplicaIndex(replica.getReplicaIndex());
-      commands.add(Pair.of(target, replicateCommand));
+      replicationManager.sendDatanodeCommand(replicateCommand, container,
+          target);
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 45e42a32ea..3d013999b0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -30,13 +29,12 @@ import 
org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
@@ -120,12 +118,12 @@ public abstract class MisReplicationHandler implements
   protected abstract ReplicateContainerCommand updateReplicateCommand(
           ReplicateContainerCommand command, ContainerReplica replica);
 
-  private Set<Pair<DatanodeDetails, SCMCommand<?>>> getReplicateCommands(
+  private int sendReplicateCommands(
       ContainerInfo containerInfo,
       Set<ContainerReplica> replicasToBeReplicated,
       List<DatanodeDetails> targetDns)
-      throws AllSourcesOverloadedException {
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commandMap = new HashSet<>();
+      throws AllSourcesOverloadedException, NotLeaderException {
+    int commandsSent = 0;
     int datanodeIdx = 0;
     for (ContainerReplica replica : replicasToBeReplicated) {
       if (datanodeIdx == targetDns.size()) {
@@ -135,22 +133,23 @@ public abstract class MisReplicationHandler implements
       DatanodeDetails source = replica.getDatanodeDetails();
       DatanodeDetails target = targetDns.get(datanodeIdx);
       if (push) {
-        commandMap.add(replicationManager.createThrottledReplicationCommand(
-            containerID, Collections.singletonList(source),
-            target, replica.getReplicaIndex()));
+        replicationManager.sendThrottledReplicationCommand(containerInfo,
+            Collections.singletonList(source), target,
+            replica.getReplicaIndex());
       } else {
         ReplicateContainerCommand cmd = ReplicateContainerCommand
             .fromSources(containerID, Collections.singletonList(source));
         updateReplicateCommand(cmd, replica);
-        commandMap.add(Pair.of(target, cmd));
+        replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
       }
+      commandsSent++;
       datanodeIdx += 1;
     }
-    return commandMap;
-
+    return commandsSent;
   }
+
   @Override
-  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
+  public int processAndSendCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int remainingMaintenanceRedundancy)
       throws IOException {
@@ -159,7 +158,7 @@ public abstract class MisReplicationHandler implements
       LOG.info("Skipping Mis-Replication for Container {}, " +
                "as there are still some pending ops for the container: {}",
               container, pendingOps);
-      return Collections.emptySet();
+      return 0;
     }
     ContainerReplicaCount replicaCount = getContainerReplicaCount(container,
             replicas, Collections.emptyList(), remainingMaintenanceRedundancy);
@@ -173,7 +172,7 @@ public abstract class MisReplicationHandler implements
               container.getContainerID(),
               !replicaCount.isSufficientlyReplicated(),
               replicaCount.isOverReplicated());
-      return Collections.emptySet();
+      return 0;
     }
 
     List<DatanodeDetails> usedDns = replicas.stream()
@@ -183,7 +182,7 @@ public abstract class MisReplicationHandler implements
             usedDns.size()).isPolicySatisfied()) {
       LOG.info("Container {} is currently not misreplicated",
               container.getContainerID());
-      return Collections.emptySet();
+      return 0;
     }
 
     Set<ContainerReplica> sources = filterSources(replicas);
@@ -205,7 +204,8 @@ public abstract class MisReplicationHandler implements
               container.getContainerID(), replicasToBeReplicated.size(),
               usedDns);
     }
-    return getReplicateCommands(container, replicasToBeReplicated,
-            targetDatanodes);
+
+    return sendReplicateCommands(container, replicasToBeReplicated,
+        targetDatanodes);
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
index 4f7806487c..b302359c71 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-
 import java.io.IOException;
-import java.util.Set;
 
 /**
  * Class used to pick messages from the ReplicationManager over replicated
@@ -51,7 +46,7 @@ public class OverReplicatedProcessor extends 
UnhealthyReplicationProcessor
     replicationManager.requeueOverReplicatedContainer(healthResult);
   }
   @Override
-  protected Set<Pair<DatanodeDetails, SCMCommand<?>>> getDatanodeCommands(
+  protected int sendDatanodeCommands(
       ReplicationManager replicationManager,
       ContainerHealthResult.OverReplicatedHealthResult healthResult)
       throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
index 26cec40fad..f1796f503e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
@@ -18,23 +18,20 @@
 
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -54,12 +51,12 @@ public class RatisOverReplicationHandler
   public static final Logger LOG =
       LoggerFactory.getLogger(RatisOverReplicationHandler.class);
 
-  private final NodeManager nodeManager;
+  private final ReplicationManager replicationManager;
 
   public RatisOverReplicationHandler(PlacementPolicy placementPolicy,
-      NodeManager nodeManager) {
+      ReplicationManager replicationManager) {
     super(placementPolicy);
-    this.nodeManager = nodeManager;
+    this.replicationManager = replicationManager;
   }
 
   /**
@@ -74,11 +71,10 @@ public class RatisOverReplicationHandler
    *                                 replication
    * @param minHealthyForMaintenance Number of healthy replicas that must be
    *                                 available for a DN to enter maintenance
-   * @return Returns a map of Datanodes and SCMCommands that can be sent to
-   * delete replicas on those datanodes.
+   * @return The number of commands sent.
    */
   @Override
-  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
+  public int processAndSendCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int minHealthyForMaintenance) throws
       IOException {
@@ -94,9 +90,15 @@ public class RatisOverReplicationHandler
     // HEALTHY one, and then the STALE ones goes away, we will lose them both.
     // To avoid this, we will filter out any non-healthy replicas first.
     Set<ContainerReplica> healthyReplicas = replicas.stream()
-        .filter(r -> ReplicationManager.getNodeStatus(
-            r.getDatanodeDetails(), nodeManager).isHealthy()
-        ).collect(Collectors.toSet());
+        .filter(r -> {
+          try {
+            return replicationManager.getNodeStatus(r.getDatanodeDetails())
+                .isHealthy();
+          } catch (NodeNotFoundException e) {
+            return false;
+          }
+        })
+        .collect(Collectors.toSet());
 
     RatisContainerReplicaCount replicaCount =
         new RatisContainerReplicaCount(containerInfo, healthyReplicas,
@@ -104,7 +106,7 @@ public class RatisOverReplicationHandler
 
     // verify that this container is actually over replicated
     if (!verifyOverReplication(replicaCount)) {
-      return Collections.emptySet();
+      return 0;
     }
 
     // count pending deletes
@@ -126,11 +128,12 @@ public class RatisOverReplicationHandler
     if (eligibleReplicas.size() == 0) {
       LOG.info("Did not find any replicas that are eligible to be deleted for" 
+
           " container {}.", containerInfo);
-      return Collections.emptySet();
+      return 0;
     }
 
     // get number of excess replicas
     int excess = replicaCount.getExcessRedundancy(true);
+
     return createCommands(containerInfo, eligibleReplicas, excess);
   }
 
@@ -243,25 +246,26 @@ public class RatisOverReplicationHandler
         .collect(Collectors.toList());
   }
 
-  private Set<Pair<DatanodeDetails, SCMCommand<?>>> createCommands(
+  private int createCommands(
       ContainerInfo containerInfo, List<ContainerReplica> replicas,
-      int excess) {
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
+      int excess) throws NotLeaderException {
 
     /*
     Being in the over replication queue means we have enough replicas that
     match the container's state, so unhealthy or mismatched replicas can be
     deleted. This might make the container violate placement policy.
      */
+    int commandsSent = 0;
     List<ContainerReplica> replicasRemoved = new ArrayList<>();
     for (ContainerReplica replica : replicas) {
       if (excess == 0) {
-        return commands;
+        return commandsSent;
       }
       if (!ReplicationManager.compareState(
           containerInfo.getState(), replica.getState())) {
-        commands.add(Pair.of(replica.getDatanodeDetails(),
-            createDeleteCommand(containerInfo)));
+        replicationManager.sendDeleteCommand(containerInfo,
+            replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
+        commandsSent++;
         replicasRemoved.add(replica);
         excess--;
       }
@@ -277,20 +281,18 @@ public class RatisOverReplicationHandler
     // iterate through replicas in deterministic order
     for (ContainerReplica replica : replicas) {
       if (excess == 0) {
-        return commands;
+        return commandsSent;
       }
 
       if (super.isPlacementStatusActuallyEqualAfterRemove(replicaSet, replica,
           containerInfo.getReplicationFactor().getNumber())) {
-        commands.add(Pair.of(replica.getDatanodeDetails(),
-            createDeleteCommand(containerInfo)));
+        replicationManager.sendDeleteCommand(containerInfo,
+            replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
+        commandsSent++;
         excess--;
       }
     }
-    return commands;
+    return commandsSent;
   }
 
-  private DeleteContainerCommand createDeleteCommand(ContainerInfo container) {
-    return new DeleteContainerCommand(container.containerID(), true);
-  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index ef9eccbac2..06485d4e71 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -29,12 +28,11 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -74,14 +72,10 @@ public class RatisUnderReplicationHandler
    * @param result Health check result indicating under replication.
    * @param minHealthyForMaintenance Number of healthy replicas that must be
    *                                 available for a DN to enter maintenance
-   * @return Returns the key value pair of destination dn where the command 
gets
-   * executed and the command itself. If an empty map is returned, it indicates
-   * the container is no longer unhealthy and can be removed from the unhealthy
-   * queue. Any exception indicates that the container is still unhealthy and
-   * should be retried later.
+   * @return The number of commands sent.
    */
   @Override
-  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
+  public int processAndSendCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int minHealthyForMaintenance)
       throws IOException {
@@ -99,7 +93,7 @@ public class RatisUnderReplicationHandler
     // verify that this container is still under replicated and we don't have
     // sufficient replication after considering pending adds
     if (!verifyUnderReplication(withUnhealthy, withoutUnhealthy)) {
-      return Collections.emptySet();
+      return 0;
     }
 
     // find sources that can provide replicas
@@ -108,7 +102,7 @@ public class RatisUnderReplicationHandler
     if (sourceDatanodes.isEmpty()) {
       LOG.warn("Cannot replicate container {} because no CLOSED, QUASI_CLOSED" 
+
           " or UNHEALTHY replicas were found.", containerInfo);
-      return Collections.emptySet();
+      return 0;
     }
 
     // find targets to send replicas to
@@ -117,11 +111,10 @@ public class RatisUnderReplicationHandler
     if (targetDatanodes.isEmpty()) {
       LOG.warn("Cannot replicate container {} because no eligible targets " +
           "were found.", containerInfo);
-      return Collections.emptySet();
+      return 0;
     }
-
-    return createReplicationCommands(containerInfo.getContainerID(),
-        sourceDatanodes, targetDatanodes);
+    return sendReplicationCommands(
+        containerInfo, sourceDatanodes, targetDatanodes);
   }
 
   /**
@@ -246,24 +239,28 @@ public class RatisUnderReplicationHandler
         replicaCount.additionalReplicaNeeded(), 0, dataSizeRequired);
   }
 
-  private Set<Pair<DatanodeDetails, SCMCommand<?>>> createReplicationCommands(
-      long containerID, List<DatanodeDetails> sources,
-      List<DatanodeDetails> targets) throws AllSourcesOverloadedException {
+  private int sendReplicationCommands(
+      ContainerInfo containerInfo, List<DatanodeDetails> sources,
+      List<DatanodeDetails> targets) throws AllSourcesOverloadedException,
+      NotLeaderException {
     final boolean push = replicationManager.getConfig().isPush();
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
+    int commandsSent = 0;
 
     if (push) {
       for (DatanodeDetails target : targets) {
-        commands.add(replicationManager.createThrottledReplicationCommand(
-            containerID, sources, target, 0));
+        replicationManager.sendThrottledReplicationCommand(
+            containerInfo, sources, target, 0);
+        commandsSent++;
       }
     } else {
       for (DatanodeDetails target : targets) {
         ReplicateContainerCommand command =
-            ReplicateContainerCommand.fromSources(containerID, sources);
-        commands.add(Pair.of(target, command));
+            ReplicateContainerCommand.fromSources(
+                containerInfo.getContainerID(), sources);
+        replicationManager.sendDatanodeCommand(command, containerInfo, target);
+        commandsSent++;
       }
     }
-    return commands;
+    return commandsSent;
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index ab673c6367..287c1f14d5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -232,13 +232,13 @@ public class ReplicationManager implements SCMService {
     ecUnderReplicationHandler = new ECUnderReplicationHandler(
         ecContainerPlacement, conf, this);
     ecOverReplicationHandler =
-        new ECOverReplicationHandler(ecContainerPlacement, nodeManager);
+        new ECOverReplicationHandler(ecContainerPlacement, this);
     ecMisReplicationHandler = new ECMisReplicationHandler(ecContainerPlacement,
         conf, this, rmConf.isPush());
     ratisUnderReplicationHandler = new RatisUnderReplicationHandler(
         ratisContainerPlacement, conf, this);
     ratisOverReplicationHandler =
-        new RatisOverReplicationHandler(ratisContainerPlacement, nodeManager);
+        new RatisOverReplicationHandler(ratisContainerPlacement, this);
     underReplicatedProcessor =
         new UnderReplicatedProcessor(this,
             rmConf.getUnderReplicatedInterval());
@@ -485,6 +485,15 @@ public class ReplicationManager implements SCMService {
     return Pair.of(sourceWithCmds.get(0).getRight(), cmd);
   }
 
+  public void sendThrottledReplicationCommand(ContainerInfo containerInfo,
+      List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
+      throws AllSourcesOverloadedException, NotLeaderException {
+    Pair<DatanodeDetails, SCMCommand<?>> cmdPair =
+        createThrottledReplicationCommand(containerInfo.getContainerID(),
+            sources, target, replicaIndex);
+    sendDatanodeCommand(cmdPair.getRight(), containerInfo, cmdPair.getLeft());
+  }
+
   /**
    * Send a push replication command to the given source datanode, instructing
    * it to copy the given container to the target. The command is sent as a low
@@ -652,7 +661,7 @@ public class ReplicationManager implements SCMService {
     }
   }
 
-  Set<Pair<DatanodeDetails, SCMCommand<?>>> processUnderReplicatedContainer(
+  int processUnderReplicatedContainer(
       final ContainerHealthResult result) throws IOException {
     ContainerID containerID = result.getContainerInfo().containerID();
     Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -662,22 +671,22 @@ public class ReplicationManager implements SCMService {
     if (result.getContainerInfo().getReplicationType() == EC) {
       if (result.getHealthState()
           == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
-        return ecUnderReplicationHandler.processAndCreateCommands(replicas,
+        return ecUnderReplicationHandler.processAndSendCommands(replicas,
             pendingOps, result, maintenanceRedundancy);
       } else if (result.getHealthState()
           == ContainerHealthResult.HealthState.MIS_REPLICATED) {
-        return ecMisReplicationHandler.processAndCreateCommands(replicas,
+        return ecMisReplicationHandler.processAndSendCommands(replicas,
             pendingOps, result, maintenanceRedundancy);
       } else {
         throw new IllegalArgumentException("Unexpected health state: "
             + result.getHealthState());
       }
     }
-    return ratisUnderReplicationHandler.processAndCreateCommands(replicas,
+    return ratisUnderReplicationHandler.processAndSendCommands(replicas,
         pendingOps, result, ratisMaintenanceMinReplicas);
   }
 
-  Set<Pair<DatanodeDetails, SCMCommand<?>>> processOverReplicatedContainer(
+  int processOverReplicatedContainer(
       final ContainerHealthResult result) throws IOException {
     ContainerID containerID = result.getContainerInfo().containerID();
     Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -685,10 +694,10 @@ public class ReplicationManager implements SCMService {
     List<ContainerReplicaOp> pendingOps =
         containerReplicaPendingOps.getPendingOps(containerID);
     if (result.getContainerInfo().getReplicationType() == EC) {
-      return ecOverReplicationHandler.processAndCreateCommands(replicas,
+      return ecOverReplicationHandler.processAndSendCommands(replicas,
           pendingOps, result, maintenanceRedundancy);
     }
-    return ratisOverReplicationHandler.processAndCreateCommands(replicas,
+    return ratisOverReplicationHandler.processAndSendCommands(replicas,
         pendingOps, result, ratisMaintenanceMinReplicas);
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
index 10d3d2d1ef..44a938471f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-
 import java.io.IOException;
-import java.util.Set;
 
 /**
  * Class used to pick messages from the ReplicationManager under replicated
@@ -51,7 +46,7 @@ public class UnderReplicatedProcessor extends 
UnhealthyReplicationProcessor
   }
 
   @Override
-  protected Set<Pair<DatanodeDetails, SCMCommand<?>>> getDatanodeCommands(
+  protected int sendDatanodeCommands(
           ReplicationManager replicationManager,
           ContainerHealthResult.UnderReplicatedHealthResult healthResult)
           throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
index fe73756e39..4385aeceac 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
@@ -17,10 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.io.IOException;
 import java.util.List;
@@ -41,13 +38,9 @@ public interface UnhealthyReplicationHandler {
    * @param result - Health check result.
    * @param remainingMaintenanceRedundancy - represents that how many nodes go
    *                                      into maintenance.
-   * @return Returns the key value pair of destination dn where the command 
gets
-   * executed and the command itself. If an empty list is returned, it 
indicates
-   * the container is no longer unhealthy and can be removed from the unhealthy
-   * queue. Any exception indicates that the container is still unhealthy and
-   * should be retried later.
+   * @return The number of commands sent.
    */
-  Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
+  int processAndSendCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int remainingMaintenanceRedundancy)
       throws IOException;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index bbaa23ca8c..dd14733ad2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -20,10 +20,7 @@ package org.apache.hadoop.hdds.scm.container.replication;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +28,6 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Class used to pick messages from the respective ReplicationManager
@@ -121,19 +117,14 @@ public abstract class 
UnhealthyReplicationProcessor<HealthResult extends
    * container health result.
    * @return Commands to be run on Datanodes
    */
-  protected abstract Set<Pair<DatanodeDetails, SCMCommand<?>>>
-      getDatanodeCommands(ReplicationManager rm, HealthResult healthResult)
+  protected abstract int
+      sendDatanodeCommands(ReplicationManager rm, HealthResult healthResult)
           throws IOException;
 
   private void processContainer(HealthResult healthResult) throws IOException {
     ContainerInfo containerInfo = healthResult.getContainerInfo();
     synchronized (containerInfo) {
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds = getDatanodeCommands(
-          replicationManager, healthResult);
-      for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds) {
-        replicationManager.sendDatanodeCommand(cmd.getValue(),
-            healthResult.getContainerInfo(), cmd.getKey());
-      }
+      sendDatanodeCommands(replicationManager, healthResult);
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index 837293d88e..3345340291 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -33,6 +33,12 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -43,6 +49,10 @@ import java.util.UUID;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
 import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Helper class to provide common methods used to test ReplicationManager.
@@ -307,4 +317,73 @@ public final class ReplicationTestUtil {
       }
     };
   }
+
+  /**
+   * Given a Mockito mock of ReplicationManager, this method will mock the
+   * SendThrottledReplicationCommand method so that it adds the command created
+   * to the commandsSent set.
+   * @param mock Mock of ReplicationManager
+   * @param commandsSent Set to add the command to rather than sending it.
+   * @throws NotLeaderException
+   * @throws AllSourcesOverloadedException
+   */
+  public static void mockRMSendThrottleReplicateCommand(ReplicationManager 
mock,
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+      throws NotLeaderException, AllSourcesOverloadedException {
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
+      ContainerInfo containerInfo = invocationOnMock.getArgument(0);
+      ReplicateContainerCommand command = ReplicateContainerCommand
+          .toTarget(containerInfo.getContainerID(),
+              invocationOnMock.getArgument(2));
+      command.setReplicaIndex(invocationOnMock.getArgument(3));
+      commandsSent.add(Pair.of(sources.get(0), command));
+      return null;
+    }).when(mock).sendThrottledReplicationCommand(
+        Mockito.any(ContainerInfo.class), Mockito.anyList(),
+        Mockito.any(DatanodeDetails.class), anyInt());
+  }
+
+  /**
+   * Given a Mockito mock of ReplicationManager, this method will mock the
+   * sendDatanodeCommand method so that it adds the command created to the
+   * commandsSent set.
+   * @param mock Mock of ReplicationManager
+   * @param commandsSent Set to add the command to rather than sending it.
+   * @throws NotLeaderException
+   */
+  public static void mockRMSendDatanodeCommand(ReplicationManager mock,
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+      throws NotLeaderException {
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      DatanodeDetails target = invocationOnMock.getArgument(2);
+      SCMCommand<?> command = invocationOnMock.getArgument(0);
+      commandsSent.add(Pair.of(target, command));
+      return null;
+    }).when(mock).sendDatanodeCommand(any(), any(), any());
+  }
+
+  /**
+   * Given a Mockito mock of ReplicationManager, this method will mock the
+   * sendDeleteCommand method so that it adds the command created to the
+   * commandsSent set.
+   * @param mock Mock of ReplicationManager
+   * @param commandsSent Set to add the command to rather than sending it.
+   * @throws NotLeaderException
+   */
+  public static void mockRMSendDeleteCommand(ReplicationManager mock,
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+      throws NotLeaderException {
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      ContainerInfo containerInfo = invocationOnMock.getArgument(0);
+      int replicaIndex = invocationOnMock.getArgument(1);
+      DatanodeDetails target = invocationOnMock.getArgument(2);
+      boolean forceDelete = invocationOnMock.getArgument(3);
+      DeleteContainerCommand deleteCommand = new DeleteContainerCommand(
+          containerInfo.getContainerID(), forceDelete);
+      deleteCommand.setReplicaIndex(replicaIndex);
+      commandsSent.add(Pair.of(target, deleteCommand));
+      return null;
+    }).when(mock).sendDeleteCommand(any(), anyInt(), any(), anyBoolean());
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
index 629a78781b..4c27fce353 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -44,7 +45,6 @@ import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
 
 /**
  * Tests the ECMisReplicationHandling functionality.
@@ -56,7 +56,7 @@ public class TestECMisReplicationHandler extends 
TestMisReplicationHandler {
 
   @BeforeEach
   public void setup() throws NodeNotFoundException,
-      AllSourcesOverloadedException {
+      AllSourcesOverloadedException, NotLeaderException {
     ECReplicationConfig repConfig = new ECReplicationConfig(DATA, PARITY);
     setup(repConfig);
   }
@@ -172,10 +172,9 @@ public class TestECMisReplicationHandler extends 
TestMisReplicationHandler {
   @Test
   public void testAllSourcesOverloaded() throws IOException {
     ReplicationManager replicationManager = getReplicationManager();
-    Mockito.when(replicationManager.createThrottledReplicationCommand(
-        anyLong(), anyList(), any(), anyInt()))
-        .thenThrow(new AllSourcesOverloadedException("Overloaded"));
-
+    Mockito.doThrow(new AllSourcesOverloadedException("Overloaded"))
+        .when(replicationManager).sendThrottledReplicationCommand(any(),
+            anyList(), any(), anyInt());
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
             Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index 4542dd930a..01d1c3a0d2 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -38,6 +38,7 @@ import 
org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -46,6 +47,7 @@ import org.mockito.Mockito;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -57,6 +59,7 @@ import static 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaO
 import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.mockito.ArgumentMatchers.any;
 
 /**
  * Tests the ECOverReplicationHandling functionality.
@@ -65,23 +68,33 @@ public class TestECOverReplicationHandler {
   private ECReplicationConfig repConfig;
   private ContainerInfo container;
   private NodeManager nodeManager;
+  private ReplicationManager replicationManager;
   private OzoneConfiguration conf;
   private PlacementPolicy policy;
   private DatanodeDetails staleNode;
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
 
   @BeforeEach
-  public void setup() {
+  public void setup() throws NodeNotFoundException, NotLeaderException {
     staleNode = null;
-    nodeManager = new MockNodeManager(true, 10) {
-      @Override
-      public NodeStatus getNodeStatus(DatanodeDetails dd)
-          throws NodeNotFoundException {
-        if (staleNode != null && dd.equals(staleNode)) {
-          return NodeStatus.inServiceStale();
-        }
-        return NodeStatus.inServiceHealthy();
-      }
-    };
+
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dd = invocation.getArgument(0);
+          if (staleNode != null && staleNode.equals(dd)) {
+            return new NodeStatus(dd.getPersistedOpState(),
+                HddsProtos.NodeState.STALE, 0);
+          }
+          return new NodeStatus(dd.getPersistedOpState(),
+              HddsProtos.NodeState.HEALTHY, 0);
+        });
+
+    commandsSent = new HashSet<>();
+    ReplicationTestUtil.mockRMSendDeleteCommand(replicationManager,
+        commandsSent);
+
+    nodeManager = new MockNodeManager(true, 10);
     conf = SCMTestUtils.getConf();
     repConfig = new ECReplicationConfig(3, 2);
     container = ReplicationTestUtil
@@ -94,7 +107,8 @@ public class TestECOverReplicationHandler {
   }
 
   @Test
-  public void testNoOverReplication() {
+  public void testNoOverReplication()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -104,7 +118,8 @@ public class TestECOverReplicationHandler {
   }
 
   @Test
-  public void testOverReplicationFixedByPendingDelete() {
+  public void testOverReplicationFixedByPendingDelete()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -121,7 +136,8 @@ public class TestECOverReplicationHandler {
   }
 
   @Test
-  public void testOverReplicationWithDecommissionIndexes() {
+  public void testOverReplicationWithDecommissionIndexes()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -132,7 +148,8 @@ public class TestECOverReplicationHandler {
   }
 
   @Test
-  public void testOverReplicationWithStaleIndexes() {
+  public void testOverReplicationWithStaleIndexes()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -149,7 +166,8 @@ public class TestECOverReplicationHandler {
   }
 
   @Test
-  public void testOverReplicationWithOpenReplica() {
+  public void testOverReplicationWithOpenReplica()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -168,7 +186,8 @@ public class TestECOverReplicationHandler {
    * replica.
    */
   @Test
-  public void testOverReplicationButPolicyReturnsWrongIndexes() {
+  public void testOverReplicationButPolicyReturnsWrongIndexes()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
             Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
@@ -185,7 +204,8 @@ public class TestECOverReplicationHandler {
   }
 
   @Test
-  public void testOverReplicationWithOneSameIndexes() {
+  public void testOverReplicationWithOneSameIndexes()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -199,7 +219,8 @@ public class TestECOverReplicationHandler {
   }
 
   @Test
-  public void testOverReplicationWithMultiSameIndexes() {
+  public void testOverReplicationWithMultiSameIndexes()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -222,7 +243,8 @@ public class TestECOverReplicationHandler {
    * delete commands should be produced.
    */
   @Test
-  public void testOverReplicationWithUnderReplication() {
+  public void testOverReplicationWithUnderReplication()
+      throws NotLeaderException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(
             Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -235,39 +257,37 @@ public class TestECOverReplicationHandler {
             container, 1, false, false, false);
 
     ECOverReplicationHandler ecORH =
-        new ECOverReplicationHandler(policy, nodeManager);
+        new ECOverReplicationHandler(policy, replicationManager);
 
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecORH
-        .processAndCreateCommands(availableReplicas, ImmutableList.of(),
-            health, 1);
+    ecORH.processAndSendCommands(availableReplicas, ImmutableList.of(),
+        health, 1);
 
-    Assert.assertEquals(1, commands.size());
-    SCMCommand<?> cmd = commands.iterator().next().getValue();
+    Assert.assertEquals(1, commandsSent.size());
+    SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
     Assert.assertEquals(1, ((DeleteContainerCommand)cmd).getReplicaIndex());
   }
 
   private void testOverReplicationWithIndexes(
       Set<ContainerReplica> availableReplicas,
       Map<Integer, Integer> index2excessNum,
-      List<ContainerReplicaOp> pendingOps) {
+      List<ContainerReplicaOp> pendingOps) throws NotLeaderException {
     ECOverReplicationHandler ecORH =
-        new ECOverReplicationHandler(policy, nodeManager);
+        new ECOverReplicationHandler(policy, replicationManager);
     ContainerHealthResult.OverReplicatedHealthResult result =
         Mockito.mock(ContainerHealthResult.OverReplicatedHealthResult.class);
     Mockito.when(result.getContainerInfo()).thenReturn(container);
 
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecORH
-        .processAndCreateCommands(availableReplicas, pendingOps,
+    ecORH.processAndSendCommands(availableReplicas, pendingOps,
             result, 1);
 
     // total commands send out should be equal to the sum of all
     // the excess nums
     int totalDeleteCommandNum =
         index2excessNum.values().stream().reduce(0, Integer::sum);
-    Assert.assertEquals(totalDeleteCommandNum, commands.size());
+    Assert.assertEquals(totalDeleteCommandNum, commandsSent.size());
 
     // Each command should have a non-zero replica index
-    commands.forEach(pair -> Assert.assertNotEquals(0,
+    commandsSent.forEach(pair -> Assert.assertNotEquals(0,
         ((DeleteContainerCommand) pair.getValue()).getReplicaIndex()));
 
     // command num of each index should be equal to the excess num
@@ -277,7 +297,7 @@ public class TestECOverReplicationHandler {
             ContainerReplica::getDatanodeDetails,
             ContainerReplica::getReplicaIndex));
     Map<Integer, Integer> index2commandNum = new HashMap<>();
-    commands.forEach(pair -> index2commandNum.merge(
+    commandsSent.forEach(pair -> index2commandNum.merge(
         datanodeDetails2Index.get(pair.getKey()), 1, Integer::sum)
     );
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index e67de8dcb9..c0a25c517e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -42,6 +42,7 @@ import 
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.assertj.core.util.Lists;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
@@ -71,6 +72,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.times;
 
 /**
@@ -87,10 +89,11 @@ public class TestECUnderReplicationHandler {
   private static final int PARITY = 2;
   private PlacementPolicy ecPlacementPolicy;
   private int remainingMaintenanceRedundancy = 1;
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
 
   @BeforeEach
   public void setup() throws NodeNotFoundException,
-      AllSourcesOverloadedException {
+      AllSourcesOverloadedException, NotLeaderException {
     nodeManager = new MockNodeManager(true, 10) {
       @Override
       public NodeStatus getNodeStatus(DatanodeDetails dd) {
@@ -111,17 +114,11 @@ public class TestECUnderReplicationHandler {
               HddsProtos.NodeState.HEALTHY, 0);
         });
 
-    Mockito.when(replicationManager.createThrottledReplicationCommand(
-            Mockito.anyLong(), Mockito.anyList(),
-            Mockito.any(DatanodeDetails.class), Mockito.anyInt()))
-        .thenAnswer(invocationOnMock -> {
-          List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
-          ReplicateContainerCommand command = ReplicateContainerCommand
-              .toTarget(invocationOnMock.getArgument(0),
-                  invocationOnMock.getArgument(2));
-          command.setReplicaIndex(invocationOnMock.getArgument(3));
-          return Pair.of(sources.get(0), command);
-        });
+    commandsSent = new HashSet<>();
+    ReplicationTestUtil.mockRMSendDatanodeCommand(
+        replicationManager, commandsSent);
+    ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
+        replicationManager, commandsSent);
 
     conf = SCMTestUtils.getConf();
     repConfig = new ECReplicationConfig(DATA, PARITY);
@@ -194,9 +191,9 @@ public class TestECUnderReplicationHandler {
         .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2),
             Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
             Pair.of(IN_SERVICE, 5));
-    Mockito.when(replicationManager.createThrottledReplicationCommand(
-            anyLong(), anyList(), any(), anyInt()))
-        .thenThrow(new AllSourcesOverloadedException("Overloaded"));
+    doThrow(new AllSourcesOverloadedException("Overloaded"))
+        .when(replicationManager).sendThrottledReplicationCommand(
+            any(), anyList(), any(), anyInt());
 
     Assertions.assertThrows(AllSourcesOverloadedException.class, () ->
         testUnderReplicationWithMissingIndexes(
@@ -346,7 +343,7 @@ public class TestECUnderReplicationHandler {
       }
 
       Assert.assertThrows(SCMException.class,
-          () -> ecURH.processAndCreateCommands(availableReplicas,
+          () -> ecURH.processAndSendCommands(availableReplicas,
               Collections.emptyList(), underRep, 2));
 
       // Now adjust replicas so it is also over replicated. This time rather
@@ -360,13 +357,16 @@ public class TestECUnderReplicationHandler {
           createDeleteContainerCommand(container, overRepReplica)));
 
       Mockito.when(replicationManager.processOverReplicatedContainer(
-          underRep)).thenReturn(expectedDelete);
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-          ecURH.processAndCreateCommands(availableReplicas,
+          underRep)).thenAnswer(invocationOnMock -> {
+            commandsSent.addAll(expectedDelete);
+            return expectedDelete.size();
+          });
+      commandsSent.clear();
+      ecURH.processAndSendCommands(availableReplicas,
               Collections.emptyList(), underRep, 2);
       Mockito.verify(replicationManager, times(1))
           .processOverReplicatedContainer(underRep);
-      Assertions.assertEquals(true, expectedDelete.equals(commands));
+      Assertions.assertEquals(true, expectedDelete.equals(commandsSent));
     }
   }
 
@@ -412,18 +412,19 @@ public class TestECUnderReplicationHandler {
         availableReplicas.add(toAdd);
       }
 
-      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-          ecURH.processAndCreateCommands(availableReplicas,
-              Collections.emptyList(), underRep, 2);
+      ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+          underRep, 2);
 
       Mockito.verify(replicationManager, times(0))
           .processOverReplicatedContainer(underRep);
-      Assertions.assertEquals(1, commands.size());
-      Pair<DatanodeDetails, SCMCommand<?>> pair = commands.iterator().next();
+      Assertions.assertEquals(1, commandsSent.size());
+      Pair<DatanodeDetails, SCMCommand<?>> pair =
+          commandsSent.iterator().next();
       Assertions.assertEquals(newNode, pair.getKey());
       Assertions.assertEquals(StorageContainerDatanodeProtocolProtos
               .SCMCommandProto.Type.reconstructECContainersCommand,
           pair.getValue().getType());
+      commandsSent.clear();
     }
   }
 
@@ -449,7 +450,7 @@ public class TestECUnderReplicationHandler {
             1, true, false, false);
 
     Assert.assertThrows(SCMException.class,
-        () -> ecURH.processAndCreateCommands(availableReplicas,
+        () -> ecURH.processAndSendCommands(availableReplicas,
             Collections.emptyList(), underRep, 1));
 
     // Now adjust replicas so it is also over replicated. This time rather than
@@ -465,13 +466,15 @@ public class TestECUnderReplicationHandler {
         createDeleteContainerCommand(container, overRepReplica)));
 
     Mockito.when(replicationManager.processOverReplicatedContainer(
-        underRep)).thenReturn(expectedDelete);
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-        ecURH.processAndCreateCommands(availableReplicas,
+        underRep)).thenAnswer(invocationOnMock -> {
+          commandsSent.addAll(expectedDelete);
+          return expectedDelete.size();
+        });
+    ecURH.processAndSendCommands(availableReplicas,
             Collections.emptyList(), underRep, 1);
     Mockito.verify(replicationManager, times(1))
         .processOverReplicatedContainer(underRep);
-    Assertions.assertEquals(true, expectedDelete.equals(commands));
+    Assertions.assertEquals(true, expectedDelete.equals(commandsSent));
   }
 
   @Test
@@ -489,6 +492,7 @@ public class TestECUnderReplicationHandler {
     // get created due to the placement policy returning an already used node
     testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
         availableReplicas, 0, 0, sameNodePolicy);
+    commandsSent.clear();
 
     // Now add a decommissioning index - we will not get a replicate command
     // for it, as the placement policy will throw an exception as we catch
@@ -553,10 +557,9 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
         ecPlacementPolicy, conf, replicationManager);
 
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-        handler.processAndCreateCommands(availableReplicas,
-            Collections.emptyList(), result, 1);
-    Assertions.assertEquals(1, commands.size());
+    handler.processAndSendCommands(availableReplicas,
+        Collections.emptyList(), result, 1);
+    Assertions.assertEquals(1, commandsSent.size());
     Mockito.verify(ecPlacementPolicy, times(0))
         .chooseDatanodes(anyList(), Mockito.isNull(), eq(0), anyLong(),
             anyLong());
@@ -602,11 +605,9 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
         ecPlacementPolicy, conf, replicationManager);
 
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-        handler.processAndCreateCommands(availableReplicas, pendingOps, result,
-            1);
-    Assertions.assertEquals(1, commands.size());
-    Assertions.assertNotEquals(dn, commands.iterator().next().getKey());
+    handler.processAndSendCommands(availableReplicas, pendingOps, result, 1);
+    Assertions.assertEquals(1, commandsSent.size());
+    Assertions.assertNotEquals(dn, commandsSent.iterator().next().getKey());
   }
 
   @Test
@@ -666,9 +667,8 @@ public class TestECUnderReplicationHandler {
     Mockito.when(result.isUnrecoverable()).thenReturn(false);
     Mockito.when(result.getContainerInfo()).thenReturn(container);
 
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecURH
-        .processAndCreateCommands(availableReplicas, ImmutableList.of(),
-            result, remainingMaintenanceRedundancy);
+    ecURH.processAndSendCommands(availableReplicas, ImmutableList.of(),
+        result, remainingMaintenanceRedundancy);
     int replicateCommand = 0;
     int reconstructCommand = 0;
     byte[] missingIndexesByteArr = new byte[missingIndexes.size()];
@@ -678,7 +678,7 @@ public class TestECUnderReplicationHandler {
     boolean shouldReconstructCommandExist =
         missingIndexes.size() > 0 && missingIndexes.size() <= repConfig
             .getParity();
-    for (Map.Entry<DatanodeDetails, SCMCommand<?>> dnCommand : commands) {
+    for (Map.Entry<DatanodeDetails, SCMCommand<?>> dnCommand : commandsSent) {
       if (dnCommand.getValue() instanceof ReplicateContainerCommand) {
         replicateCommand++;
       } else if (dnCommand
@@ -698,7 +698,7 @@ public class TestECUnderReplicationHandler {
         replicateCommand);
     Assertions.assertEquals(shouldReconstructCommandExist ? 1 : 0,
         reconstructCommand);
-    return commands;
+    return commandsSent;
   }
 
   private DeleteContainerCommand createDeleteContainerCommand(
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index 2800eae4ff..0af6e7008e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -35,10 +35,12 @@ import 
org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.jupiter.api.Assertions;
 import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,9 +67,11 @@ public abstract class TestMisReplicationHandler {
   private ContainerInfo container;
   private OzoneConfiguration conf;
   private ReplicationManager replicationManager;
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
 
   protected void setup(ReplicationConfig repConfig)
-      throws NodeNotFoundException, AllSourcesOverloadedException {
+      throws NodeNotFoundException, AllSourcesOverloadedException,
+      NotLeaderException {
 
     replicationManager = Mockito.mock(ReplicationManager.class);
     Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
@@ -77,17 +81,11 @@ public abstract class TestMisReplicationHandler {
               HddsProtos.NodeState.HEALTHY, 0);
         });
 
-    Mockito.when(replicationManager.createThrottledReplicationCommand(
-            Mockito.anyLong(), Mockito.anyList(),
-            Mockito.any(DatanodeDetails.class), Mockito.anyInt()))
-        .thenAnswer(invocationOnMock -> {
-          List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
-          ReplicateContainerCommand command = ReplicateContainerCommand
-              .toTarget(invocationOnMock.getArgument(0),
-                  invocationOnMock.getArgument(2));
-          command.setReplicaIndex(invocationOnMock.getArgument(3));
-          return Pair.of(sources.get(0), command);
-        });
+    commandsSent = new HashSet<>();
+    ReplicationTestUtil.mockRMSendDatanodeCommand(
+        replicationManager, commandsSent);
+    ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
+        replicationManager, commandsSent);
 
     conf = SCMTestUtils.getConf();
     container = ReplicationTestUtil
@@ -176,11 +174,10 @@ public abstract class TestMisReplicationHandler {
     Map<DatanodeDetails, Integer> copyReplicaIdxMap = copy.stream()
             .collect(Collectors.toMap(ContainerReplica::getDatanodeDetails,
                     ContainerReplica::getReplicaIndex));
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-            misReplicationHandler.processAndCreateCommands(availableReplicas,
+    misReplicationHandler.processAndSendCommands(availableReplicas,
                     pendingOp, result, maintenanceCnt);
-    Assertions.assertEquals(expectedNumberOfNodes, commands.size());
-    for (Pair<DatanodeDetails, SCMCommand<?>> pair : commands) {
+    Assertions.assertEquals(expectedNumberOfNodes, commandsSent.size());
+    for (Pair<DatanodeDetails, SCMCommand<?>> pair : commandsSent) {
       SCMCommand<?> command = pair.getValue();
       Assertions.assertTrue(command.getType() == replicateContainerCommand);
       ReplicateContainerCommand replicateContainerCommand =
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
index af230897af..1020d1f5b5 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
@@ -17,26 +17,19 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
 import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
-import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
 
 import static org.mockito.ArgumentMatchers.any;
 
@@ -78,23 +71,17 @@ public class TestOverReplicatedProcessor {
   }
 
   @Test
-  public void testDeleteContainerCommand() throws IOException {
+  public void testSuccessfulRun() throws IOException {
     ContainerInfo container = ReplicationTestUtil
         .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
     queue.enqueue(new OverReplicatedHealthResult(
         container, 3, false));
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
-    DeleteContainerCommand cmd =
-        new DeleteContainerCommand(container.getContainerID());
-    cmd.setReplicaIndex(5);
-    commands.add(Pair.of(MockDatanodeDetails.randomDatanodeDetails(), cmd));
 
-    Mockito
-        .when(replicationManager.processOverReplicatedContainer(any()))
-        .thenReturn(commands);
+    Mockito.when(replicationManager.processOverReplicatedContainer(any()))
+        .thenReturn(1);
     overReplicatedProcessor.processAll();
-
-    Mockito.verify(replicationManager).sendDatanodeCommand(any(), any(), 
any());
+    Mockito.verify(replicationManager, Mockito.times(0))
+        .requeueOverReplicatedContainer(any());
   }
 
   @Test
@@ -110,8 +97,6 @@ public class TestOverReplicatedProcessor {
         .thenThrow(new AssertionError("Should process only one item"));
     overReplicatedProcessor.processAll();
 
-    Mockito.verify(replicationManager, Mockito.times(0))
-        .sendDatanodeCommand(any(), any(), any());
     Mockito.verify(replicationManager, Mockito.times(1))
         .requeueOverReplicatedContainer(any());
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
index 2fed140c0b..2e1bfadb42 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -46,7 +47,6 @@ import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
 
 /**
  * Tests the RatisReplicationHandling functionality.
@@ -55,7 +55,7 @@ public class TestRatisMisReplicationHandler extends 
TestMisReplicationHandler {
 
   @BeforeEach
   public void setup() throws NodeNotFoundException,
-      AllSourcesOverloadedException {
+      AllSourcesOverloadedException, NotLeaderException {
     RatisReplicationConfig repConfig = RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE);
     setup(repConfig);
@@ -177,9 +177,9 @@ public class TestRatisMisReplicationHandler extends 
TestMisReplicationHandler {
   @Test
   public void testAllSourcesOverloaded() throws IOException {
     ReplicationManager replicationManager = getReplicationManager();
-    Mockito.when(replicationManager.createThrottledReplicationCommand(
-            anyLong(), anyList(), any(), anyInt()))
-        .thenThrow(new AllSourcesOverloadedException("Overloaded"));
+    Mockito.doThrow(new AllSourcesOverloadedException("Overloaded"))
+        .when(replicationManager).sendThrottledReplicationCommand(any(),
+            anyList(), any(), anyInt());
 
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
index 25697bab99..0c8ea277af 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
@@ -29,11 +29,11 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,6 +42,7 @@ import org.slf4j.event.Level;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -50,6 +51,7 @@ import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUt
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicasWithSameOrigin;
+import static org.mockito.ArgumentMatchers.any;
 
 /**
  * Tests for {@link RatisOverReplicationHandler}.
@@ -59,10 +61,11 @@ public class TestRatisOverReplicationHandler {
   private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG =
       RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
   private PlacementPolicy policy;
-  private NodeManager nodeManager;
+  private ReplicationManager replicationManager;
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
 
   @Before
-  public void setup() throws NodeNotFoundException {
+  public void setup() throws NodeNotFoundException, NotLeaderException {
     container = createContainer(HddsProtos.LifeCycleState.CLOSED,
         RATIS_REPLICATION_CONFIG);
 
@@ -71,9 +74,17 @@ public class TestRatisOverReplicationHandler {
         Mockito.anyList(), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
 
-    nodeManager = Mockito.mock(NodeManager.class);
-    Mockito.when(nodeManager.getNodeStatus(Mockito.any()))
-        .thenReturn(NodeStatus.inServiceHealthy());
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dd = invocation.getArgument(0);
+          return new NodeStatus(dd.getPersistedOpState(),
+              HddsProtos.NodeState.HEALTHY, 0);
+        });
+
+    commandsSent = new HashSet<>();
+    ReplicationTestUtil.mockRMSendDeleteCommand(replicationManager,
+        commandsSent);
 
     GenericTestUtils.setLogLevel(RatisOverReplicationHandler.LOG, Level.DEBUG);
   }
@@ -106,8 +117,9 @@ public class TestRatisOverReplicationHandler {
         ContainerReplicaProto.State.CLOSED, 0, 0, 0, 0);
 
     ContainerReplica stale = replicas.stream().findFirst().get();
-    Mockito.when(nodeManager.getNodeStatus(stale.getDatanodeDetails()))
-        .thenReturn(NodeStatus.inServiceStale());
+    Mockito.when(replicationManager.getNodeStatus(stale.getDatanodeDetails()))
+        .thenAnswer(invocation ->
+            NodeStatus.inServiceStale());
 
     testProcessing(replicas, Collections.emptyList(),
         getOverReplicatedHealthResult(), 0);
@@ -274,14 +286,13 @@ public class TestRatisOverReplicationHandler {
       ContainerHealthResult healthResult,
       int expectNumCommands) throws IOException {
     RatisOverReplicationHandler handler =
-        new RatisOverReplicationHandler(policy, nodeManager);
+        new RatisOverReplicationHandler(policy, replicationManager);
 
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-        handler.processAndCreateCommands(replicas, pendingOps,
+    handler.processAndSendCommands(replicas, pendingOps,
             healthResult, 2);
-    Assert.assertEquals(expectNumCommands, commands.size());
+    Assert.assertEquals(expectNumCommands, commandsSent.size());
 
-    return commands;
+    return commandsSent;
   }
 
   private ContainerHealthResult.OverReplicatedHealthResult
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index 58975d36a6..5c37f5c106 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,6 +44,7 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -64,10 +65,11 @@ public class TestRatisUnderReplicationHandler {
       RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
   private PlacementPolicy policy;
   private ReplicationManager replicationManager;
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
 
   @Before
   public void setup() throws NodeNotFoundException,
-      AllSourcesOverloadedException {
+      AllSourcesOverloadedException, NotLeaderException {
     container = ReplicationTestUtil.createContainer(
         HddsProtos.LifeCycleState.CLOSED, RATIS_REPLICATION_CONFIG);
 
@@ -91,17 +93,11 @@ public class TestRatisUnderReplicationHandler {
               HddsProtos.NodeState.HEALTHY);
         });
 
-    Mockito.when(replicationManager.createThrottledReplicationCommand(
-        Mockito.anyLong(), Mockito.anyList(),
-            Mockito.any(DatanodeDetails.class), Mockito.anyInt()))
-        .thenAnswer(invocationOnMock -> {
-          List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
-          ReplicateContainerCommand command = ReplicateContainerCommand
-              .toTarget(invocationOnMock.getArgument(0),
-              invocationOnMock.getArgument(2));
-          command.setReplicaIndex(invocationOnMock.getArgument(3));
-          return Pair.of(sources.get(0), command);
-        });
+    commandsSent = new HashSet<>();
+    ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
+        replicationManager, commandsSent);
+    ReplicationTestUtil.mockRMSendDatanodeCommand(replicationManager,
+        commandsSent);
   }
 
   /**
@@ -209,7 +205,7 @@ public class TestRatisUnderReplicationHandler {
         = createReplicas(container.containerID(), State.CLOSED, 0, 0);
 
     Assert.assertThrows(IOException.class,
-        () -> handler.processAndCreateCommands(replicas,
+        () -> handler.processAndSendCommands(replicas,
             Collections.emptyList(), getUnderReplicatedHealthResult(), 2));
   }
 
@@ -265,11 +261,10 @@ public class TestRatisUnderReplicationHandler {
     RatisUnderReplicationHandler handler =
         new RatisUnderReplicationHandler(policy, conf, replicationManager);
 
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-        handler.processAndCreateCommands(replicas, pendingOps,
+    handler.processAndSendCommands(replicas, pendingOps,
             healthResult, minHealthyForMaintenance);
-    Assert.assertEquals(expectNumCommands, commands.size());
-    return commands;
+    Assert.assertEquals(expectNumCommands, commandsSent.size());
+    return commandsSent;
   }
 
   private UnderReplicatedHealthResult getUnderReplicatedHealthResult() {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 5e3838abf4..aa5d78f09e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -105,6 +105,7 @@ public class TestReplicationManager {
   private ReplicationConfig repConfig;
   private ReplicationManagerReport repReport;
   private ReplicationQueue repQueue;
+  private Set<Pair<UUID, SCMCommand<?>>> commandsSent;
 
   @Before
   public void setup() throws IOException {
@@ -118,7 +119,15 @@ public class TestReplicationManager {
     Mockito.when(ecPlacementPolicy.validateContainerPlacement(
         anyList(), anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
+    commandsSent = new HashSet<>();
     eventPublisher = Mockito.mock(EventPublisher.class);
+    Mockito.doAnswer(invocation -> {
+      CommandForDatanode<?> command = invocation.getArgument(1);
+      commandsSent.add(Pair.of(command.getDatanodeId(), command.getCommand()));
+      return null;
+    }).when(eventPublisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), any());
+
+
     scmContext = Mockito.mock(SCMContext.class);
     nodeManager = Mockito.mock(NodeManager.class);
     legacyReplicationManager = Mockito.mock(LegacyReplicationManager.class);
@@ -313,19 +322,19 @@ public class TestReplicationManager {
     Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
     Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
 
-    RatisOverReplicationHandler handler =
-        new RatisOverReplicationHandler(ratisPlacementPolicy, nodeManager);
+    RatisOverReplicationHandler handler = new RatisOverReplicationHandler(
+        ratisPlacementPolicy, replicationManager);
 
     Mockito.when(nodeManager.getNodeStatus(any(DatanodeDetails.class)))
         .thenReturn(NodeStatus.inServiceHealthy());
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-        handler.processAndCreateCommands(replicas, Collections.emptyList(),
+    handler.processAndSendCommands(replicas, Collections.emptyList(),
             repQueue.dequeueOverReplicatedContainer(), 2);
-    Assert.assertTrue(commands.iterator().hasNext());
-    Assert.assertEquals(unhealthy.getDatanodeDetails(),
-        commands.iterator().next().getKey());
+    Assert.assertTrue(commandsSent.iterator().hasNext());
+    Assert.assertEquals(unhealthy.getDatanodeDetails().getUuid(),
+        commandsSent.iterator().next().getKey());
     Assert.assertEquals(SCMCommandProto.Type.deleteContainerCommand,
-        commands.iterator().next().getValue().getType());
+        commandsSent.iterator().next().getValue().getType());
+
   }
 
   @Test
@@ -354,19 +363,18 @@ public class TestReplicationManager {
     Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
     Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
 
-    RatisOverReplicationHandler handler =
-        new RatisOverReplicationHandler(ratisPlacementPolicy, nodeManager);
+    RatisOverReplicationHandler handler = new RatisOverReplicationHandler(
+        ratisPlacementPolicy, replicationManager);
 
     Mockito.when(nodeManager.getNodeStatus(any(DatanodeDetails.class)))
         .thenReturn(NodeStatus.inServiceHealthy());
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
-        handler.processAndCreateCommands(replicas, Collections.emptyList(),
-            repQueue.dequeueOverReplicatedContainer(), 2);
-    Assert.assertTrue(commands.iterator().hasNext());
-    Assert.assertNotEquals(unhealthy.getDatanodeDetails(),
-        commands.iterator().next().getKey());
+    handler.processAndSendCommands(replicas, Collections.emptyList(),
+        repQueue.dequeueOverReplicatedContainer(), 2);
+    Assert.assertTrue(commandsSent.iterator().hasNext());
+    Assert.assertNotEquals(unhealthy.getDatanodeDetails().getUuid(),
+        commandsSent.iterator().next().getKey());
     Assert.assertEquals(SCMCommandProto.Type.deleteContainerCommand,
-        commands.iterator().next().getValue().getType());
+        commandsSent.iterator().next().getValue().getType());
   }
 
   @Test
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
index 91a8c62471..84b4e2414e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -17,29 +17,19 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
 import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
-import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 
 import static org.mockito.ArgumentMatchers.any;
 
@@ -83,62 +73,16 @@ public class TestUnderReplicatedProcessor {
   }
 
   @Test
-  public void testEcReconstructionCommand() throws IOException {
-    ContainerInfo container = ReplicationTestUtil
-        .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
-    queue.enqueue(new UnderReplicatedHealthResult(
-        container, 3, false, false, false));
-    List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
-        sourceNodes = new ArrayList<>();
-    for (int i = 1; i <= 3; i++) {
-      sourceNodes.add(
-          new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
-              MockDatanodeDetails.randomDatanodeDetails(), i));
-    }
-    List<DatanodeDetails> targetNodes = new ArrayList<>();
-    targetNodes.add(MockDatanodeDetails.randomDatanodeDetails());
-    targetNodes.add(MockDatanodeDetails.randomDatanodeDetails());
-    byte[] missingIndexes = {4, 5};
-
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
-    commands.add(Pair.of(MockDatanodeDetails.randomDatanodeDetails(),
-        new ReconstructECContainersCommand(container.getContainerID(),
-            sourceNodes, targetNodes, missingIndexes, repConfig)));
-
-    Mockito.when(replicationManager
-            .processUnderReplicatedContainer(any()))
-        .thenReturn(commands);
-    underReplicatedProcessor.processAll();
-
-    Mockito.verify(replicationManager, Mockito.times(1))
-        .sendDatanodeCommand(any(), any(), any());
-    Mockito.verify(replicationManager, Mockito.times(0))
-        .requeueUnderReplicatedContainer(any());
-  }
-
-  @Test
-  public void testEcReplicationCommand() throws IOException {
+  public void testSuccessfulCommand() throws IOException {
     ContainerInfo container = ReplicationTestUtil
         .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
     queue.enqueue(new UnderReplicatedHealthResult(
         container, 3, true, false, false));
-    List<DatanodeDetails> sourceDns = new ArrayList<>();
-    sourceDns.add(MockDatanodeDetails.randomDatanodeDetails());
-    DatanodeDetails targetDn = MockDatanodeDetails.randomDatanodeDetails();
-    ReplicateContainerCommand rcc = ReplicateContainerCommand.fromSources(
-        container.getContainerID(), sourceDns);
-    rcc.setReplicaIndex(3);
-
-    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
-    commands.add(Pair.of(targetDn, rcc));
-
     Mockito.when(replicationManager
             .processUnderReplicatedContainer(any()))
-        .thenReturn(commands);
+        .thenReturn(1);
     underReplicatedProcessor.processAll();
 
-    Mockito.verify(replicationManager, Mockito.times(1))
-        .sendDatanodeCommand(any(), any(), any());
     Mockito.verify(replicationManager, Mockito.times(0))
         .requeueUnderReplicatedContainer(any());
   }
@@ -156,8 +100,6 @@ public class TestUnderReplicatedProcessor {
         .thenThrow(new AssertionError("Should process only one item"));
     underReplicatedProcessor.processAll();
 
-    Mockito.verify(replicationManager, Mockito.times(0))
-        .sendDatanodeCommand(any(), any(), any());
     Mockito.verify(replicationManager, Mockito.times(1))
         .requeueUnderReplicatedContainer(any());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to