This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new da79b9a8b9 HDDS-8504. ReplicationManager: Pass used and excluded node separately for Under and Mis-Replication (#4694)
da79b9a8b9 is described below

commit da79b9a8b9703ecef32479329aa4163b9f0d35ae
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu May 11 16:40:07 2023 +0100

    HDDS-8504. ReplicationManager: Pass used and excluded node separately for Under and Mis-Replication (#4694)
---
 .../replication/ECUnderReplicationHandler.java     |  60 +++++-----
 .../replication/MisReplicationHandler.java         |  20 ++--
 .../replication/RatisUnderReplicationHandler.java  |  29 ++---
 .../replication/ReplicationManagerUtil.java        |  98 +++++++++++++++
 .../container/replication/ReplicationTestUtil.java |   3 +-
 .../replication/TestECUnderReplicationHandler.java |  29 +++--
 .../TestRatisUnderReplicationHandler.java          |  80 ++++++++++++-
 .../replication/TestReplicationManagerUtil.java    | 133 +++++++++++++++++++++
 8 files changed, 383 insertions(+), 69 deletions(-)
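
In short, where the replication handlers previously passed a single exclude
list to the placement policy, they now pass two lists built by the new
ReplicationManagerUtil.getExcludedAndUsedNodes helper: excluded nodes (cannot
receive a new replica and should not count towards placement) and used nodes
(already hold, or are about to hold, a healthy replica and should count
towards rack/topology placement). The snippet below is an editor's
illustration of those classification rules, written against plain JDK types
only; the NodeKind enum and node names are hypothetical stand-ins, not Ozone
classes.

    import java.util.ArrayList;
    import java.util.List;

    public class UsedVsExcludedSketch {

      // Hypothetical stand-in for the conditions the helper checks.
      enum NodeKind { HEALTHY_REPLICA, UNHEALTHY_REPLICA, TO_BE_REMOVED,
          DECOMMISSIONING, IN_MAINTENANCE, PENDING_ADD, PENDING_DELETE }

      static void classify(String node, NodeKind kind,
          List<String> used, List<String> excluded) {
        switch (kind) {
          case UNHEALTHY_REPLICA:  // replica will be cleaned up later
          case TO_BE_REMOVED:      // caller plans to remove this replica
          case DECOMMISSIONING:    // node is leaving the cluster
          case PENDING_DELETE:     // delete already scheduled
            excluded.add(node);    // not a target, not counted for placement
            break;
          case HEALTHY_REPLICA:    // existing healthy replica
          case IN_MAINTENANCE:     // node is expected to return
          case PENDING_ADD:        // a replica is already scheduled here
            used.add(node);        // counts towards rack/topology placement
            break;
          default:
            break;
        }
      }

      public static void main(String[] args) {
        List<String> used = new ArrayList<>();
        List<String> excluded = new ArrayList<>();
        classify("dn1", NodeKind.HEALTHY_REPLICA, used, excluded);
        classify("dn2", NodeKind.DECOMMISSIONING, used, excluded);
        classify("dn3", NodeKind.PENDING_ADD, used, excluded);
        classify("dn4", NodeKind.PENDING_DELETE, used, excluded);
        // Prints: used=[dn1, dn3] excluded=[dn2, dn4]
        System.out.println("used=" + used + " excluded=" + excluded);
      }
    }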

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index e54102b2e6..e6d998a4a1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -124,19 +124,15 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       return 0;
     }
 
-    // don't place reconstructed replicas on exclude nodes, since they already
-    // have replicas
-    List<DatanodeDetails> excludedNodes = replicas.stream()
-        .map(ContainerReplica::getDatanodeDetails)
-        .collect(Collectors.toList());
-    // DNs that are already waiting to receive replicas cannot be targets
-    excludedNodes.addAll(
-        pendingOps.stream()
-        .filter(containerReplicaOp -> containerReplicaOp.getOpType() ==
-            ContainerReplicaOp.PendingOpType.ADD)
-        .map(ContainerReplicaOp::getTarget)
-        .collect(Collectors.toList()));
+    ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+        ReplicationManagerUtil.getExcludedAndUsedNodes(
+            new ArrayList<>(replicas), Collections.emptySet(), pendingOps,
+            replicationManager);
+    List<DatanodeDetails> excludedNodes
+        = excludedAndUsedNodes.getExcludedNodes();
     excludedNodes.addAll(replicationManager.getExcludedNodes());
+    List<DatanodeDetails> usedNodes
+        = excludedAndUsedNodes.getUsedNodes();
 
     final ContainerID id = container.containerID();
     int commandsSent = 0;
@@ -162,14 +158,14 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
         IOException firstException = null;
         try {
           commandsSent += processMissingIndexes(replicaCount, sources,
-              availableSourceNodes, excludedNodes);
+              availableSourceNodes, excludedNodes, usedNodes);
         } catch (InsufficientDatanodesException
             | CommandTargetOverloadedException  e) {
           firstException = e;
         }
         try {
           commandsSent += processDecommissioningIndexes(replicaCount, sources,
-              availableSourceNodes, excludedNodes);
+              availableSourceNodes, excludedNodes, usedNodes);
         } catch (InsufficientDatanodesException
             | CommandTargetOverloadedException e) {
           if (firstException == null) {
@@ -178,7 +174,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
         }
         try {
           commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
-              excludedNodes);
+              excludedNodes, usedNodes);
         } catch (InsufficientDatanodesException
             | CommandTargetOverloadedException e) {
           if (firstException == null) {
@@ -272,7 +268,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       ECContainerReplicaCount replicaCount, Map<Integer,
       Pair<ContainerReplica, NodeStatus>> sources,
       List<DatanodeDetails> availableSourceNodes,
-      List<DatanodeDetails> excludedNodes) throws IOException {
+      List<DatanodeDetails> excludedNodes,
+      List<DatanodeDetails> usedNodes) throws IOException {
     ContainerInfo container = replicaCount.getContainer();
     ECReplicationConfig repConfig =
         (ECReplicationConfig)container.getReplicationConfig();
@@ -286,7 +283,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       int expectedTargets = missingIndexes.size();
       final List<DatanodeDetails> selectedDatanodes =
           ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
-              expectedTargets, null, excludedNodes, currentContainerSize,
+              expectedTargets, usedNodes, excludedNodes, currentContainerSize,
               container);
 
       // If we got less targets than missing indexes, we need to prune the
@@ -297,7 +294,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
             missingIndexes.size()).clear();
       }
       if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
-        excludedNodes.addAll(selectedDatanodes);
+        usedNodes.addAll(selectedDatanodes);
         // TODO - what are we adding all the selected nodes to available
         //        sources?
         availableSourceNodes.addAll(selectedDatanodes);
@@ -357,18 +354,19 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       ECContainerReplicaCount replicaCount,
       Map<Integer, Pair<ContainerReplica, NodeStatus>> sources,
       List<DatanodeDetails> availableSourceNodes,
-      List<DatanodeDetails> excludedNodes) throws IOException {
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> usedNodes)
+      throws IOException {
     ContainerInfo container = replicaCount.getContainer();
     Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
     int commandsSent = 0;
     if (decomIndexes.size() > 0) {
       final List<DatanodeDetails> selectedDatanodes =
           ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
-              decomIndexes.size(), null, excludedNodes, currentContainerSize,
-              container);
+              decomIndexes.size(), usedNodes, excludedNodes,
+              currentContainerSize, container);
 
       if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
-        excludedNodes.addAll(selectedDatanodes);
+        usedNodes.addAll(selectedDatanodes);
         Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
         // In this case we need to do one to one copy.
         CommandTargetOverloadedException overloadedException = null;
@@ -382,11 +380,11 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
           ContainerReplica sourceReplica = source.getLeft();
           if (!iterator.hasNext()) {
             LOG.warn("Couldn't find enough targets. Available source"
-                + " nodes: {}, the target nodes: {}, excluded nodes: {}"
-                + "  and the decommission indexes: {}",
+                + " nodes: {}, the target nodes: {}, excluded nodes: {},"
+                + " usedNodes: {}, and the decommission indexes: {}",
                 sources.values().stream()
                     .map(Pair::getLeft).collect(Collectors.toSet()),
-                selectedDatanodes, excludedNodes, decomIndexes);
+                selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
             break;
           }
           try {
@@ -430,7 +428,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
   private int processMaintenanceOnlyIndexes(
       ECContainerReplicaCount replicaCount,
       Map<Integer, Pair<ContainerReplica, NodeStatus>> sources,
-      List<DatanodeDetails> excludedNodes) throws IOException {
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> usedNodes)
+      throws IOException {
     Set<Integer> maintIndexes = replicaCount.maintenanceOnlyIndexes(true);
     if (maintIndexes.isEmpty()) {
       return 0;
@@ -444,9 +443,9 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       return 0;
     }
     List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
-        containerPlacement, maintIndexes.size(), null, excludedNodes,
+        containerPlacement, maintIndexes.size(), usedNodes, excludedNodes,
         currentContainerSize, container);
-    excludedNodes.addAll(targets);
+    usedNodes.addAll(targets);
 
     Iterator<DatanodeDetails> iterator = targets.iterator();
     int commandsSent = 0;
@@ -466,11 +465,12 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       ContainerReplica sourceReplica = source.getLeft();
       if (!iterator.hasNext()) {
         LOG.warn("Couldn't find enough targets. Available source"
-                + " nodes: {}, target nodes: {}, excluded nodes: {} and"
+                + " nodes: {}, target nodes: {}, excluded nodes: {},"
+                + " usedNodes: {} and"
                 + " maintenance indexes: {}",
             sources.values().stream()
                 .map(Pair::getLeft).collect(Collectors.toSet()),
-            targets, excludedNodes, maintIndexes);
+            targets, excludedNodes, usedNodes, maintIndexes);
         break;
       }
       try {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index b5590f2985..d41f80d604 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -137,22 +138,23 @@ public abstract class MisReplicationHandler implements
               container.getContainerID());
       return 0;
     }
-
     Set<ContainerReplica> sources = filterSources(replicas);
     Set<ContainerReplica> replicasToBeReplicated = containerPlacement
             .replicasToCopyToFixMisreplication(replicas.stream()
             .collect(Collectors.toMap(Function.identity(), sources::contains)));
-    usedDns = replicas.stream().filter(r -> !replicasToBeReplicated.contains(r))
-            .map(ContainerReplica::getDatanodeDetails)
-            .collect(Collectors.toList());
-    List<DatanodeDetails> excludedDns = replicasToBeReplicated.stream()
-            .map(ContainerReplica::getDatanodeDetails)
-            .collect(Collectors.toList());
+
+    ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes
+        = ReplicationManagerUtil.getExcludedAndUsedNodes(
+            new ArrayList<>(replicas), replicasToBeReplicated,
+            Collections.emptyList(), replicationManager);
+
     int requiredNodes = replicasToBeReplicated.size();
 
     List<DatanodeDetails> targetDatanodes = ReplicationManagerUtil
-        .getTargetDatanodes(containerPlacement, requiredNodes, usedDns,
-            excludedDns, currentContainerSize, container);
+        .getTargetDatanodes(containerPlacement, requiredNodes,
+            excludedAndUsedNodes.getUsedNodes(),
+            excludedAndUsedNodes.getExcludedNodes(), currentContainerSize,
+            container);
     List<DatanodeDetails> availableSources = sources.stream()
         .map(ContainerReplica::getDatanodeDetails)
         .collect(Collectors.toList());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 3ae30bea78..ae14c20f47 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.OptionalLong;
@@ -264,26 +265,20 @@ public class RatisUnderReplicationHandler
     LOG.debug("Need {} target datanodes for container {}. Current " +
             "replicas: {}.", replicaCount.additionalReplicaNeeded(),
         replicaCount.getContainer().containerID(), replicaCount.getReplicas());
-    // DNs that already have replicas cannot be targets and should be excluded
-    final List<DatanodeDetails> excludeList =
-        replicaCount.getReplicas().stream()
-            .map(ContainerReplica::getDatanodeDetails)
-            .collect(Collectors.toList());
 
-    // DNs that are already waiting to receive replicas cannot be targets
-    final List<DatanodeDetails> pendingReplication =
-        pendingOps.stream()
-            .filter(containerReplicaOp -> containerReplicaOp.getOpType() ==
-                ContainerReplicaOp.PendingOpType.ADD)
-            .map(ContainerReplicaOp::getTarget)
-            .collect(Collectors.toList());
-    LOG.debug("Excluding DNs. excludeList: {}, size: {}. pendingReplication: " 
+
-            "{}, size: {}.", excludeList, excludeList.size(),
-        pendingReplication, pendingReplication.size());
-    excludeList.addAll(pendingReplication);
+    ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+        ReplicationManagerUtil.getExcludedAndUsedNodes(
+            replicaCount.getReplicas(), Collections.emptySet(), pendingOps,
+            replicationManager);
+
+    List<DatanodeDetails> excluded = excludedAndUsedNodes.getExcludedNodes();
+    List<DatanodeDetails> used = excludedAndUsedNodes.getUsedNodes();
+
+    LOG.debug("UsedList: {}, size {}. ExcludeList: {}, size: {}. ",
+        used, used.size(), excluded, excluded.size());
 
     return ReplicationManagerUtil.getTargetDatanodes(placementPolicy,
-        replicaCount.additionalReplicaNeeded(), null, excludeList,
+        replicaCount.additionalReplicaNeeded(), used, excluded,
         currentContainerSize, replicaCount.getContainer());
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index 12b1eecf61..6757b059da 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -18,14 +18,20 @@
 package org.apache.hadoop.hdds.scm.container.replication;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Utility class for ReplicationManager.
@@ -90,4 +96,96 @@ public final class ReplicationManagerUtil {
         SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
   }
 
+  /**
+   * Given a list of replicas and a set of nodes to be removed, returns an
+   * object containing two lists. One is a list of nodes that should be
+   * excluded from being selected as targets for new replicas. The other is a
+   * list of nodes that are currently used by the container and which the
+   * placement policy can consider for rack placement.
+   * @param replicas List of existing replicas
+   * @param toBeRemoved Set of nodes containing replicas that are to be removed
+   * @param pendingReplicaOps List of pending replica operations
+   * @param replicationManager ReplicationManager instance to get NodeStatus
+   * @return ExcludedAndUsedNodes object containing the excluded and used lists
+   */
+  public static ExcludedAndUsedNodes getExcludedAndUsedNodes(
+      List<ContainerReplica> replicas,
+      Set<ContainerReplica> toBeRemoved,
+      List<ContainerReplicaOp> pendingReplicaOps,
+      ReplicationManager replicationManager) {
+    List<DatanodeDetails> excludedNodes = new ArrayList<>();
+    List<DatanodeDetails> usedNodes = new ArrayList<>();
+
+    for (ContainerReplica r : replicas) {
+      if (r.getState() == ContainerReplicaProto.State.UNHEALTHY) {
+        // Hosts with an Unhealthy replica cannot receive a new replica, but
+        // they are not considered used as they will be removed later.
+        excludedNodes.add(r.getDatanodeDetails());
+        continue;
+      }
+      if (toBeRemoved.contains(r)) {
+        // This node is currently present, but we plan to remove it, so it is
+        // not considered used; it must still be excluded.
+        excludedNodes.add(r.getDatanodeDetails());
+        continue;
+      }
+      try {
+        NodeStatus nodeStatus =
+            replicationManager.getNodeStatus(r.getDatanodeDetails());
+        if (nodeStatus.isDecommission()) {
+          // Decommissioning nodes are going to go away and their replicas
+          // need to be replaced. Therefore we mark them excluded.
+          // Maintenance nodes should return to the cluster, so they would
+          // still be considered used (handled in the catch-all at the end
+          // of the loop).
+          excludedNodes.add(r.getDatanodeDetails());
+          continue;
+        }
+      } catch (NodeNotFoundException e) {
+        LOG.warn("Node {} not found in node manager.", r.getDatanodeDetails());
+        // This should not happen, but if it does, just add the node to the
+        // exclude list
+        excludedNodes.add(r.getDatanodeDetails());
+        continue;
+      }
+      // If we get here, this is a used node
+      usedNodes.add(r.getDatanodeDetails());
+    }
+    for (ContainerReplicaOp pending : pendingReplicaOps) {
+      if (pending.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        // If we are adding a replica, then its target is scheduled to become
+        // a used node.
+        usedNodes.add(pending.getTarget());
+      }
+      if (pending.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        // If there are any ops pending delete, we cannot use the node, but it
+        // is not considered used as it will be removed later.
+        excludedNodes.add(pending.getTarget());
+      }
+    }
+    return new ExcludedAndUsedNodes(excludedNodes, usedNodes);
+  }
+
+
+  /**
+   * Simple class to hold the excluded and used nodes lists.
+   */
+  public static class ExcludedAndUsedNodes {
+    private List<DatanodeDetails> excludedNodes;
+    private List<DatanodeDetails> usedNodes;
+
+    public ExcludedAndUsedNodes(List<DatanodeDetails> excludedNodes,
+        List<DatanodeDetails> usedNodes) {
+      this.excludedNodes = excludedNodes;
+      this.usedNodes = usedNodes;
+    }
+
+    public List<DatanodeDetails> getExcludedNodes() {
+      return excludedNodes;
+    }
+
+    public List<DatanodeDetails> getUsedNodes() {
+      return usedNodes;
+    }
+  }
+
 }
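
For reference, below is a condensed sketch (pieced together from the
ECUnderReplicationHandler changes above) of how a handler now calls the new
helper and feeds both lists to the placement policy. All names come from this
patch; the surrounding error handling and command sending are omitted, so
this is illustrative rather than a complete method.

    // Classify existing replicas and pending ops into used/excluded lists.
    ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
        ReplicationManagerUtil.getExcludedAndUsedNodes(
            new ArrayList<>(replicas),   // all current replicas
            Collections.emptySet(),      // nothing scheduled for removal here
            pendingOps,                  // pending ADD -> used, DELETE -> excluded
            replicationManager);         // used to look up NodeStatus

    List<DatanodeDetails> excludedNodes =
        excludedAndUsedNodes.getExcludedNodes();
    excludedNodes.addAll(replicationManager.getExcludedNodes());
    List<DatanodeDetails> usedNodes = excludedAndUsedNodes.getUsedNodes();

    // Targets are chosen with the used nodes passed explicitly, instead of
    // folding everything into the exclude list as before.
    List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
        containerPlacement, expectedTargets, usedNodes, excludedNodes,
        currentContainerSize, container);

    // Newly selected targets count as used for later placement decisions.
    usedNodes.addAll(targets);
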
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index 9d520d69ac..5da2c8e54b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -284,7 +284,8 @@ public final class ReplicationTestUtil {
         if (nodesRequiredToChoose > 1) {
           throw new IllegalArgumentException("Only one node is allowed");
         }
-        if (excludedNodes.contains(nodeToReturn)) {
+        if (excludedNodes.contains(nodeToReturn)
+            || usedNodes.contains(nodeToReturn)) {
           throw new SCMException("Insufficient Nodes available to choose",
               SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
         }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index af486d257d..bd91bfce85 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -61,6 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.singleton;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
@@ -165,10 +166,14 @@ public class TestECUnderReplicationHandler {
 
     ArgumentCaptor<List<DatanodeDetails>> captor =
         ArgumentCaptor.forClass(List.class);
-    Mockito.when(ecPlacementPolicy.chooseDatanodes(captor.capture(),
+    // The used list is modified after it is passed to the placement policy,
+    // so a plain Captor won't work.
+    AtomicReference<List<DatanodeDetails>> usedList = new AtomicReference<>();
+    Mockito.when(ecPlacementPolicy.chooseDatanodes(any(), captor.capture(),
             any(), anyInt(), anyLong(), anyLong())
         ).thenAnswer(invocationOnMock -> {
-          int numNodes = invocationOnMock.getArgument(2);
+          usedList.set(new ArrayList<>(invocationOnMock.getArgument(0)));
+          int numNodes = invocationOnMock.getArgument(3);
           List<DatanodeDetails> targets = new ArrayList<>();
           for (int i = 0; i < numNodes; i++) {
             targets.add(MockDatanodeDetails.randomDatanodeDetails());
@@ -180,6 +185,11 @@ public class TestECUnderReplicationHandler {
         replicas, Collections.emptyList(), result, 2);
 
     Assertions.assertTrue(captor.getValue().contains(excludedByRM));
+    Assertions.assertEquals(3, usedList.get().size());
+    for (ContainerReplica r : replicas) {
+      Assertions.assertTrue(
+          usedList.get().contains(r.getDatanodeDetails()));
+    }
   }
 
   @Test
@@ -793,10 +803,10 @@ public class TestECUnderReplicationHandler {
             Pair.of(IN_MAINTENANCE, 3), Pair.of(IN_SERVICE, 4),
             Pair.of(IN_SERVICE, 5));
 
-    Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), Mockito.isNull(),
-            anyInt(), anyLong(), anyLong()))
+    Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
+            Mockito.isNull(), anyInt(), anyLong(), anyLong()))
         .thenAnswer(invocationOnMock -> {
-          int numNodes = invocationOnMock.getArgument(2);
+          int numNodes = invocationOnMock.getArgument(3);
           List<DatanodeDetails> targets = new ArrayList<>();
           for (int i = 0; i < numNodes; i++) {
             targets.add(MockDatanodeDetails.randomDatanodeDetails());
@@ -839,12 +849,13 @@ public class TestECUnderReplicationHandler {
     containing that DN. Ensures the test will fail if excludeNodes does not
     contain the DN pending ADD.
      */
-    Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), Mockito.isNull(),
-            anyInt(), anyLong(), anyLong()))
+    Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
+            Mockito.isNull(), anyInt(), anyLong(), anyLong()))
         .thenAnswer(invocationOnMock -> {
-          List<DatanodeDetails> excludeList = invocationOnMock.getArgument(0);
+          List<DatanodeDetails> usedList = invocationOnMock.getArgument(0);
+          List<DatanodeDetails> excludeList = invocationOnMock.getArgument(1);
           List<DatanodeDetails> targets = new ArrayList<>(1);
-          if (excludeList.contains(dn)) {
+          if (usedList.contains(dn) || excludeList.contains(dn)) {
             targets.add(MockDatanodeDetails.randomDatanodeDetails());
           } else {
             targets.add(dn);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index ae0be5e63a..fe0caf8810 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -41,9 +41,12 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -52,10 +55,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
 import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
 
 /**
  * Tests for {@link RatisUnderReplicationHandler}.
@@ -92,7 +99,7 @@ public class TestRatisUnderReplicationHandler {
       DatanodeDetails, and NodeState as HEALTHY.
     */
     Mockito.when(
-        replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+        replicationManager.getNodeStatus(any(DatanodeDetails.class)))
         .thenAnswer(invocationOnMock -> {
           DatanodeDetails dn = invocationOnMock.getArgument(0);
           return new NodeStatus(dn.getPersistedOpState(),
@@ -301,11 +308,78 @@ public class TestRatisUnderReplicationHandler {
 
     // Ensure that the replica with SEQ=2 is the only source sent
     Mockito.verify(replicationManager).sendThrottledReplicationCommand(
-        Mockito.any(ContainerInfo.class),
+        any(ContainerInfo.class),
         Mockito.eq(Collections.singletonList(valid.getDatanodeDetails())),
-        Mockito.any(DatanodeDetails.class), anyInt());
+        any(DatanodeDetails.class), anyInt());
   }
 
+  @Test
+  public void testCorrectUsedAndExcludedNodesPassed() throws IOException {
+    PlacementPolicy mockPolicy = Mockito.mock(PlacementPolicy.class);
+    Mockito.when(mockPolicy.chooseDatanodes(any(), any(), any(),
+        anyInt(), anyLong(), anyLong()))
+        .thenReturn(Collections.singletonList(
+            MockDatanodeDetails.randomDatanodeDetails()));
+
+    ArgumentCaptor<List<DatanodeDetails>> usedNodesCaptor =
+        ArgumentCaptor.forClass(List.class);
+
+    ArgumentCaptor<List<DatanodeDetails>> excludedNodesCaptor =
+        ArgumentCaptor.forClass(List.class);
+
+    RatisUnderReplicationHandler handler =
+        new RatisUnderReplicationHandler(mockPolicy, conf, replicationManager);
+
+    Set<ContainerReplica> replicas = new HashSet<>();
+    ContainerReplica good = createContainerReplica(container.containerID(), 0,
+        IN_SERVICE, State.CLOSED, 1);
+    replicas.add(good);
+
+    ContainerReplica unhealthy = createContainerReplica(
+        container.containerID(), 0, IN_SERVICE, State.UNHEALTHY, 1);
+    replicas.add(unhealthy);
+
+    ContainerReplica decommissioning =
+        createContainerReplica(container.containerID(), 0,
+            DECOMMISSIONING, State.CLOSED, 1);
+    replicas.add(decommissioning);
+
+    ContainerReplica maintenance =
+        createContainerReplica(container.containerID(), 0,
+            IN_MAINTENANCE, State.CLOSED, 1);
+    replicas.add(maintenance);
+
+    List<ContainerReplicaOp> pendingOps = new ArrayList<>();
+    DatanodeDetails pendingAdd = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails pendingRemove = MockDatanodeDetails.randomDatanodeDetails();
+    pendingOps.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.ADD, pendingAdd, 0));
+    pendingOps.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.DELETE, pendingRemove, 0));
+
+    handler.processAndSendCommands(replicas, pendingOps,
+        getUnderReplicatedHealthResult(), 2);
+
+
+    Mockito.verify(mockPolicy, times(1)).chooseDatanodes(
+        usedNodesCaptor.capture(), excludedNodesCaptor.capture(), any(),
+        anyInt(), anyLong(), anyLong());
+
+    List<DatanodeDetails> usedNodes = usedNodesCaptor.getValue();
+    List<DatanodeDetails> excludedNodes = excludedNodesCaptor.getValue();
+
+    Assertions.assertTrue(usedNodes.contains(good.getDatanodeDetails()));
+    Assertions.assertTrue(usedNodes.contains(maintenance.getDatanodeDetails()));
+    Assertions.assertTrue(usedNodes.contains(pendingAdd));
+
+    Assertions.assertTrue(excludedNodes.contains(
+        unhealthy.getDatanodeDetails()));
+    Assertions.assertTrue(excludedNodes.contains(
+        decommissioning.getDatanodeDetails()));
+    Assertions.assertTrue(excludedNodes.contains(pendingRemove));
+  }
+
+
   /**
    * Tests whether the specified expectNumCommands number of commands are
    * created by the handler.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
new file mode 100644
index 0000000000..1ffbfec39a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
+
+/**
+ * Tests for ReplicationManagerUtil.
+ */
+public class TestReplicationManagerUtil {
+
+  private ReplicationManager replicationManager;
+
+  @Before
+  public void setup() {
+    replicationManager = Mockito.mock(ReplicationManager.class);
+  }
+
+  @Test
+  public void testGetExcludedAndUsedNodes() throws NodeNotFoundException {
+    ContainerID cid = ContainerID.valueOf(1L);
+    Set<ContainerReplica> replicas = new HashSet<>();
+    ContainerReplica good = createContainerReplica(cid, 0,
+        IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
+    replicas.add(good);
+
+    ContainerReplica remove = createContainerReplica(cid, 0,
+        IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
+    replicas.add(remove);
+
+    ContainerReplica unhealthy = createContainerReplica(
+        cid, 0, IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, 1);
+    replicas.add(unhealthy);
+
+    ContainerReplica decommissioning =
+        createContainerReplica(cid, 0,
+            DECOMMISSIONING, ContainerReplicaProto.State.CLOSED, 1);
+    replicas.add(decommissioning);
+
+    ContainerReplica maintenance =
+        createContainerReplica(cid, 0,
+            IN_MAINTENANCE, ContainerReplicaProto.State.CLOSED, 1);
+    replicas.add(maintenance);
+
+    // Take one of the replicas and set it to be removed. It should be on the
+    // excluded list rather than the used list.
+    Set<ContainerReplica> toBeRemoved = new HashSet<>();
+    toBeRemoved.add(remove);
+
+    // Finally, add a pending add and delete. The add should go onto the used
+    // list and the delete added to the excluded nodes.
+    DatanodeDetails pendingAdd = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails pendingDelete = MockDatanodeDetails.randomDatanodeDetails();
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.ADD, pendingAdd, 0));
+    pending.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.DELETE, pendingDelete, 0));
+
+    Mockito.when(replicationManager.getNodeStatus(Mockito.any())).thenAnswer(
+        invocation -> {
+          final DatanodeDetails dn = invocation.getArgument(0);
+          for (ContainerReplica r : replicas) {
+            if (r.getDatanodeDetails().equals(dn)) {
+              return new NodeStatus(
+                  r.getDatanodeDetails().getPersistedOpState(),
+                  HddsProtos.NodeState.HEALTHY);
+            }
+          }
+          throw new NodeNotFoundException(dn.getUuidString());
+        });
+
+    ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+        ReplicationManagerUtil.getExcludedAndUsedNodes(
+            new ArrayList<>(replicas), toBeRemoved, pending,
+            replicationManager);
+
+    Assertions.assertEquals(3, excludedAndUsedNodes.getUsedNodes().size());
+    Assertions.assertTrue(excludedAndUsedNodes.getUsedNodes()
+        .contains(good.getDatanodeDetails()));
+    Assertions.assertTrue(excludedAndUsedNodes.getUsedNodes()
+        .contains(maintenance.getDatanodeDetails()));
+    Assertions.assertTrue(excludedAndUsedNodes.getUsedNodes()
+        .contains(pendingAdd));
+
+    Assertions.assertEquals(4, excludedAndUsedNodes.getExcludedNodes().size());
+    Assertions.assertTrue(excludedAndUsedNodes.getExcludedNodes()
+        .contains(unhealthy.getDatanodeDetails()));
+    Assertions.assertTrue(excludedAndUsedNodes.getExcludedNodes()
+        .contains(decommissioning.getDatanodeDetails()));
+    Assertions.assertTrue(excludedAndUsedNodes.getExcludedNodes()
+        .contains(remove.getDatanodeDetails()));
+    Assertions.assertTrue(excludedAndUsedNodes.getExcludedNodes()
+        .contains(pendingDelete));
+  }
+
+}

