This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d8521060 HDDS-7822. Allow multiple commands per datanode in UnhealthyReplicationHandler (#4203)
08d8521060 is described below

commit 08d8521060612a9a8ed7ae9aa4dabbd387cc4920
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed Jan 25 10:20:31 2023 +0100

    HDDS-7822. Allow multiple commands per datanode in UnhealthyReplicationHandler (#4203)
---
 .../AbstractOverReplicationHandler.java            | 20 --------
 .../replication/ECOverReplicationHandler.java      | 18 ++++---
 .../replication/ECUnderReplicationHandler.java     | 24 ++++-----
 .../replication/MisReplicationHandler.java         | 18 +++----
 .../replication/OverReplicatedProcessor.java       |  5 +-
 .../replication/RatisOverReplicationHandler.java   | 20 ++++----
 .../replication/RatisUnderReplicationHandler.java  | 28 +++++------
 .../container/replication/ReplicationManager.java  |  5 +-
 .../replication/UnderReplicatedProcessor.java      |  5 +-
 .../replication/UnhealthyReplicationHandler.java   |  5 +-
 .../replication/UnhealthyReplicationProcessor.java | 13 ++---
 .../replication/TestECOverReplicationHandler.java  | 17 +++----
 .../replication/TestECUnderReplicationHandler.java | 58 +++++++++++-----------
 .../replication/TestMisReplicationHandler.java     | 14 +++---
 .../replication/TestOverReplicatedProcessor.java   |  9 ++--
 .../TestRatisOverReplicationHandler.java           | 25 ++++++----
 .../TestRatisUnderReplicationHandler.java          |  3 +-
 .../replication/TestUnderReplicatedProcessor.java  | 15 +++---
 18 files changed, 146 insertions(+), 156 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
index 5d647593ef..9dead0feab 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -42,23 +39,6 @@ public abstract class AbstractOverReplicationHandler
     this.placementPolicy = placementPolicy;
   }
 
-  /**
-   * Identify a new set of datanode(s) to delete the container
-   * and form the SCM commands to send it to DN.
-   *
-   * @param replicas - Set of available container replicas.
-   * @param pendingOps - Inflight replications and deletion ops.
-   * @param result - Health check result.
-   * @param remainingMaintenanceRedundancy - represents that how many nodes go
-   *                                      into maintenance.
-   * @return Returns the key value pair of destination dn where the command gets
-   * executed and the command itself.
-   */
-  public abstract Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
-      Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
-      ContainerHealthResult result, int remainingMaintenanceRedundancy) throws
-      IOException;
-
   /**
    * Identify whether the placement status is actually equal for a
    * replica set after removing those filtered replicas.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 00c1a264af..4bd49144d6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -32,12 +33,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
 
 /**
  * Handles the EC Over replication processing and forming the respective SCM
@@ -69,7 +71,7 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
    * executed and the command itself.
    */
   @Override
-  public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int remainingMaintenanceRedundancy) {
     ContainerInfo container = result.getContainerInfo();
@@ -102,13 +104,13 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       LOG.info("The container {} state changed and it is no longer over"
               + " replication. Replica count: {}, healthy replica count: {}",
           container.getContainerID(), replicas.size(), healthyReplicas.size());
-      return emptyMap();
+      return emptySet();
     }
 
     if (!replicaCount.isOverReplicated(true)) {
       LOG.info("The container {} with replicas {} will be corrected " +
           "by the pending delete", container.getContainerID(), replicas);
-      return emptyMap();
+      return emptySet();
     }
 
     List<Integer> overReplicatedIndexes =
@@ -118,7 +120,7 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       LOG.warn("The container {} with replicas {} was found over replicated "
           + "by EcContainerReplicaCount, but there are no over replicated "
           + "indexes returned", container.getContainerID(), replicas);
-      return emptyMap();
+      return emptySet();
     }
 
     final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
@@ -141,10 +143,10 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       LOG.warn("The container {} is over replicated, but no replicas were "
           + "selected to remove by the placement policy. Replicas: {}",
           container, replicas);
-      return emptyMap();
+      return emptySet();
     }
 
-    final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
     // As a sanity check, sum up the current counts of each replica index. When
     // processing replicasToRemove, ensure that removing the replica would not
     // drop the count of that index to zero.
@@ -165,7 +167,7 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       DeleteContainerCommand deleteCommand =
           new DeleteContainerCommand(container.getContainerID(), true);
       deleteCommand.setReplicaIndex(r.getReplicaIndex());
-      commands.put(r.getDatanodeDetails(), deleteCommand);
+      commands.add(Pair.of(r.getDatanodeDetails(), deleteCommand));
     }
 
     if (commands.size() == 0) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 49b29bd46c..256ec44624 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -41,14 +41,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 
 /**
@@ -114,7 +114,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
    * should be retried later.
    */
   @Override
-  public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
       final Set<ContainerReplica> replicas,
       final List<ContainerReplicaOp> pendingOps,
       final ContainerHealthResult result,
@@ -128,13 +128,13 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     if (replicaCount.isSufficientlyReplicated()) {
       LOG.info("The container {} state changed and it's not in under"
               + " replication any more.", container.getContainerID());
-      return emptyMap();
+      return emptySet();
     }
     if (replicaCount.isSufficientlyReplicated(true)) {
       LOG.info("The container {} with replicas {} will be sufficiently " +
           "replicated after pending replicas are created",
           container.getContainerID(), replicaCount.getReplicas());
-      return emptyMap();
+      return emptySet();
     }
 
     // don't place reconstructed replicas on exclude nodes, since they already
@@ -151,7 +151,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
         .collect(Collectors.toList()));
 
     final ContainerID id = container.containerID();
-    final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
     try {
       final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
       for (ContainerReplicaOp op : pendingOps) {
@@ -282,7 +282,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
       Pair<ContainerReplica, NodeStatus>> sources,
       List<DatanodeDetails> availableSourceNodes,
       List<DatanodeDetails> excludedNodes,
-      Map<DatanodeDetails, SCMCommand<?>> commands) throws IOException {
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
     ContainerInfo container = replicaCount.getContainer();
     ECReplicationConfig repConfig =
         (ECReplicationConfig)container.getReplicationConfig();
@@ -316,7 +316,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
                 int2byte(missingIndexes),
                 repConfig);
         // Keeping the first target node as coordinator.
-        commands.put(selectedDatanodes.get(0), reconstructionCommand);
+        commands.add(Pair.of(selectedDatanodes.get(0), reconstructionCommand));
       }
     } else {
       LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
@@ -337,7 +337,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
       Set<ContainerReplica> replicas,
       List<DatanodeDetails> availableSourceNodes,
       List<DatanodeDetails> excludedNodes,
-      Map<DatanodeDetails, SCMCommand<?>> commands) throws IOException {
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
     ContainerInfo container = replicaCount.getContainer();
     Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
     if (decomIndexes.size() > 0) {
@@ -376,7 +376,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
   private void processMaintenanceOnlyIndexes(
       ECContainerReplicaCount replicaCount, Set<ContainerReplica> replicas,
       List<DatanodeDetails> excludedNodes,
-      Map<DatanodeDetails, SCMCommand<?>> commands) throws IOException {
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands) throws IOException {
     Set<Integer> maintIndexes = replicaCount.maintenanceOnlyIndexes(true);
     if (maintIndexes.isEmpty()) {
       return;
@@ -412,7 +412,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
   }
 
   private void createReplicateCommand(
-      Map<DatanodeDetails, SCMCommand<?>> commands,
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands,
       ContainerInfo container, Iterator<DatanodeDetails> iterator,
       ContainerReplica replica) {
     final boolean push = replicationManager.getConfig().isPush();
@@ -426,7 +426,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     // For EC containers, we need to track the replica index which is
     // to be replicated, so add it to the command.
     replicateCommand.setReplicaIndex(replica.getReplicaIndex());
-    commands.put(push ? source : target, replicateCommand);
+    commands.add(Pair.of(push ? source : target, replicateCommand));
   }
 
   private static byte[] int2byte(List<Integer> src) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 3570f45eb7..10dfede065 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -114,11 +114,11 @@ public abstract class MisReplicationHandler implements
   protected abstract ReplicateContainerCommand updateReplicateCommand(
           ReplicateContainerCommand command, ContainerReplica replica);
 
-  private Map<DatanodeDetails, SCMCommand<?>> getReplicateCommands(
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> getReplicateCommands(
           ContainerInfo containerInfo,
           Set<ContainerReplica> replicasToBeReplicated,
           List<DatanodeDetails> targetDns) {
-    Map<DatanodeDetails, SCMCommand<?>> commandMap = Maps.newHashMap();
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commandMap = new HashSet<>();
     int datanodeIdx = 0;
     for (ContainerReplica replica : replicasToBeReplicated) {
       if (datanodeIdx == targetDns.size()) {
@@ -132,14 +132,14 @@ public abstract class MisReplicationHandler implements
           : ReplicateContainerCommand.fromSources(containerID,
               Collections.singletonList(source));
       replicateCommand = updateReplicateCommand(replicateCommand, replica);
-      commandMap.put(push ? source : target, replicateCommand);
+      commandMap.add(Pair.of(push ? source : target, replicateCommand));
       datanodeIdx += 1;
     }
     return commandMap;
 
   }
   @Override
-  public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int remainingMaintenanceRedundancy)
       throws IOException {
@@ -148,7 +148,7 @@ public abstract class MisReplicationHandler implements
       LOG.info("Skipping Mis-Replication for Container {}, " +
                "as there are still some pending ops for the container: {}",
               container, pendingOps);
-      return Collections.emptyMap();
+      return Collections.emptySet();
     }
     ContainerReplicaCount replicaCount = getContainerReplicaCount(container,
             replicas, Collections.emptyList(), remainingMaintenanceRedundancy);
@@ -162,7 +162,7 @@ public abstract class MisReplicationHandler implements
               container.getContainerID(),
               !replicaCount.isSufficientlyReplicated(),
               replicaCount.isOverReplicated());
-      return Collections.emptyMap();
+      return Collections.emptySet();
     }
 
     List<DatanodeDetails> usedDns = replicas.stream()
@@ -172,7 +172,7 @@ public abstract class MisReplicationHandler implements
             usedDns.size()).isPolicySatisfied()) {
       LOG.info("Container {} is currently not misreplicated",
               container.getContainerID());
-      return Collections.emptyMap();
+      return Collections.emptySet();
     }
 
     Set<ContainerReplica> sources = filterSources(replicas);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
index 93f536a644..4f7806487c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * Class used to pick messages from the ReplicationManager over replicated
@@ -50,7 +51,7 @@ public class OverReplicatedProcessor extends 
UnhealthyReplicationProcessor
     replicationManager.requeueOverReplicatedContainer(healthResult);
   }
   @Override
-  protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
+  protected Set<Pair<DatanodeDetails, SCMCommand<?>>> getDatanodeCommands(
       ReplicationManager replicationManager,
       ContainerHealthResult.OverReplicatedHealthResult healthResult)
       throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
index d3bef7d9e2..856027a5af 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -34,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -77,7 +77,7 @@ public class RatisOverReplicationHandler
    * delete replicas on those datanodes.
    */
   @Override
-  public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int minHealthyForMaintenance) throws
       IOException {
@@ -115,7 +115,7 @@ public class RatisOverReplicationHandler
 
     // verify that this container is actually over replicated
     if (!verifyOverReplication(replicaCount)) {
-      return Collections.emptyMap();
+      return Collections.emptySet();
     }
 
     // get number of excess replicas
@@ -132,7 +132,7 @@ public class RatisOverReplicationHandler
     if (eligibleReplicas.size() == 0) {
       LOG.info("Did not find any replicas that are eligible to be deleted for" +
           " container {}.", containerInfo);
-      return Collections.emptyMap();
+      return Collections.emptySet();
     }
 
     return createCommands(containerInfo, eligibleReplicas, excess);
@@ -214,10 +214,10 @@ public class RatisOverReplicationHandler
         .collect(Collectors.toList());
   }
 
-  private Map<DatanodeDetails, SCMCommand<?>> createCommands(
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> createCommands(
       ContainerInfo containerInfo, List<ContainerReplica> replicas,
       int excess) {
-    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
 
     /*
     Over replication means we have enough healthy replicas, so unhealthy
@@ -231,8 +231,8 @@ public class RatisOverReplicationHandler
       }
       if (!ReplicationManager.compareState(
           containerInfo.getState(), replica.getState())) {
-        commands.put(replica.getDatanodeDetails(),
-            createDeleteCommand(containerInfo));
+        commands.add(Pair.of(replica.getDatanodeDetails(),
+            createDeleteCommand(containerInfo)));
         unhealthyReplicas.add(replica);
         excess--;
       }
@@ -253,8 +253,8 @@ public class RatisOverReplicationHandler
 
       if (super.isPlacementStatusActuallyEqualAfterRemove(replicaSet, replica,
           containerInfo.getReplicationFactor().getNumber())) {
-        commands.put(replica.getDatanodeDetails(),
-            createDeleteCommand(containerInfo));
+        commands.add(Pair.of(replica.getDatanodeDetails(),
+            createDeleteCommand(containerInfo)));
         excess--;
       }
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index d9d96a566c..6d5a2d266b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.collections.iterators.LoopingIterator;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -34,11 +36,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -77,7 +77,6 @@ public class RatisUnderReplicationHandler
    * @param result Health check result indicating under replication.
    * @param minHealthyForMaintenance Number of healthy replicas that must be
    *                                 available for a DN to enter maintenance
-   *
    * @return Returns the key value pair of destination dn where the command gets
    * executed and the command itself. If an empty map is returned, it indicates
    * the container is no longer unhealthy and can be removed from the unhealthy
@@ -85,7 +84,7 @@ public class RatisUnderReplicationHandler
    * should be retried later.
    */
   @Override
-  public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+  public Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int minHealthyForMaintenance)
       throws IOException {
@@ -109,7 +108,7 @@ public class RatisUnderReplicationHandler
     // verify that this container is still under replicated and we don't have
     // sufficient replication after considering pending adds
     if (!verifyUnderReplication(replicaCount)) {
-      return Collections.emptyMap();
+      return Collections.emptySet();
     }
 
     // find sources that can provide replicas
@@ -118,7 +117,7 @@ public class RatisUnderReplicationHandler
     if (sourceDatanodes.isEmpty()) {
       LOG.warn("Cannot replicate container {} because no healthy replicas " +
           "were found.", containerInfo);
-      return Collections.emptyMap();
+      return Collections.emptySet();
     }
 
     // find targets to send replicas to
@@ -127,7 +126,7 @@ public class RatisUnderReplicationHandler
     if (targetDatanodes.isEmpty()) {
       LOG.warn("Cannot replicate container {} because no eligible targets " +
           "were found.", containerInfo);
-      return Collections.emptyMap();
+      return Collections.emptySet();
     }
 
     return createReplicationCommands(containerInfo.getContainerID(),
@@ -231,31 +230,28 @@ public class RatisUnderReplicationHandler
         replicaCount.additionalReplicaNeeded(), 0, dataSizeRequired);
   }
 
-  private Map<DatanodeDetails, SCMCommand<?>> createReplicationCommands(
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> createReplicationCommands(
       long containerID, List<DatanodeDetails> sources,
       List<DatanodeDetails> targets) {
-    final boolean push = replicationManager.getConfig().isPush()
-        && targets.size() <= sources.size();
-    // TODO if we need multiple commands per source datanode, we need a small
-    // interface change
-    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    final boolean push = replicationManager.getConfig().isPush();
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
 
     if (push) {
       Collections.shuffle(sources);
-      for (Iterator<DatanodeDetails> srcIter = sources.iterator(),
+      for (Iterator<DatanodeDetails> srcIter = new LoopingIterator(sources),
               targetIter = targets.iterator();
           srcIter.hasNext() && targetIter.hasNext();) {
         DatanodeDetails source = srcIter.next();
         DatanodeDetails target = targetIter.next();
         ReplicateContainerCommand command =
             ReplicateContainerCommand.toTarget(containerID, target);
-        commands.put(source, command);
+        commands.add(Pair.of(source, command));
       }
     } else {
       for (DatanodeDetails target : targets) {
         ReplicateContainerCommand command =
             ReplicateContainerCommand.fromSources(containerID, sources);
-        commands.put(target, command);
+        commands.add(Pair.of(target, command));
       }
     }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 911058f250..eee68e8623 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.container.replication;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -522,7 +523,7 @@ public class ReplicationManager implements SCMService {
     }
   }
 
-  public Map<DatanodeDetails, SCMCommand<?>> processUnderReplicatedContainer(
+  Set<Pair<DatanodeDetails, SCMCommand<?>>> processUnderReplicatedContainer(
       final ContainerHealthResult result) throws IOException {
     ContainerID containerID = result.getContainerInfo().containerID();
     Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -547,7 +548,7 @@ public class ReplicationManager implements SCMService {
         pendingOps, result, ratisMaintenanceMinReplicas);
   }
 
-  public Map<DatanodeDetails, SCMCommand<?>> processOverReplicatedContainer(
+  Set<Pair<DatanodeDetails, SCMCommand<?>>> processOverReplicatedContainer(
       final ContainerHealthResult result) throws IOException {
     ContainerID containerID = result.getContainerInfo().containerID();
     Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
index 429c0e14eb..10d3d2d1ef 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * Class used to pick messages from the ReplicationManager under replicated
@@ -50,7 +51,7 @@ public class UnderReplicatedProcessor extends 
UnhealthyReplicationProcessor
   }
 
   @Override
-  protected Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
+  protected Set<Pair<DatanodeDetails, SCMCommand<?>>> getDatanodeCommands(
           ReplicationManager replicationManager,
           ContainerHealthResult.UnderReplicatedHealthResult healthResult)
           throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
index a6d37c40f6..fe73756e39 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -47,8 +47,9 @@ public interface UnhealthyReplicationHandler {
    * queue. Any exception indicates that the container is still unhealthy and
    * should be retried later.
    */
-  Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+  Set<Pair<DatanodeDetails, SCMCommand<?>>> processAndCreateCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult result, int remainingMaintenanceRedundancy)
       throws IOException;
+
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index 9623222f05..1289981c51 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.container.replication;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Class used to pick messages from the respective ReplicationManager
@@ -107,17 +109,16 @@ public abstract class 
UnhealthyReplicationProcessor<HealthResult extends
   /**
    * Gets the commands to be run datanode to process the
    * container health result.
-   * @param rm
-   * @param healthResult
    * @return Commands to be run on Datanodes
    */
-  protected abstract Map<DatanodeDetails, SCMCommand<?>> getDatanodeCommands(
-          ReplicationManager rm, HealthResult healthResult)
+  protected abstract Set<Pair<DatanodeDetails, SCMCommand<?>>>
+      getDatanodeCommands(ReplicationManager rm, HealthResult healthResult)
           throws IOException;
+
   private void processContainer(HealthResult healthResult) throws IOException {
-    Map<DatanodeDetails, SCMCommand<?>> cmds = getDatanodeCommands(
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds = getDatanodeCommands(
             replicationManager, healthResult);
-    for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
+    for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds) {
       replicationManager.sendDatanodeCommand(cmd.getValue(),
               healthResult.getContainerInfo(), cmd.getKey());
     }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index e882374a09..4542dd930a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -237,14 +237,13 @@ public class TestECOverReplicationHandler {
     ECOverReplicationHandler ecORH =
         new ECOverReplicationHandler(policy, nodeManager);
 
-    Map<DatanodeDetails, SCMCommand<?>> commands = ecORH
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecORH
         .processAndCreateCommands(availableReplicas, ImmutableList.of(),
             health, 1);
 
     Assert.assertEquals(1, commands.size());
-    for (SCMCommand<?> cmd : commands.values()) {
-      Assert.assertEquals(1, ((DeleteContainerCommand)cmd).getReplicaIndex());
-    }
+    SCMCommand<?> cmd = commands.iterator().next().getValue();
+    Assert.assertEquals(1, ((DeleteContainerCommand)cmd).getReplicaIndex());
   }
 
   private void testOverReplicationWithIndexes(
@@ -257,7 +256,7 @@ public class TestECOverReplicationHandler {
         Mockito.mock(ContainerHealthResult.OverReplicatedHealthResult.class);
     Mockito.when(result.getContainerInfo()).thenReturn(container);
 
-    Map<DatanodeDetails, SCMCommand<?>> commands = ecORH
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecORH
         .processAndCreateCommands(availableReplicas, pendingOps,
             result, 1);
 
@@ -268,8 +267,8 @@ public class TestECOverReplicationHandler {
     Assert.assertEquals(totalDeleteCommandNum, commands.size());
 
     // Each command should have a non-zero replica index
-    commands.forEach((datanode, command) -> Assert.assertNotEquals(0,
-        ((DeleteContainerCommand)command).getReplicaIndex()));
+    commands.forEach(pair -> Assert.assertNotEquals(0,
+        ((DeleteContainerCommand) pair.getValue()).getReplicaIndex()));
 
     // command num of each index should be equal to the excess num
     // of this index
@@ -278,8 +277,8 @@ public class TestECOverReplicationHandler {
             ContainerReplica::getDatanodeDetails,
             ContainerReplica::getReplicaIndex));
     Map<Integer, Integer> index2commandNum = new HashMap<>();
-    commands.keySet().forEach(dd ->
-        index2commandNum.merge(datanodeDetails2Index.get(dd), 1, Integer::sum)
+    commands.forEach(pair -> index2commandNum.merge(
+        datanodeDetails2Index.get(pair.getKey()), 1, Integer::sum)
     );
 
     index2commandNum.keySet().forEach(i -> {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 28961646e6..152f4927a7 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -51,7 +51,6 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -155,13 +154,13 @@ public class TestECUnderReplicationHandler {
         .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2),
             Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
             Pair.of(IN_SERVICE, 5));
-    Map<DatanodeDetails, SCMCommand<?>> cmds =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
         testUnderReplicationWithMissingIndexes(
             Lists.emptyList(), availableReplicas, 1, 0, policy);
     Assertions.assertEquals(1, cmds.size());
     // Check the replicate command has index 1 set
-    ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds.values()
-        .iterator().next();
+    ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds
+        .iterator().next().getValue();
     Assertions.assertEquals(1, cmd.getReplicaIndex());
 
   }
@@ -317,13 +316,14 @@ public class TestECUnderReplicationHandler {
       // returns, which in this case is a delete command for replica index 4.
       availableReplicas.add(overRepReplica);
 
-      Map<DatanodeDetails, SCMCommand<?>> expectedDelete = new HashMap<>();
-      expectedDelete.put(overRepReplica.getDatanodeDetails(),
-          createDeleteContainerCommand(container, overRepReplica));
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> expectedDelete =
+          new HashSet<>();
+      expectedDelete.add(Pair.of(overRepReplica.getDatanodeDetails(),
+          createDeleteContainerCommand(container, overRepReplica)));
 
       Mockito.when(replicationManager.processOverReplicatedContainer(
           underRep)).thenReturn(expectedDelete);
-      Map<DatanodeDetails, SCMCommand<?>> commands =
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
           ecURH.processAndCreateCommands(availableReplicas,
               Collections.emptyList(), underRep, 2);
       Mockito.verify(replicationManager, times(1))
@@ -374,16 +374,18 @@ public class TestECUnderReplicationHandler {
         availableReplicas.add(toAdd);
       }
 
-      Map<DatanodeDetails, SCMCommand<?>> commands =
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
           ecURH.processAndCreateCommands(availableReplicas,
               Collections.emptyList(), underRep, 2);
 
       Mockito.verify(replicationManager, times(0))
           .processOverReplicatedContainer(underRep);
       Assertions.assertEquals(1, commands.size());
+      Pair<DatanodeDetails, SCMCommand<?>> pair = commands.iterator().next();
+      Assertions.assertEquals(newNode, pair.getKey());
       Assertions.assertEquals(StorageContainerDatanodeProtocolProtos
               .SCMCommandProto.Type.reconstructECContainersCommand,
-          commands.get(newNode).getType());
+          pair.getValue().getType());
     }
   }
 
@@ -420,13 +422,13 @@ public class TestECUnderReplicationHandler {
             4, IN_SERVICE, CLOSED);
     availableReplicas.add(overRepReplica);
 
-    Map<DatanodeDetails, SCMCommand<?>> expectedDelete = new HashMap<>();
-    expectedDelete.put(overRepReplica.getDatanodeDetails(),
-        createDeleteContainerCommand(container, overRepReplica));
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> expectedDelete = new HashSet<>();
+    expectedDelete.add(Pair.of(overRepReplica.getDatanodeDetails(),
+        createDeleteContainerCommand(container, overRepReplica)));
 
     Mockito.when(replicationManager.processOverReplicatedContainer(
         underRep)).thenReturn(expectedDelete);
-    Map<DatanodeDetails, SCMCommand<?>> commands =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
         ecURH.processAndCreateCommands(availableReplicas,
             Collections.emptyList(), underRep, 1);
     Mockito.verify(replicationManager, times(1))
@@ -469,12 +471,12 @@ public class TestECUnderReplicationHandler {
         .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 1),
             Pair.of(IN_MAINTENANCE, 1), Pair.of(IN_MAINTENANCE, 1),
             Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
-    Map<DatanodeDetails, SCMCommand<?>> cmds =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
         testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3),
             availableReplicas, 0, 0, policy);
     Assertions.assertEquals(1, cmds.size());
     ReconstructECContainersCommand cmd =
-        (ReconstructECContainersCommand) cmds.values().iterator().next();
+        (ReconstructECContainersCommand) cmds.iterator().next().getValue();
     // Ensure that all source nodes are IN_SERVICE, we should not have picked
     // the non in-service nodes for index 1.
     for (ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex s
@@ -513,7 +515,7 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
         ecPlacementPolicy, conf, nodeManager, replicationManager);
 
-    Map<DatanodeDetails, SCMCommand<?>> commands =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
         handler.processAndCreateCommands(availableReplicas,
             Collections.emptyList(), result, 1);
     Assertions.assertEquals(1, commands.size());
@@ -562,11 +564,11 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
         ecPlacementPolicy, conf, nodeManager, replicationManager);
 
-    Map<DatanodeDetails, SCMCommand<?>> commands =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
         handler.processAndCreateCommands(availableReplicas, pendingOps, result,
             1);
     Assertions.assertEquals(1, commands.size());
-    Assertions.assertFalse(commands.containsKey(dn));
+    Assertions.assertNotEquals(dn, commands.iterator().next().getKey());
   }
 
   @Test
@@ -577,12 +579,12 @@ public class TestECUnderReplicationHandler {
     ContainerReplica decomReplica = ReplicationTestUtil.createContainerReplica(
         container.containerID(), 2, DECOMMISSIONING, CLOSED);
     availableReplicas.add(decomReplica);
-    Map<DatanodeDetails, SCMCommand<?>> cmds =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
         testUnderReplicationWithMissingIndexes(Collections.emptyList(),
             availableReplicas, 1, 0, policy);
     Assertions.assertEquals(1, cmds.size());
     ReplicateContainerCommand cmd =
-        (ReplicateContainerCommand) cmds.values().iterator().next();
+        (ReplicateContainerCommand) cmds.iterator().next().getValue();
 
     List<DatanodeDetails> sources = cmd.getSourceDatanodes();
     Assertions.assertEquals(1, sources.size());
@@ -599,7 +601,7 @@ public class TestECUnderReplicationHandler {
         container.containerID(), 2, ENTERING_MAINTENANCE, CLOSED);
     availableReplicas.add(maintReplica);
 
-    Map<DatanodeDetails, SCMCommand<?>> cmds =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
         testUnderReplicationWithMissingIndexes(Collections.emptyList(),
             availableReplicas, 0, 1, policy);
     Assertions.assertEquals(0, cmds.size());
@@ -611,7 +613,7 @@ public class TestECUnderReplicationHandler {
 
     Assertions.assertEquals(1, cmds.size());
     ReplicateContainerCommand cmd =
-        (ReplicateContainerCommand) cmds.values().iterator().next();
+        (ReplicateContainerCommand) cmds.iterator().next().getValue();
 
     List<DatanodeDetails> sources = cmd.getSourceDatanodes();
     Assertions.assertEquals(1, sources.size());
@@ -619,7 +621,7 @@ public class TestECUnderReplicationHandler {
         cmd.getSourceDatanodes().get(0));
   }
 
-  public Map<DatanodeDetails, SCMCommand<?>>
+  public Set<Pair<DatanodeDetails, SCMCommand<?>>>
       testUnderReplicationWithMissingIndexes(
       List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas,
       int decomIndexes, int maintenanceIndexes,
@@ -632,11 +634,9 @@ public class TestECUnderReplicationHandler {
     Mockito.when(result.isUnrecoverable()).thenReturn(false);
     Mockito.when(result.getContainerInfo()).thenReturn(container);
 
-    Map<DatanodeDetails, SCMCommand<?>> datanodeDetailsSCMCommandMap = ecURH
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = ecURH
         .processAndCreateCommands(availableReplicas, ImmutableList.of(),
             result, remainingMaintenanceRedundancy);
-    Set<Map.Entry<DatanodeDetails, SCMCommand<?>>> entries =
-        datanodeDetailsSCMCommandMap.entrySet();
     int replicateCommand = 0;
     int reconstructCommand = 0;
     byte[] missingIndexesByteArr = new byte[missingIndexes.size()];
@@ -646,7 +646,7 @@ public class TestECUnderReplicationHandler {
     boolean shouldReconstructCommandExist =
         missingIndexes.size() > 0 && missingIndexes.size() <= repConfig
             .getParity();
-    for (Map.Entry<DatanodeDetails, SCMCommand<?>> dnCommand : entries) {
+    for (Map.Entry<DatanodeDetails, SCMCommand<?>> dnCommand : commands) {
       if (dnCommand.getValue() instanceof ReplicateContainerCommand) {
         replicateCommand++;
       } else if (dnCommand
@@ -666,7 +666,7 @@ public class TestECUnderReplicationHandler {
         replicateCommand);
     Assertions.assertEquals(shouldReconstructCommandExist ? 1 : 0,
         reconstructCommand);
-    return datanodeDetailsSCMCommandMap;
+    return commands;
   }
 
   private DeleteContainerCommand createDeleteContainerCommand(
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index 2059670417..9afa9e1398 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -40,6 +41,7 @@ import org.junit.jupiter.api.Assertions;
 import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -158,14 +160,14 @@ public abstract class TestMisReplicationHandler {
     Map<DatanodeDetails, Integer> copyReplicaIdxMap = copy.stream()
             .collect(Collectors.toMap(ContainerReplica::getDatanodeDetails,
                     ContainerReplica::getReplicaIndex));
-    Map<DatanodeDetails, SCMCommand<?>> datanodeDetailsSCMCommandMap =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
             misReplicationHandler.processAndCreateCommands(availableReplicas,
                     pendingOp, result, maintenanceCnt);
-    Assertions.assertEquals(expectedNumberOfNodes,
-            datanodeDetailsSCMCommandMap.size());
-    Assertions.assertTrue(datanodeDetailsSCMCommandMap.keySet()
-            .containsAll(targetNodes));
-    for (SCMCommand<?> command : datanodeDetailsSCMCommandMap.values()) {
+    Assertions.assertEquals(expectedNumberOfNodes, commands.size());
+    Assertions.assertEquals(new HashSet<>(targetNodes),
+        commands.stream().map(Pair::getKey).collect(Collectors.toSet()));
+    for (Pair<DatanodeDetails, SCMCommand<?>> pair : commands) {
+      SCMCommand<?> command = pair.getValue();
       Assertions.assertTrue(command.getType() == replicateContainerCommand);
       ReplicateContainerCommand replicateContainerCommand =
               (ReplicateContainerCommand) command;
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
index cbaa9a8afe..910ba75f98 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -32,8 +33,8 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.mockito.ArgumentMatchers.any;
 
@@ -67,11 +68,11 @@ public class TestOverReplicatedProcessor {
         .thenReturn(
             new ContainerHealthResult.OverReplicatedHealthResult(container, 3,
                 false), null);
-    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
     DeleteContainerCommand cmd =
         new DeleteContainerCommand(container.getContainerID());
     cmd.setReplicaIndex(5);
-    commands.put(MockDatanodeDetails.randomDatanodeDetails(), cmd);
+    commands.add(Pair.of(MockDatanodeDetails.randomDatanodeDetails(), cmd));
 
     Mockito
         .when(replicationManager.processOverReplicatedContainer(any()))
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
index 5b0b762307..4e1825492e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.container.replication;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@ -42,8 +43,8 @@ import org.slf4j.event.Level;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
@@ -195,10 +196,12 @@ public class TestRatisOverReplicationHandler {
             Mockito.argThat(list -> list.size() <= 4), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(1, 2, 3));
 
-    Map<DatanodeDetails, SCMCommand<?>> commands = testProcessing(replicas,
-        Collections.emptyList(), getOverReplicatedHealthResult(), 2);
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = testProcessing(
+        replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 2);
+    Set<DatanodeDetails> datanodes =
+        commands.stream().map(Pair::getKey).collect(Collectors.toSet());
     Assert.assertTrue(
-        commands.containsKey(quasiClosedReplica.getDatanodeDetails()));
+        datanodes.contains(quasiClosedReplica.getDatanodeDetails()));
   }
 
   @Test
@@ -217,12 +220,14 @@ public class TestRatisOverReplicationHandler {
     replicas.add(decommissioningReplica);
     replicas.add(maintenanceReplica);
 
-    Map<DatanodeDetails, SCMCommand<?>> commands = testProcessing(replicas,
-        Collections.emptyList(), getOverReplicatedHealthResult(), 1);
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = testProcessing(
+        replicas, Collections.emptyList(), getOverReplicatedHealthResult(), 1);
+    Set<DatanodeDetails> datanodes =
+        commands.stream().map(Pair::getKey).collect(Collectors.toSet());
     Assert.assertFalse(
-        commands.containsKey(decommissioningReplica.getDatanodeDetails()));
+        datanodes.contains(decommissioningReplica.getDatanodeDetails()));
     Assert.assertFalse(
-        commands.containsKey(maintenanceReplica.getDatanodeDetails()));
+        datanodes.contains(maintenanceReplica.getDatanodeDetails()));
   }
 
   @Test
@@ -255,14 +260,14 @@ public class TestRatisOverReplicationHandler {
    *                          the handler
    * @return map of commands
    */
-  private Map<DatanodeDetails, SCMCommand<?>> testProcessing(
+  private Set<Pair<DatanodeDetails, SCMCommand<?>>> testProcessing(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
       ContainerHealthResult healthResult,
       int expectNumCommands) throws IOException {
     RatisOverReplicationHandler handler =
         new RatisOverReplicationHandler(policy, nodeManager);
 
-    Map<DatanodeDetails, SCMCommand<?>> commands =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
         handler.processAndCreateCommands(replicas, pendingOps,
             healthResult, 2);
     Assert.assertEquals(expectNumCommands, commands.size());
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index efed912e74..29e035f4de 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -44,7 +44,6 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
@@ -220,7 +219,7 @@ public class TestRatisUnderReplicationHandler {
         new RatisUnderReplicationHandler(policy, conf, nodeManager,
             replicationManager);
 
-    Map<DatanodeDetails, SCMCommand<?>> commands =
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
         handler.processAndCreateCommands(replicas, pendingOps,
             healthResult, minHealthyForMaintenance);
     Assert.assertEquals(expectNumCommands, commands.size());
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
index a56030811d..48cc5476ab 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -34,9 +35,9 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 
 import static org.mockito.ArgumentMatchers.any;
 
@@ -84,10 +85,10 @@ public class TestUnderReplicatedProcessor {
     targetNodes.add(MockDatanodeDetails.randomDatanodeDetails());
     byte[] missingIndexes = {4, 5};
 
-    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
-    commands.put(MockDatanodeDetails.randomDatanodeDetails(),
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
+    commands.add(Pair.of(MockDatanodeDetails.randomDatanodeDetails(),
         new ReconstructECContainersCommand(container.getContainerID(),
-            sourceNodes, targetNodes, missingIndexes, repConfig));
+            sourceNodes, targetNodes, missingIndexes, repConfig)));
 
     Mockito.when(replicationManager
             .processUnderReplicatedContainer(any()))
@@ -115,8 +116,8 @@ public class TestUnderReplicatedProcessor {
         container.getContainerID(), sourceDns);
     rcc.setReplicaIndex(3);
 
-    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
-    commands.put(targetDn, rcc);
+    Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
+    commands.add(Pair.of(targetDn, rcc));
 
     Mockito.when(replicationManager
             .processUnderReplicatedContainer(any()))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to