This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e360de0337 HDDS-8727. Defer non-critical partial EC reconstruction 
(#4810)
e360de0337 is described below

commit e360de03375e5204857626b6949e7a2f61721d29
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Jun 1 20:48:48 2023 +0200

    HDDS-8727. Defer non-critical partial EC reconstruction (#4810)
---
 .../replication/ECUnderReplicationHandler.java     | 103 ++--
 .../pipeline/InsufficientDatanodesException.java   |  23 +-
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |  12 +-
 .../replication/TestECUnderReplicationHandler.java | 556 +++++++++++++--------
 4 files changed, 444 insertions(+), 250 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 8b47e11fa8..fa189acba1 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.container.replication;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -64,7 +65,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
   private final long currentContainerSize;
   private final ReplicationManager replicationManager;
 
-  public ECUnderReplicationHandler(final PlacementPolicy containerPlacement,
+  ECUnderReplicationHandler(final PlacementPolicy containerPlacement,
       final ConfigurationSource conf, ReplicationManager replicationManager) {
     this.containerPlacement = containerPlacement;
     this.currentContainerSize = (long) conf
@@ -131,7 +132,6 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
             replicationManager);
     List<DatanodeDetails> excludedNodes
         = excludedAndUsedNodes.getExcludedNodes();
-    excludedNodes.addAll(replicationManager.getExcludedNodes());
     List<DatanodeDetails> usedNodes
         = excludedAndUsedNodes.getUsedNodes();
 
@@ -164,6 +164,8 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
             | CommandTargetOverloadedException  e) {
           firstException = e;
         }
+
+        excludedNodes.addAll(replicationManager.getExcludedNodes());
         try {
           commandsSent += processDecommissioningIndexes(replicaCount, sources,
               availableSourceNodes, excludedNodes, usedNodes);
@@ -270,10 +272,9 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
   }
 
   /**
-   * Processes replicas that are in maintenance nodes and should need
+   * Processes replicas that are on in-service nodes and should need
    * additional copies.
    * @return number of commands sent
-   * @throws IOException
    */
   private int processMissingIndexes(
       ECContainerReplicaCount replicaCount, Map<Integer,
@@ -285,26 +286,59 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     ECReplicationConfig repConfig =
         (ECReplicationConfig)container.getReplicationConfig();
     List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
-    if (missingIndexes.size() == 0) {
+    final int expectedTargetCount = missingIndexes.size();
+    if (expectedTargetCount == 0) {
       return 0;
     }
 
     int commandsSent = 0;
     if (sources.size() >= repConfig.getData()) {
-      int expectedTargets = missingIndexes.size();
-      final List<DatanodeDetails> selectedDatanodes =
-          ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
-              expectedTargets, usedNodes, excludedNodes, currentContainerSize,
-              container);
+      Set<DatanodeDetails> excludedDueToLoad =
+          replicationManager.getExcludedNodes();
+      final boolean hasOverloaded = !excludedDueToLoad.isEmpty();
+      final List<DatanodeDetails> excludedOrOverloadedNodes = hasOverloaded
+          ? ImmutableList.copyOf(ImmutableSet.<DatanodeDetails>builder()
+              .addAll(excludedNodes)
+              .addAll(excludedDueToLoad)
+              .build())
+          : excludedNodes;
+
+      // placement with overloaded nodes excluded
+      final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
+          container, expectedTargetCount, usedNodes, excludedOrOverloadedNodes
+      );
+      final int targetCount = selectedDatanodes.size();
+
+      if (hasOverloaded &&
+          // selection allows partial recovery
+          0 < targetCount && targetCount < expectedTargetCount &&
+          // recovery is not yet critical
+          expectedTargetCount < repConfig.getParity()) {
+
+        // check if placement exists when overloaded nodes are not excluded
+        final List<DatanodeDetails> targetsMaybeOverloaded = 
getTargetDatanodes(
+            container, expectedTargetCount, usedNodes, excludedNodes);
+
+        if (targetsMaybeOverloaded.size() == expectedTargetCount) {
+          final int overloadedCount = expectedTargetCount - targetCount;
+          LOG.info("Deferring reconstruction of container {}, which requires 
{}"
+                  + " target nodes to be fully reconstructed, but {} selected"
+                  + " nodes are currently overloaded.",
+              container.getContainerID(), expectedTargetCount, 
overloadedCount);
+
+          throw new InsufficientDatanodesException(expectedTargetCount,
+              targetCount);
+        }
+      }
 
       // If we got less targets than missing indexes, we need to prune the
-      // missing index list so it only tries to recover the nummber of indexes
+      // missing index list so it only tries to recover the number of indexes
       // we have targets for.
-      if (selectedDatanodes.size() < expectedTargets) {
-        missingIndexes.subList(selectedDatanodes.size(),
-            missingIndexes.size()).clear();
+      if (targetCount < expectedTargetCount) {
+        missingIndexes.subList(targetCount, expectedTargetCount).clear();
       }
-      if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
+      if (0 < targetCount &&
+          validatePlacement(availableSourceNodes, selectedDatanodes)) {
         usedNodes.addAll(selectedDatanodes);
         // TODO - what are we adding all the selected nodes to available
         //        sources?
@@ -337,13 +371,12 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
         }
         commandsSent++;
       }
-      if (selectedDatanodes.size() != expectedTargets) {
+      if (targetCount != expectedTargetCount) {
         LOG.debug("Insufficient nodes were returned from the placement policy" 
+
             " to fully reconstruct container {}. Requested {} received {}",
-            container.getContainerID(), expectedTargets,
-            selectedDatanodes.size());
-        throw new InsufficientDatanodesException(expectedTargets,
-            selectedDatanodes.size());
+            container.getContainerID(), expectedTargetCount, targetCount);
+        throw new InsufficientDatanodesException(expectedTargetCount,
+            targetCount);
       }
     } else {
       LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
@@ -355,11 +388,21 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     return commandsSent;
   }
 
+  private List<DatanodeDetails> getTargetDatanodes(
+      ContainerInfo container, int requiredNodes,
+      List<DatanodeDetails> usedNodes,
+      List<DatanodeDetails> excludedNodes
+  ) throws SCMException {
+    return ReplicationManagerUtil.getTargetDatanodes(
+        containerPlacement, requiredNodes,
+        usedNodes, excludedNodes,
+        currentContainerSize, container);
+  }
+
   /**
-   * Processes replicas that are in maintenance nodes and should need
+   * Processes replicas that are in decommissioning nodes and should need
    * additional copies.
    * @return number of commands sent
-   * @throws IOException
    */
   private int processDecommissioningIndexes(
       ECContainerReplicaCount replicaCount,
@@ -371,10 +414,8 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
     int commandsSent = 0;
     if (decomIndexes.size() > 0) {
-      final List<DatanodeDetails> selectedDatanodes =
-          ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
-              decomIndexes.size(), usedNodes, excludedNodes,
-              currentContainerSize, container);
+      final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
+          container, decomIndexes.size(), usedNodes, excludedNodes);
 
       if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
         usedNodes.addAll(selectedDatanodes);
@@ -429,12 +470,10 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
   /**
    * Processes replicas that are in maintenance nodes and should need
    * additional copies.
-   * @param replicaCount
    * @param sources Map of Replica Index to a pair of ContainerReplica and
    *                NodeStatus. This is the list of available replicas.
    * @param excludedNodes nodes that should not be targets for new copies
-   * @@return number of commands sent
-   * @throws IOException
+   * @return number of commands sent
    */
   private int processMaintenanceOnlyIndexes(
       ECContainerReplicaCount replicaCount,
@@ -453,9 +492,9 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
     if (additionalMaintenanceCopiesNeeded == 0) {
       return 0;
     }
-    List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
-        containerPlacement, maintIndexes.size(), usedNodes, excludedNodes,
-        currentContainerSize, container);
+    List<DatanodeDetails> targets = getTargetDatanodes(
+        container, maintIndexes.size(), usedNodes, excludedNodes
+    );
     usedNodes.addAll(targets);
 
     Iterator<DatanodeDetails> iterator = targets.iterator();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InsufficientDatanodesException.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InsufficientDatanodesException.java
index 53f76320eb..c7ab1adcd0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InsufficientDatanodesException.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InsufficientDatanodesException.java
@@ -23,21 +23,30 @@ import java.io.IOException;
 /**
  * Exception thrown when there are not enough Datanodes to create a pipeline.
  */
-public class InsufficientDatanodesException extends IOException {
+public final class InsufficientDatanodesException extends IOException {
 
+  private final int required;
+  private final int available;
 
-  public InsufficientDatanodesException() {
-    super();
-  }
-
-  public InsufficientDatanodesException(String message) {
+  public InsufficientDatanodesException(int required, int available,
+      String message) {
     super(message);
+    this.required = required;
+    this.available = available;
   }
 
   public InsufficientDatanodesException(int required, int available) {
-    super("Not enough datanodes" +
+    this(required, available, "Not enough datanodes" +
         ", requested: " + required +
         ", found: " + available
     );
   }
+
+  public int getRequiredNodes() {
+    return required;
+  }
+
+  public int getAvailableNodes() {
+    return available;
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index a61ed2719a..ac390ea366 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -53,11 +53,13 @@ public class SimplePipelineProvider
       List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
       throws IOException {
     List<DatanodeDetails> dns = pickNodesNotUsed(replicationConfig);
-    if (dns.size() < replicationConfig.getRequiredNodes()) {
-      String e = String
-          .format("Cannot create pipeline of factor %d using %d nodes.",
-              replicationConfig.getRequiredNodes(), dns.size());
-      throw new InsufficientDatanodesException(e);
+    int available = dns.size();
+    int required = replicationConfig.getRequiredNodes();
+    if (available < required) {
+      String msg = String.format(
+          "Cannot create pipeline of factor %d using %d nodes.",
+          required, available);
+      throw new InsufficientDatanodesException(required, available, msg);
     }
 
     Collections.shuffle(dns);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 4ee5406b5a..6fc6b1c0eb 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import 
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
@@ -46,43 +47,60 @@ import 
org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.assertj.core.util.Lists;
-import org.junit.Assert;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toSet;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests the ECUnderReplicationHandling functionality.
@@ -99,9 +117,9 @@ public class TestECUnderReplicationHandler {
   private PlacementPolicy ecPlacementPolicy;
   private int remainingMaintenanceRedundancy = 1;
   private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
-  private AtomicBoolean throwOverloadedExceptionOnReplication
+  private final AtomicBoolean throwOverloadedExceptionOnReplication
       = new AtomicBoolean(false);
-  private AtomicBoolean throwOverloadedExceptionOnReconstruction
+  private final AtomicBoolean throwOverloadedExceptionOnReconstruction
       = new AtomicBoolean(false);
 
   @BeforeEach
@@ -114,13 +132,13 @@ public class TestECUnderReplicationHandler {
             dd.getPersistedOpState(), HddsProtos.NodeState.HEALTHY, 0);
       }
     };
-    replicationManager = Mockito.mock(ReplicationManager.class);
+    replicationManager = mock(ReplicationManager.class);
     ReplicationManager.ReplicationManagerConfiguration rmConf =
         new ReplicationManager.ReplicationManagerConfiguration();
-    Mockito.when(replicationManager.getConfig())
+    when(replicationManager.getConfig())
         .thenReturn(rmConf);
 
-    Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
+    when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
         .thenAnswer(invocation -> {
           DatanodeDetails dd = invocation.getArgument(0);
           return new NodeStatus(dd.getPersistedOpState(),
@@ -139,67 +157,155 @@ public class TestECUnderReplicationHandler {
 
     conf = SCMTestUtils.getConf();
     repConfig = new ECReplicationConfig(DATA, PARITY);
-    container = ReplicationTestUtil
-        .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
+    container = createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
     policy = ReplicationTestUtil
             .getSimpleTestPlacementPolicy(nodeManager, conf);
     NodeSchema[] schemas =
         new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
     NodeSchemaManager.getInstance().init(schemas, true);
-    ecPlacementPolicy = Mockito.mock(PlacementPolicy.class);
-    Mockito.when(ecPlacementPolicy.validateContainerPlacement(
-        anyList(), anyInt()))
+    ecPlacementPolicy = mock(PlacementPolicy.class);
+    when(ecPlacementPolicy.validateContainerPlacement(anyList(), anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"rs-6-3-1024k", "rs-10-4-1024k"})
+  void defersNonCriticalPartialReconstruction(String rep) throws IOException {
+    final ECReplicationConfig ec = new ECReplicationConfig(rep);
+    final int data = ec.getData();
+    final int parity = ec.getParity();
+    ECUnderReplicationHandler subject = new ECUnderReplicationHandler(
+        ecPlacementPolicy, conf, replicationManager);
+    UnderReplicatedHealthResult result = mockUnderReplicated(ec);
+
+    Set<DatanodeDetails> excluded = excludeInReplicationManager(1);
+
+    final int remainingRedundancy = 1;
+    PlacementPolicySpy spy = new PlacementPolicySpy(ecPlacementPolicy,
+        data + parity + excluded.size() - 1);
+    Set<ContainerReplica> replicas = createReplicas(data + 
remainingRedundancy);
+
+    // WHEN
+    InsufficientDatanodesException e = assertThrows(
+        InsufficientDatanodesException.class,
+        () -> subject.processAndSendCommands(replicas, emptyList(), result, 
2));
+
+    // THEN
+    assertEquals(2, spy.callCount());
+    assertExcluded(excluded, spy.excludedNodes(0));
+    assertUsedNodes(replicas, spy.usedNodes(0));
+    assertExcluded(emptySet(), spy.excludedNodes(1));
+    assertUsedNodes(replicas, spy.usedNodes(1));
+    assertEquals(parity - remainingRedundancy, e.getRequiredNodes());
+    assertEquals(e.getRequiredNodes() - excluded.size(), 
e.getAvailableNodes());
+    verify(replicationManager, never())
+        .sendThrottledReconstructionCommand(any(), any());
+  }
+
+  private static UnderReplicatedHealthResult mockUnderReplicated(
+      ECReplicationConfig ec) {
+    UnderReplicatedHealthResult result =
+        mock(UnderReplicatedHealthResult.class);
+    when(result.getContainerInfo())
+        .thenReturn(createContainer(HddsProtos.LifeCycleState.CLOSED, ec));
+    return result;
+  }
+
+  private static void assertExcluded(Set<DatanodeDetails> excluded,
+      List<DatanodeDetails> excludedNodes) {
+    assertEquals(excluded, new TreeSet<>(excludedNodes));
+  }
+
+  private Set<DatanodeDetails> excludeInReplicationManager(int count) {
+    Set<DatanodeDetails> excluded = IntStream.range(0, count)
+        .mapToObj(i -> MockDatanodeDetails.randomDatanodeDetails())
+        .collect(toSet());
+    when(replicationManager.getExcludedNodes())
+        .thenReturn(excluded);
+    return excluded;
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"rs-3-2-1024k", "rs-6-3-1024k", "rs-10-4-1024k"})
+  void performsCriticalPartialReconstruction(String rep) throws IOException {
+    final ECReplicationConfig ec = new ECReplicationConfig(rep);
+    final int data = ec.getData();
+    final int parity = ec.getParity();
+    ECUnderReplicationHandler subject = new ECUnderReplicationHandler(
+        ecPlacementPolicy, conf, replicationManager);
+    UnderReplicatedHealthResult result = mockUnderReplicated(ec);
+
+    DatanodeDetails excludedByRM = MockDatanodeDetails.randomDatanodeDetails();
+    Set<DatanodeDetails> excluded = singleton(excludedByRM);
+    when(replicationManager.getExcludedNodes())
+        .thenReturn(excluded);
+
+    final int remainingRedundancy = 0;
+    PlacementPolicySpy spy = new PlacementPolicySpy(ecPlacementPolicy,
+        data + parity + excluded.size() - 1);
+    Set<ContainerReplica> replicas = createReplicas(data + 
remainingRedundancy);
+
+    // WHEN
+    InsufficientDatanodesException e = assertThrows(
+        InsufficientDatanodesException.class,
+        () -> subject.processAndSendCommands(replicas, emptyList(), result, 
2));
+
+    // THEN
+    assertEquals(1, spy.callCount());
+    assertEquals(singletonList(excludedByRM), spy.excludedNodes(0));
+    assertUsedNodes(replicas, spy.usedNodes(0));
+    assertEquals(parity - remainingRedundancy, e.getRequiredNodes());
+    assertEquals(e.getRequiredNodes() - excluded.size(), 
e.getAvailableNodes());
+    verify(replicationManager, times(1))
+        .sendThrottledReconstructionCommand(any(), any());
+  }
+
   @Test
   void excludesOverloadedNodes() throws IOException {
     ECUnderReplicationHandler subject = new ECUnderReplicationHandler(
         ecPlacementPolicy, conf, replicationManager);
-    Set<ContainerReplica> replicas = ReplicationTestUtil
-        .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-            Pair.of(IN_SERVICE, 3));
-    ContainerHealthResult.UnderReplicatedHealthResult result =
-        Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
-    Mockito.when(result.getContainerInfo()).thenReturn(container);
+    UnderReplicatedHealthResult result =
+        mock(UnderReplicatedHealthResult.class);
+    when(result.getContainerInfo()).thenReturn(container);
 
     DatanodeDetails excludedByRM = MockDatanodeDetails.randomDatanodeDetails();
-    Mockito.when(replicationManager.getExcludedNodes())
+    when(replicationManager.getExcludedNodes())
         .thenReturn(singleton(excludedByRM));
 
-    ArgumentCaptor<List<DatanodeDetails>> captor =
-        ArgumentCaptor.forClass(List.class);
-    // The used list is modified after it is passed to the placement policy,
-    // so a plain Captor won't work.
-    AtomicReference<List<DatanodeDetails>> usedList = new AtomicReference<>();
-    Mockito.when(ecPlacementPolicy.chooseDatanodes(any(), captor.capture(),
-            any(), anyInt(), anyLong(), anyLong())
-        ).thenAnswer(invocationOnMock -> {
-          usedList.set(new ArrayList<>(invocationOnMock.getArgument(0)));
-          int numNodes = invocationOnMock.getArgument(3);
-          List<DatanodeDetails> targets = new ArrayList<>();
-          for (int i = 0; i < numNodes; i++) {
-            targets.add(MockDatanodeDetails.randomDatanodeDetails());
-          }
-          return targets;
-        });
+    PlacementPolicySpy spy = new PlacementPolicySpy(ecPlacementPolicy,
+        repConfig.getRequiredNodes() + 1);
 
+    // WHEN
+    Set<ContainerReplica> replicas = createReplicas(3);
     subject.processAndSendCommands(
-        replicas, Collections.emptyList(), result, 2);
+        replicas, emptyList(), result, 2);
+
+    // THEN
+    assertEquals(1, spy.callCount());
+    assertEquals(singletonList(excludedByRM), spy.excludedNodes(0));
+    assertUsedNodes(replicas, spy.usedNodes(0));
+    verify(replicationManager, times(1))
+        .sendThrottledReconstructionCommand(any(), any());
+  }
 
-    Assertions.assertTrue(captor.getValue().contains(excludedByRM));
-    Assertions.assertEquals(3, usedList.get().size());
+  private static void assertUsedNodes(Set<ContainerReplica> replicas,
+      List<DatanodeDetails> usedNodes) {
+    assertEquals(replicas.size(), usedNodes.size());
     for (ContainerReplica r : replicas) {
-      Assertions.assertTrue(
-          usedList.get().contains(r.getDatanodeDetails()));
+      assertTrue(usedNodes.contains(r.getDatanodeDetails()));
     }
   }
 
+  private static Set<ContainerReplica> createReplicas(int count) {
+    ContainerID id = ContainerID.valueOf(1);
+    return IntStream.rangeClosed(1, count)
+        .mapToObj(i -> createContainerReplica(id, i, IN_SERVICE, CLOSED))
+        .collect(toSet());
+  }
+
   @Test
   public void testUnderReplicationWithMissingParityIndex5() throws IOException 
{
-    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
-        .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-            Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
+    Set<ContainerReplica> availableReplicas = createReplicas(4);
     testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
         availableReplicas, 0, 0, policy);
   }
@@ -215,8 +321,7 @@ public class TestECUnderReplicationHandler {
 
   @Test
   public void testUnderReplicationWithMissingIndex2345() throws IOException {
-    Set<ContainerReplica> availableReplicas =
-        ReplicationTestUtil.createReplicas(Pair.of(IN_SERVICE, 1));
+    Set<ContainerReplica> availableReplicas = createReplicas(1);
     testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3, 4, 5),
         availableReplicas, 0, 0, policy);
   }
@@ -230,15 +335,13 @@ public class TestECUnderReplicationHandler {
 
   @Test
   public void testThrowsWhenTargetsOverloaded() throws IOException {
-    Set<ContainerReplica> availableReplicas =  ReplicationTestUtil
-        .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-            Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
+    Set<ContainerReplica> availableReplicas = createReplicas(4);
 
     doThrow(new CommandTargetOverloadedException("Overloaded"))
         .when(replicationManager).sendThrottledReconstructionCommand(
             any(), any());
 
-    Assertions.assertThrows(CommandTargetOverloadedException.class, () ->
+    assertThrows(CommandTargetOverloadedException.class, () ->
         testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
             availableReplicas, 0, 0, policy));
   }
@@ -252,11 +355,11 @@ public class TestECUnderReplicationHandler {
     Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
         testUnderReplicationWithMissingIndexes(
             Lists.emptyList(), availableReplicas, 1, 0, policy);
-    Assertions.assertEquals(1, cmds.size());
+    assertEquals(1, cmds.size());
     // Check the replicate command has index 1 set
     ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds
         .iterator().next().getValue();
-    Assertions.assertEquals(1, cmd.getReplicaIndex());
+    assertEquals(1, cmd.getReplicaIndex());
   }
 
 
@@ -267,7 +370,7 @@ public class TestECUnderReplicationHandler {
       throws IOException {
     Set<ContainerReplica> availableReplicas = new LinkedHashSet<>();
     ContainerReplica deadMaintenance =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             1, IN_MAINTENANCE, CLOSED);
     availableReplicas.add(deadMaintenance);
 
@@ -295,11 +398,11 @@ public class TestECUnderReplicationHandler {
     Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
         testUnderReplicationWithMissingIndexes(
             Lists.emptyList(), availableReplicas, 1, 0, policy);
-    Assertions.assertEquals(1, cmds.size());
+    assertEquals(1, cmds.size());
     // Check the replicate command has index 1 set
     for (Pair<DatanodeDetails, SCMCommand<?>> c : cmds) {
       // Ensure neither of the commands are for the dead maintenance node
-      Assertions.assertNotEquals(deadMaintenance.getDatanodeDetails(),
+      assertNotEquals(deadMaintenance.getDatanodeDetails(),
           c.getKey());
     }
   }
@@ -315,7 +418,7 @@ public class TestECUnderReplicationHandler {
         .when(replicationManager).sendThrottledReplicationCommand(
             any(), anyList(), any(), anyInt());
 
-    Assertions.assertThrows(CommandTargetOverloadedException.class, () ->
+    assertThrows(CommandTargetOverloadedException.class, () ->
         testUnderReplicationWithMissingIndexes(
             Lists.emptyList(), availableReplicas, 1, 0, policy));
   }
@@ -377,28 +480,27 @@ public class TestECUnderReplicationHandler {
   public void testUnderReplicationWithInvalidPlacement()
           throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
-            .createReplicas(Pair.of(DECOMMISSIONING, 1),
-                    Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
-                    Pair.of(IN_SERVICE, 4));
-    PlacementPolicy mockedPolicy = Mockito.spy(policy);
+        .createReplicas(Pair.of(DECOMMISSIONING, 1),
+            Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4));
+    PlacementPolicy mockedPolicy = spy(policy);
     ContainerPlacementStatus mockedContainerPlacementStatus =
-            Mockito.mock(ContainerPlacementStatus.class);
-    Mockito.when(mockedContainerPlacementStatus.isPolicySatisfied())
-            .thenReturn(false);
-    Mockito.when(mockedPolicy.validateContainerPlacement(anyList(),
-            anyInt())).thenReturn(mockedContainerPlacementStatus);
-    Mockito.when(mockedPolicy.validateContainerPlacement(anyList(),
-            anyInt())).thenAnswer(invocationOnMock -> {
-              Set<DatanodeDetails> dns =
-                      new HashSet<>(invocationOnMock.getArgument(0));
-              Assert.assertTrue(
-                      availableReplicas.stream()
-                      .map(ContainerReplica::getDatanodeDetails)
-                      .filter(dn -> dn.getPersistedOpState() == IN_SERVICE)
-                      .allMatch(dns::contains));
-              return mockedContainerPlacementStatus;
-            });
-    testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+            mock(ContainerPlacementStatus.class);
+    when(mockedContainerPlacementStatus.isPolicySatisfied())
+        .thenReturn(false);
+    when(mockedPolicy.validateContainerPlacement(anyList(), anyInt()))
+        .thenReturn(mockedContainerPlacementStatus);
+    when(mockedPolicy.validateContainerPlacement(anyList(), anyInt()))
+        .thenAnswer(invocationOnMock -> {
+          Set<DatanodeDetails> dns =
+              new HashSet<>(invocationOnMock.getArgument(0));
+          assertTrue(availableReplicas.stream()
+              .map(ContainerReplica::getDatanodeDetails)
+              .filter(dn -> dn.getPersistedOpState() == IN_SERVICE)
+              .allMatch(dns::contains));
+          return mockedContainerPlacementStatus;
+        });
+    testUnderReplicationWithMissingIndexes(emptyList(),
             availableReplicas, 0, 0, mockedPolicy);
   }
 
@@ -422,13 +524,13 @@ public class TestECUnderReplicationHandler {
         .getNoNodesTestPlacementPolicy(nodeManager, conf);
 
     ContainerReplica overRepReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             4, IN_SERVICE, CLOSED);
     ContainerReplica decomReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
         5, DECOMMISSIONING, CLOSED);
     ContainerReplica maintReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             5, ENTERING_MAINTENANCE, CLOSED);
 
     List<ContainerReplica> replicasToAdd = new ArrayList<>();
@@ -439,8 +541,8 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH =
         new ECUnderReplicationHandler(
             noNodesPolicy, conf, replicationManager);
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             1, false, false, false);
 
     // The underRepHandler processes in stages. First missing indexes, then
@@ -453,18 +555,15 @@ public class TestECUnderReplicationHandler {
     // throw an exception, and then make it also over replicated, returning the
     // commands to fix the over replication instead.
     for (ContainerReplica toAdd : replicasToAdd) {
-      Mockito.clearInvocations(replicationManager);
-      Set<ContainerReplica> availableReplicas = ReplicationTestUtil
-          .createReplicas(Pair.of(IN_SERVICE, 1),
-              Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
-              Pair.of(IN_SERVICE, 4));
+      clearInvocations(replicationManager);
+      Set<ContainerReplica> availableReplicas = createReplicas(4);
       if (toAdd != null) {
         availableReplicas.add(toAdd);
       }
 
-      Assert.assertThrows(SCMException.class,
+      assertThrows(SCMException.class,
           () -> ecURH.processAndSendCommands(availableReplicas,
-              Collections.emptyList(), underRep, 2));
+              emptyList(), underRep, 2));
 
       // Now adjust replicas so it is also over replicated. This time before
       // throwing it should call the OverRepHandler and return whatever it
@@ -473,8 +572,8 @@ public class TestECUnderReplicationHandler {
 
       assertThrows(SCMException.class,
           () -> ecURH.processAndSendCommands(availableReplicas,
-              Collections.emptyList(), underRep, 2));
-      Mockito.verify(replicationManager, times(1))
+              emptyList(), underRep, 2));
+      verify(replicationManager, times(1))
           .processOverReplicatedContainer(underRep);
     }
   }
@@ -492,10 +591,10 @@ public class TestECUnderReplicationHandler {
         .getNoNodesTestPlacementPolicy(nodeManager, conf);
 
     ContainerReplica decomReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             5, DECOMMISSIONING, CLOSED);
     ContainerReplica maintReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             5, ENTERING_MAINTENANCE, CLOSED);
 
     List<ContainerReplica> replicasToAdd = new ArrayList<>();
@@ -506,11 +605,11 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH =
         new ECUnderReplicationHandler(
             noNodesPolicy, conf, replicationManager);
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             1, false, false, false);
     ContainerReplica unhealthyReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             4, IN_SERVICE, UNHEALTHY);
 
     /*
@@ -526,7 +625,7 @@ public class TestECUnderReplicationHandler {
      it should return the command to delete one.
      */
     for (ContainerReplica toAdd : replicasToAdd) {
-      Mockito.clearInvocations(replicationManager);
+      clearInvocations(replicationManager);
       Set<ContainerReplica> existingReplicas = ReplicationTestUtil
           .createReplicas(Pair.of(IN_SERVICE, 5),
               Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -536,10 +635,10 @@ public class TestECUnderReplicationHandler {
       }
 
       // should throw an SCMException indicating no targets were found
-      Assert.assertThrows(SCMException.class,
+      assertThrows(SCMException.class,
           () -> ecURH.processAndSendCommands(existingReplicas,
-              Collections.emptyList(), underRep, 2));
-      Mockito.verify(replicationManager, times(0))
+              emptyList(), underRep, 2));
+      verify(replicationManager, times(0))
           .sendThrottledDeleteCommand(eq(container), anyInt(),
               any(DatanodeDetails.class), anyBoolean());
 
@@ -550,7 +649,7 @@ public class TestECUnderReplicationHandler {
       */
       existingReplicas.add(unhealthyReplica);
       existingReplicas.add(
-          ReplicationTestUtil.createContainerReplica(container.containerID(),
+          createContainerReplica(container.containerID(),
               1, IN_SERVICE, UNHEALTHY));
 
       /*
@@ -559,61 +658,59 @@ public class TestECUnderReplicationHandler {
       assertions.
       */
       commandsSent.clear();
-      Mockito.doAnswer(invocation -> {
+      doAnswer(invocation -> {
         commandsSent.add(Pair.of(invocation.getArgument(2),
             createDeleteContainerCommand(invocation.getArgument(0),
                 invocation.getArgument(1))));
         return null;
       })
           .when(replicationManager)
-          .sendThrottledDeleteCommand(Mockito.any(ContainerInfo.class),
-              Mockito.anyInt(), Mockito.any(DatanodeDetails.class),
-              Mockito.eq(true));
+          .sendThrottledDeleteCommand(any(ContainerInfo.class),
+              anyInt(), any(DatanodeDetails.class),
+              eq(true));
 
       assertThrows(SCMException.class,
           () -> ecURH.processAndSendCommands(existingReplicas,
-              Collections.emptyList(), underRep, 2));
-      Mockito.verify(replicationManager, times(1))
+              emptyList(), underRep, 2));
+      verify(replicationManager, times(1))
           .sendThrottledDeleteCommand(container,
               unhealthyReplica.getReplicaIndex(),
               unhealthyReplica.getDatanodeDetails(), true);
-      Assertions.assertEquals(1, commandsSent.size());
+      assertEquals(1, commandsSent.size());
       Pair<DatanodeDetails, SCMCommand<?>> command =
           commandsSent.iterator().next();
-      Assertions.assertEquals(SCMCommandProto.Type.deleteContainerCommand,
+      assertEquals(SCMCommandProto.Type.deleteContainerCommand,
           command.getValue().getType());
       DeleteContainerCommand deleteCommand =
           (DeleteContainerCommand) command.getValue();
-      Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(),
+      assertEquals(unhealthyReplica.getDatanodeDetails(),
           command.getKey());
-      Assertions.assertEquals(container.containerID(),
+      assertEquals(container.containerID(),
           ContainerID.valueOf(deleteCommand.getContainerID()));
-      Assertions.assertEquals(unhealthyReplica.getReplicaIndex(),
+      assertEquals(unhealthyReplica.getReplicaIndex(),
           deleteCommand.getReplicaIndex());
     }
   }
 
   @Test
   public void testPartialReconstructionIfNotEnoughNodes() {
-    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
-        .createReplicas(Pair.of(IN_SERVICE, 1),
-            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3));
+    Set<ContainerReplica> availableReplicas = createReplicas(3);
     PlacementPolicy placementPolicy = ReplicationTestUtil
         .getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
     ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
         placementPolicy, conf, replicationManager);
 
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             0, false, false, false);
 
     assertThrows(InsufficientDatanodesException.class, () ->
-        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+        ecURH.processAndSendCommands(availableReplicas, emptyList(),
             underRep, 1));
-    Assertions.assertEquals(1, commandsSent.size());
+    assertEquals(1, commandsSent.size());
     ReconstructECContainersCommand cmd = (ReconstructECContainersCommand)
         commandsSent.iterator().next().getValue();
-    Assertions.assertEquals(1, cmd.getTargetDatanodes().size());
+    assertEquals(1, cmd.getTargetDatanodes().size());
   }
 
   @Test
@@ -624,19 +721,19 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
         policy, conf, replicationManager);
 
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             0, false, false, false);
 
     // Setup so reconstruction fails, but we should still get a replicate
     // command for the decommissioning node and an exception thrown.
     throwOverloadedExceptionOnReconstruction.set(true);
     assertThrows(CommandTargetOverloadedException.class, () ->
-        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+        ecURH.processAndSendCommands(availableReplicas, emptyList(),
             underRep, 1));
-    Assertions.assertEquals(1, commandsSent.size());
+    assertEquals(1, commandsSent.size());
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
-    Assertions.assertEquals(
+    assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
   }
 
@@ -651,16 +748,16 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
         placementPolicy, conf, replicationManager);
 
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             0, true, false, false);
 
     assertThrows(InsufficientDatanodesException.class, () ->
-        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+        ecURH.processAndSendCommands(availableReplicas, emptyList(),
             underRep, 1));
-    Assertions.assertEquals(1, commandsSent.size());
+    assertEquals(1, commandsSent.size());
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
-    Assertions.assertEquals(
+    assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
   }
 
@@ -673,17 +770,17 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
         policy, conf, replicationManager);
 
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             0, true, false, false);
 
     throwOverloadedExceptionOnReplication.set(true);
     assertThrows(CommandTargetOverloadedException.class, () ->
-        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+        ecURH.processAndSendCommands(availableReplicas, emptyList(),
             underRep, 1));
-    Assertions.assertEquals(1, commandsSent.size());
+    assertEquals(1, commandsSent.size());
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
-    Assertions.assertEquals(
+    assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
   }
 
@@ -699,16 +796,16 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
         placementPolicy, conf, replicationManager);
 
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             0, false, false, false);
 
     assertThrows(InsufficientDatanodesException.class, () ->
-        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+        ecURH.processAndSendCommands(availableReplicas, emptyList(),
             underRep, 2));
-    Assertions.assertEquals(1, commandsSent.size());
+    assertEquals(1, commandsSent.size());
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
-    Assertions.assertEquals(
+    assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
   }
 
@@ -722,17 +819,17 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
         policy, conf, replicationManager);
 
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             0, false, false, false);
 
     throwOverloadedExceptionOnReplication.set(true);
     assertThrows(CommandTargetOverloadedException.class, () ->
-        ecURH.processAndSendCommands(availableReplicas, 
Collections.emptyList(),
+        ecURH.processAndSendCommands(availableReplicas, emptyList(),
             underRep, 2));
-    Assertions.assertEquals(1, commandsSent.size());
+    assertEquals(1, commandsSent.size());
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
-    Assertions.assertEquals(
+    assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
   }
 
@@ -744,10 +841,10 @@ public class TestECUnderReplicationHandler {
         .getSameNodeTestPlacementPolicy(nodeManager, conf, newNode);
     
     ContainerReplica decomReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             5, DECOMMISSIONING, CLOSED);
     ContainerReplica maintReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             5, ENTERING_MAINTENANCE, CLOSED);
 
     List<ContainerReplica> replicasToAdd = new ArrayList<>();
@@ -757,8 +854,8 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH =
         new ECUnderReplicationHandler(
             sameNodePolicy, conf, replicationManager);
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             1, false, false, false);
 
     // The underRepHandler processes in stages. First missing indexes, then
@@ -780,18 +877,18 @@ public class TestECUnderReplicationHandler {
 
       assertThrows(SCMException.class,
           () -> ecURH.processAndSendCommands(availableReplicas,
-              Collections.emptyList(), underRep, 2));
+              emptyList(), underRep, 2));
 
-      Mockito.verify(replicationManager, times(1))
+      verify(replicationManager, times(1))
           .processOverReplicatedContainer(underRep);
-      Assertions.assertEquals(1, commandsSent.size());
+      assertEquals(1, commandsSent.size());
       Pair<DatanodeDetails, SCMCommand<?>> pair =
           commandsSent.iterator().next();
-      Assertions.assertEquals(newNode, pair.getKey());
-      Assertions.assertEquals(
+      assertEquals(newNode, pair.getKey());
+      assertEquals(
           SCMCommandProto.Type.reconstructECContainersCommand,
           pair.getValue().getType());
-      Mockito.clearInvocations(replicationManager);
+      clearInvocations(replicationManager);
       commandsSent.clear();
     }
   }
@@ -813,18 +910,18 @@ public class TestECUnderReplicationHandler {
         new ECUnderReplicationHandler(
             sameNodePolicy, conf, replicationManager);
 
-    ContainerHealthResult.UnderReplicatedHealthResult underRep =
-        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+    UnderReplicatedHealthResult underRep =
+        new UnderReplicatedHealthResult(container,
             1, true, false, false);
 
-    Assert.assertThrows(SCMException.class,
+    assertThrows(SCMException.class,
         () -> ecURH.processAndSendCommands(availableReplicas,
-            Collections.emptyList(), underRep, 1));
+            emptyList(), underRep, 1));
 
     // Now adjust replicas so it is also over replicated. This time it should
     // call the OverRepHandler and then throw
     ContainerReplica overRepReplica =
-        ReplicationTestUtil.createContainerReplica(container.containerID(),
+        createContainerReplica(container.containerID(),
             4, IN_SERVICE, CLOSED);
     availableReplicas.add(overRepReplica);
 
@@ -833,16 +930,16 @@ public class TestECUnderReplicationHandler {
         createDeleteContainerCommand(container,
             overRepReplica.getReplicaIndex())));
 
-    Mockito.when(replicationManager.processOverReplicatedContainer(
+    when(replicationManager.processOverReplicatedContainer(
         underRep)).thenAnswer(invocationOnMock -> {
           commandsSent.addAll(expectedDelete);
           return expectedDelete.size();
         });
     assertThrows(SCMException.class, () -> ecURH.processAndSendCommands(
-        availableReplicas, Collections.emptyList(), underRep, 1));
-    Mockito.verify(replicationManager, times(1))
+        availableReplicas, emptyList(), underRep, 1));
+    verify(replicationManager, times(1))
         .processOverReplicatedContainer(underRep);
-    Assertions.assertEquals(true, expectedDelete.equals(commandsSent));
+    assertEquals(expectedDelete, commandsSent);
   }
 
   @Test
@@ -885,14 +982,14 @@ public class TestECUnderReplicationHandler {
     Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
         testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3),
             availableReplicas, 0, 0, policy);
-    Assertions.assertEquals(1, cmds.size());
+    assertEquals(1, cmds.size());
     ReconstructECContainersCommand cmd =
         (ReconstructECContainersCommand) cmds.iterator().next().getValue();
     // Ensure that all source nodes are IN_SERVICE, we should not have picked
     // the non in-service nodes for index 1.
     for (ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex s
         : cmd.getSources()) {
-      Assertions.assertEquals(
+      assertEquals(
           IN_SERVICE, s.getDnDetails().getPersistedOpState());
     }
   }
@@ -909,8 +1006,8 @@ public class TestECUnderReplicationHandler {
             Pair.of(IN_MAINTENANCE, 3), Pair.of(IN_SERVICE, 4),
             Pair.of(IN_SERVICE, 5));
 
-    Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
-            Mockito.isNull(), anyInt(), anyLong(), anyLong()))
+    when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
+            isNull(), anyInt(), anyLong(), anyLong()))
         .thenAnswer(invocationOnMock -> {
           int numNodes = invocationOnMock.getArgument(3);
           List<DatanodeDetails> targets = new ArrayList<>();
@@ -920,17 +1017,17 @@ public class TestECUnderReplicationHandler {
           return targets;
         });
 
-    ContainerHealthResult.UnderReplicatedHealthResult result =
-        Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
-    Mockito.when(result.getContainerInfo()).thenReturn(container);
+    UnderReplicatedHealthResult result =
+        mock(UnderReplicatedHealthResult.class);
+    when(result.getContainerInfo()).thenReturn(container);
     ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
         ecPlacementPolicy, conf, replicationManager);
 
     handler.processAndSendCommands(availableReplicas,
-        Collections.emptyList(), result, 1);
-    Assertions.assertEquals(1, commandsSent.size());
-    Mockito.verify(ecPlacementPolicy, times(0))
-        .chooseDatanodes(anyList(), Mockito.isNull(), eq(0), anyLong(),
+        emptyList(), result, 1);
+    assertEquals(1, commandsSent.size());
+    verify(ecPlacementPolicy, times(0))
+        .chooseDatanodes(anyList(), isNull(), eq(0), anyLong(),
             anyLong());
   }
 
@@ -942,9 +1039,7 @@ public class TestECUnderReplicationHandler {
   @Test
   public void testDatanodesPendingAddAreNotSelectedAsTargets()
       throws IOException {
-    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
-        .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-            Pair.of(IN_SERVICE, 3));
+    Set<ContainerReplica> availableReplicas = createReplicas(3);
     DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
     List<ContainerReplicaOp> pendingOps = ImmutableList.of(
         ContainerReplicaOp.create(ContainerReplicaOp.PendingOpType.ADD, dn, 
4));
@@ -955,8 +1050,8 @@ public class TestECUnderReplicationHandler {
     containing that DN. Ensures the test will fail if excludeNodes does not
     contain the DN pending ADD.
      */
-    Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
-            Mockito.isNull(), anyInt(), anyLong(), anyLong()))
+    when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
+            isNull(), anyInt(), anyLong(), anyLong()))
         .thenAnswer(invocationOnMock -> {
           List<DatanodeDetails> usedList = invocationOnMock.getArgument(0);
           List<DatanodeDetails> excludeList = invocationOnMock.getArgument(1);
@@ -969,15 +1064,15 @@ public class TestECUnderReplicationHandler {
           return targets;
         });
 
-    ContainerHealthResult.UnderReplicatedHealthResult result =
-        Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
-    Mockito.when(result.getContainerInfo()).thenReturn(container);
+    UnderReplicatedHealthResult result =
+        mock(UnderReplicatedHealthResult.class);
+    when(result.getContainerInfo()).thenReturn(container);
     ECUnderReplicationHandler handler = new ECUnderReplicationHandler(
         ecPlacementPolicy, conf, replicationManager);
 
     handler.processAndSendCommands(availableReplicas, pendingOps, result, 1);
-    Assertions.assertEquals(1, commandsSent.size());
-    Assertions.assertNotEquals(dn, commandsSent.iterator().next().getKey());
+    assertEquals(1, commandsSent.size());
+    assertNotEquals(dn, commandsSent.iterator().next().getKey());
   }
 
   @Test
@@ -985,17 +1080,17 @@ public class TestECUnderReplicationHandler {
       throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1));
-    ContainerReplica decomReplica = ReplicationTestUtil.createContainerReplica(
+    ContainerReplica decomReplica = createContainerReplica(
         container.containerID(), 2, DECOMMISSIONING, CLOSED);
     availableReplicas.add(decomReplica);
     Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
-        testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+        testUnderReplicationWithMissingIndexes(emptyList(),
             availableReplicas, 1, 0, policy);
-    Assertions.assertEquals(1, cmds.size());
+    assertEquals(1, cmds.size());
     // With push replication the command should always be sent to the
     // decommissioning source.
     DatanodeDetails target = cmds.iterator().next().getKey();
-    Assertions.assertEquals(decomReplica.getDatanodeDetails(), target);
+    assertEquals(decomReplica.getDatanodeDetails(), target);
   }
 
   @Test
@@ -1003,25 +1098,25 @@ public class TestECUnderReplicationHandler {
       throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1));
-    ContainerReplica maintReplica = ReplicationTestUtil.createContainerReplica(
+    ContainerReplica maintReplica = createContainerReplica(
         container.containerID(), 2, ENTERING_MAINTENANCE, CLOSED);
     availableReplicas.add(maintReplica);
 
     Set<Pair<DatanodeDetails, SCMCommand<?>>> cmds =
-        testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+        testUnderReplicationWithMissingIndexes(emptyList(),
             availableReplicas, 0, 1, policy);
-    Assertions.assertEquals(0, cmds.size());
+    assertEquals(0, cmds.size());
 
     // Change the remaining redundancy to ensure something needs copied.
     remainingMaintenanceRedundancy = 2;
-    cmds = testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+    cmds = testUnderReplicationWithMissingIndexes(emptyList(),
         availableReplicas, 0, 1, policy);
 
-    Assertions.assertEquals(1, cmds.size());
+    assertEquals(1, cmds.size());
     // With push replication the command should always be sent to the
     // entering_maintenance source.
     DatanodeDetails target = cmds.iterator().next().getKey();
-    Assertions.assertEquals(maintReplica.getDatanodeDetails(), target);
+    assertEquals(maintReplica.getDatanodeDetails(), target);
   }
 
   public Set<Pair<DatanodeDetails, SCMCommand<?>>>
@@ -1032,10 +1127,10 @@ public class TestECUnderReplicationHandler {
     ECUnderReplicationHandler ecURH =
         new ECUnderReplicationHandler(
             placementPolicy, conf, replicationManager);
-    ContainerHealthResult.UnderReplicatedHealthResult result =
-        Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
-    Mockito.when(result.isUnrecoverable()).thenReturn(false);
-    Mockito.when(result.getContainerInfo()).thenReturn(container);
+    UnderReplicatedHealthResult result =
+        mock(UnderReplicatedHealthResult.class);
+    when(result.isUnrecoverable()).thenReturn(false);
+    when(result.getContainerInfo()).thenReturn(container);
 
     ecURH.processAndSendCommands(availableReplicas, ImmutableList.of(),
         result, remainingMaintenanceRedundancy);
@@ -1054,7 +1149,7 @@ public class TestECUnderReplicationHandler {
       } else if (dnCommand
           .getValue() instanceof ReconstructECContainersCommand) {
         if (shouldReconstructCommandExist) {
-          Assertions.assertArrayEquals(missingIndexesByteArr,
+          assertArrayEquals(missingIndexesByteArr,
               ((ReconstructECContainersCommand) dnCommand.getValue())
               .getMissingContainerIndexes());
         }
@@ -1064,9 +1159,9 @@ public class TestECUnderReplicationHandler {
     int maxMaintenance = PARITY - remainingMaintenanceRedundancy;
     int expectedMaintenanceCommands = Math.max(0,
         maintenanceIndexes - maxMaintenance);
-    Assertions.assertEquals(decomIndexes + expectedMaintenanceCommands,
+    assertEquals(decomIndexes + expectedMaintenanceCommands,
         replicateCommand);
-    Assertions.assertEquals(shouldReconstructCommandExist ? 1 : 0,
+    assertEquals(shouldReconstructCommandExist ? 1 : 0,
         reconstructCommand);
     return commandsSent;
   }
@@ -1078,4 +1173,53 @@ public class TestECUnderReplicationHandler {
     deleteCommand.setReplicaIndex(replicaIndex);
     return deleteCommand;
   }
+
+  /**
+   * Helper to mock and verify calls to
+   * {@link PlacementPolicy#chooseDatanodes(List, List, int, long, long)}.
+   */
+  private static class PlacementPolicySpy {
+
+    private final List<List<DatanodeDetails>> excludedNodesLists =
+        new ArrayList<>();
+    private final List<List<DatanodeDetails>> usedNodesLists =
+        new ArrayList<>();
+
+    PlacementPolicySpy(PlacementPolicy placementPolicy, int totalNodes)
+        throws IOException {
+      when(placementPolicy.chooseDatanodes(any(), any(),
+          any(), anyInt(), anyLong(), anyLong())
+      ).thenAnswer(invocation -> {
+        final Collection<DatanodeDetails> used = invocation.getArgument(0);
+        final Collection<DatanodeDetails> excluded = invocation.getArgument(1);
+        final int nodesRequired = invocation.getArgument(3);
+
+        final int availableNodes = totalNodes - excluded.size() - used.size();
+        usedNodesLists.add(new ArrayList<>(used));
+        excludedNodesLists.add(new ArrayList<>(excluded));
+
+        final List<DatanodeDetails> targets = new ArrayList<>();
+        for (int i = 0; i < Math.min(nodesRequired, availableNodes); i++) {
+          targets.add(MockDatanodeDetails.randomDatanodeDetails());
+        }
+        if (targets.isEmpty()) {
+          throw new SCMException("not enough nodes",
+              SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+        }
+        return targets;
+      });
+    }
+
+    int callCount() {
+      return usedNodesLists.size();
+    }
+
+    List<DatanodeDetails> usedNodes(int i) {
+      return usedNodesLists.get(i);
+    }
+
+    List<DatanodeDetails> excludedNodes(int i) {
+      return excludedNodesLists.get(i);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to