This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 91227327b1 HDDS-8309. ReplicationManager: Basic Throttling of EC 
Reconstruction commands (#4496)
91227327b1 is described below

commit 91227327b1831824e4f82b31989ceff5188a8fb8
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Mar 30 15:40:06 2023 +0100

    HDDS-8309. ReplicationManager: Basic Throttling of EC Reconstruction 
commands (#4496)
---
 .../replication/ECUnderReplicationHandler.java     |   8 +-
 .../container/replication/ReplicationManager.java  | 104 ++++++++++++-----
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  19 +++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  31 +++++
 .../hadoop/hdds/scm/container/MockNodeManager.java |   6 +
 .../hdds/scm/container/SimpleMockNodeManager.java  |   7 ++
 .../container/replication/ReplicationTestUtil.java |  22 ++++
 .../replication/TestECUnderReplicationHandler.java |  17 +++
 .../replication/TestReplicationManager.java        | 128 +++++++++++++++++++--
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   8 ++
 .../testutils/ReplicationNodeManagerMock.java      |   7 ++
 11 files changed, 309 insertions(+), 48 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index dcdfb8d0a5..3c13dfe6b6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -315,10 +315,8 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
                 sourceDatanodesWithIndex, selectedDatanodes,
                 int2byte(missingIndexes),
                 repConfig);
-        replicationManager.sendDatanodeCommand(reconstructionCommand,
-            container, selectedDatanodes.get(0));
-        // For each index that is going to be reconstructed with this command,
-        // adjust the replica count to reflect the pending operation.
+        replicationManager.sendThrottledReconstructionCommand(
+            container, reconstructionCommand);
         for (int i = 0; i < missingIndexes.size(); i++) {
           adjustPendingOps(
               replicaCount, selectedDatanodes.get(i), missingIndexes.get(i));
@@ -470,7 +468,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
   }
 
   private void adjustPendingOps(ECContainerReplicaCount replicaCount,
-      DatanodeDetails target, int replicaIndex) {
+                                DatanodeDetails target, int replicaIndex) {
     replicaCount.addPendingOp(new ContainerReplicaOp(
         ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex,
         Long.MAX_VALUE));
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 4891e0dc10..23b226d978 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.PostConstruct;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
@@ -75,7 +74,6 @@ import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -179,6 +177,7 @@ public class ReplicationManager implements SCMService {
   private final OverReplicatedProcessor overReplicatedProcessor;
   private final HealthCheck containerCheckChain;
   private final int datanodeReplicationLimit;
+  private final int reconstructionCommandWeight;
   private final int datanodeDeleteLimit;
 
   /**
@@ -231,6 +230,7 @@ public class ReplicationManager implements SCMService {
     this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
     this.ratisMaintenanceMinReplicas = rmConf.getMaintenanceReplicaMinimum();
     this.datanodeReplicationLimit = rmConf.getDatanodeReplicationLimit();
+    this.reconstructionCommandWeight = rmConf.getReconstructionCommandWeight();
     this.datanodeDeleteLimit = rmConf.getDatanodeDeleteLimit();
 
     ecUnderReplicationHandler = new ECUnderReplicationHandler(
@@ -484,17 +484,20 @@ public class ReplicationManager implements SCMService {
   public void sendThrottledDeleteCommand(final ContainerInfo container,
       int replicaIndex, final DatanodeDetails datanode, boolean force)
       throws NotLeaderException, CommandTargetOverloadedException {
-    List<Pair<Integer, DatanodeDetails>> datanodeWithCommandCount =
-        getAvailableDatanodes(Collections.singletonList(datanode),
-            Type.deleteContainerCommand, datanodeDeleteLimit);
-    if (datanodeWithCommandCount.isEmpty()) {
-      throw new CommandTargetOverloadedException("Cannot schedule a delete " +
-          "container command for container " + container.containerID() +
-          " on datanode " + datanode + " as it has too many pending delete " +
-          "commands");
+    try {
+      int commandCount = nodeManager.getTotalDatanodeCommandCount(datanode,
+          Type.deleteContainerCommand);
+      if (commandCount >= datanodeDeleteLimit) {
+        throw new CommandTargetOverloadedException("Cannot schedule a delete " +
+            "container command for container " + container.containerID() +
+            " on datanode " + datanode + " as it has too many pending delete " +
+            "commands (" + commandCount + ")");
+      }
+      sendDeleteCommand(container, replicaIndex, datanode, force);
+    } catch (NodeNotFoundException e) {
+      throw new IllegalArgumentException("Datanode " + datanode + " not " +
+          "found in NodeManager. Should not happen");
     }
-    sendDeleteCommand(container, replicaIndex, datanodeWithCommandCount.get(0)
-        .getRight(), force);
   }
 
   /**
@@ -516,8 +519,8 @@ public class ReplicationManager implements SCMService {
       List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
       throws CommandTargetOverloadedException, NotLeaderException {
     long containerID = containerInfo.getContainerID();
-    List<Pair<Integer, DatanodeDetails>> sourceWithCmds = 
getAvailableDatanodes(
-        sources, Type.replicateContainerCommand, datanodeReplicationLimit);
+    List<Pair<Integer, DatanodeDetails>> sourceWithCmds =
+        getAvailableDatanodesForReplication(sources);
     if (sourceWithCmds.isEmpty()) {
       throw new CommandTargetOverloadedException("No sources with capacity " +
           "available for replication of container " + containerID + " to " +
@@ -532,33 +535,53 @@ public class ReplicationManager implements SCMService {
     sendDatanodeCommand(cmd, containerInfo, sourceWithCmds.get(0).getRight());
   }
 
+  public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
+      ReconstructECContainersCommand command)
+      throws CommandTargetOverloadedException, NotLeaderException {
+    List<DatanodeDetails> targets = command.getTargetDatanodes();
+    List<Pair<Integer, DatanodeDetails>> targetWithCmds =
+        getAvailableDatanodesForReplication(targets);
+    if (targetWithCmds.isEmpty()) {
+      throw new CommandTargetOverloadedException("No target with capacity " +
+          "available for reconstruction of " + containerInfo.getContainerID());
+    }
+    // Put the least loaded target first
+    targetWithCmds.sort(Comparator.comparingInt(Pair::getLeft));
+    sendDatanodeCommand(command, containerInfo,
+        targetWithCmds.get(0).getRight());
+  }
+
   /**
-   * For the given datanodes and command type, lookup the current queue command
-   * count and return a list of datanodes with the current command count. If
-   * any datanode is at or beyond the limit, then it will not be included in 
the
+   * For the given datanodes, lookup the current queued command count for
+   * replication and reconstruction and return a list of datanodes with the
+   * total queued count which are less than the limit.
+   * If any datanode is at or beyond the limit, then it will not be included in the
    * returned list.
    * @param datanodes List of datanodes to check for available capacity
-   * @param commandType The Type of datanode command to check the capacity for.
-   * @param limit The limit of commands of that type.
    * @return List of datanodes with the current command count that are not over
    *         the limit.
    */
-  private List<Pair<Integer, DatanodeDetails>> getAvailableDatanodes(
-      List<DatanodeDetails> datanodes,
-      StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type commandType,
-      int limit) {
+  private List<Pair<Integer, DatanodeDetails>>
+      getAvailableDatanodesForReplication(List<DatanodeDetails> datanodes) {
     List<Pair<Integer, DatanodeDetails>> datanodeWithCommandCount
         = new ArrayList<>();
     for (DatanodeDetails dn : datanodes) {
       try {
-        int commandCount = nodeManager.getTotalDatanodeCommandCount(dn,
-            commandType);
-        if (commandCount >= limit) {
+        Map<Type, Integer> counts = nodeManager.getTotalDatanodeCommandCounts(
+            dn, Type.replicateContainerCommand,
+            Type.reconstructECContainersCommand);
+        int replicateCount = counts.get(Type.replicateContainerCommand);
+        int reconstructCount = counts.get(Type.reconstructECContainersCommand);
+        int totalCount = replicateCount
+            + reconstructCount * reconstructionCommandWeight;
+        if (totalCount >= datanodeReplicationLimit) {
           LOG.debug("Datanode {} has reached the maximum number of queued " +
-              "{} commands ({})", dn, commandType, limit);
+              "commands (replication: {}, reconstruction: {} * {})",
+              dn, replicateCount, reconstructCount,
+              reconstructionCommandWeight);
           continue;
         }
-        datanodeWithCommandCount.add(Pair.of(commandCount, dn));
+        datanodeWithCommandCount.add(Pair.of(totalCount, dn));
       } catch (NodeNotFoundException e) {
         LOG.error("Node {} not found in NodeManager. Should not happen",
             dn, e);
@@ -1160,9 +1183,9 @@ public class ReplicationManager implements SCMService {
         defaultValue = "20",
         tags = { SCM, DATANODE },
         description = "A limit to restrict the total number of replication " +
-            "commands queued on a datanode. Note this is intended to be a " +
-            " temporary config until we have a more dynamic way of limiting " +
-            "load."
+            "and reconstruction commands queued on a datanode. Note this is " +
+            "intended to be a temporary config until we have a more dynamic " +
+            "way of limiting load."
     )
     private int datanodeReplicationLimit = 20;
 
@@ -1170,6 +1193,21 @@ public class ReplicationManager implements SCMService {
       return datanodeReplicationLimit;
     }
 
+    @Config(key = "datanode.reconstruction.weight",
+        type = ConfigType.INT,
+        defaultValue = "3",
+        tags = { SCM, DATANODE },
+        description = "When counting the number of replication commands on a " +
+            "datanode, the number of reconstruction commands is multiplied " +
+            "by this weight to ensure reconstruction commands use more of " +
+            "the capacity, as they are more expensive to process."
+    )
+    private int reconstructionCommandWeight = 3;
+
+    public int getReconstructionCommandWeight() {
+      return reconstructionCommandWeight;
+    }
+
     @Config(key = "datanode.delete.container.limit",
         type = ConfigType.INT,
         defaultValue = "40",
@@ -1240,6 +1278,10 @@ public class ReplicationManager implements SCMService {
             + " set to " + datanodeTimeoutOffset + " and must be <"
             + " event.timeout, which is set to " + eventTimeout);
       }
+      if (reconstructionCommandWeight <= 0) {
+        throw new IllegalArgumentException("reconstructionCommandWeight is"
+            + " set to " + reconstructionCommandWeight + " and must be > 0");
+      }
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 8d228b54e5..3601c3230d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -331,6 +331,25 @@ public interface NodeManager extends 
StorageContainerNodeProtocol,
   int getTotalDatanodeCommandCount(DatanodeDetails datanodeDetails,
       SCMCommandProto.Type cmdType) throws NodeNotFoundException;
 
+  /**
+   * Get the total number of pending commands of the given types on the given
+   * datanode. For each command, this includes both the number of commands
+   * queued in SCM which will be sent to the datanode on the next heartbeat,
+   * and the number of commands reported by the datanode in the last heartbeat.
+   * If the datanode has not reported any information for the given command,
+   * zero is assumed.
+   * All commands are retrieved under a single read lock, so the counts are
+   * consistent.
+   * @param datanodeDetails The datanode to query.
+   * @param cmdType The list of command types to query.
+   * @return A Map of commandType to Integer with an entry for each command type
+   *         passed.
+   * @throws NodeNotFoundException
+   */
+  Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+      DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType)
+      throws NodeNotFoundException;
+
   /**
    * Get list of SCMCommands in the Command Queue for a particular Datanode.
    * @param dnID - Datanode uuid.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 979ccc4b66..b8e1aa9ca4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -818,6 +818,37 @@ public class SCMNodeManager implements NodeManager {
     }
   }
 
+  /**
+   * Get the total number of pending commands of the given types on the given
+   * datanode. For each command, this includes both the number of commands
+   * queued in SCM which will be sent to the datanode on the next heartbeat,
+   * and the number of commands reported by the datanode in the last heartbeat.
+   * If the datanode has not reported any information for the given command,
+   * zero is assumed.
+   * All commands are retrieved under a single read lock, so the counts are
+   * consistent.
+   * @param datanodeDetails The datanode to query.
+   * @param cmdType The list of command types to query.
+   * @return A Map of commandType to Integer with an entry for each command type
+   *         passed.
+   * @throws NodeNotFoundException
+   */
+  @Override
+  public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+      DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType)
+      throws NodeNotFoundException {
+    Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+    readLock().lock();
+    try {
+      for (SCMCommandProto.Type type : cmdType) {
+        counts.put(type, getTotalDatanodeCommandCount(datanodeDetails, type));
+      }
+      return counts;
+    } finally {
+      readLock().unlock();
+    }
+  }
+
   /**
    * Returns the aggregated node stats.
    *
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index c10575925e..27e3b1aeee 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -633,6 +633,12 @@ public class MockNodeManager implements NodeManager {
     return 0;
   }
 
+  @Override
+  public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+      DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType) {
+    return Collections.emptyMap();
+  }
+
   /**
    * Update set of containers available on a datanode.
    * @param uuid - DatanodeID
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 69fb5ffa25..6b09562d9d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -43,6 +43,7 @@ import 
org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -334,6 +335,12 @@ public class SimpleMockNodeManager implements NodeManager {
     return 0;
   }
 
+  @Override
+  public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+      DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType) {
+    return Collections.emptyMap();
+  }
+
   @Override
   public List<SCMCommand> getCommandQueue(UUID dnID) {
     return null;
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index fc6b9c143a..d0b1f6322a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -344,6 +345,27 @@ public final class ReplicationTestUtil {
         Mockito.any(DatanodeDetails.class), anyInt());
   }
 
+  /**
+   * Given a Mockito mock of ReplicationManager, this method will mock the
+   * SendThrottledReconstructionCommand method so that it adds the command
+   * created to the commandsSent set.
+   * @param mock Mock of ReplicationManager
+   * @param commandsSent Set to add the command to rather than sending it.
+   * @throws NotLeaderException
+   * @throws CommandTargetOverloadedException
+   */
+  public static void mockSendThrottledReconstructionCommand(
+      ReplicationManager mock,
+      Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+      throws NotLeaderException, CommandTargetOverloadedException {
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      ReconstructECContainersCommand cmd = invocationOnMock.getArgument(1);
+      commandsSent.add(Pair.of(cmd.getTargetDatanodes().get(0), cmd));
+      return null;
+    }).when(mock).sendThrottledReconstructionCommand(
+        Mockito.any(ContainerInfo.class), Mockito.any());
+  }
+
   /**
    * Given a Mockito mock of ReplicationManager, this method will mock the
    * sendDatanodeCommand method so that it adds the command created to the
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 867bdd15d9..6ea963ab14 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -120,6 +120,8 @@ public class TestECUnderReplicationHandler {
         replicationManager, commandsSent);
     ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
         replicationManager, commandsSent);
+    ReplicationTestUtil.mockSendThrottledReconstructionCommand(
+        replicationManager, commandsSent);
 
     conf = SCMTestUtils.getConf();
     repConfig = new ECReplicationConfig(DATA, PARITY);
@@ -169,6 +171,21 @@ public class TestECUnderReplicationHandler {
         availableReplicas, 0, 0, policy);
   }
 
+  @Test
+  public void testThrowsWhenTargetsOverloaded() throws IOException {
+    Set<ContainerReplica> availableReplicas =  ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+            Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
+
+    doThrow(new CommandTargetOverloadedException("Overloaded"))
+        .when(replicationManager).sendThrottledReconstructionCommand(
+            any(), any());
+
+    Assertions.assertThrows(CommandTargetOverloadedException.class, () ->
+        testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+            availableReplicas, 0, 0, policy));
+  }
+
   @Test
   public void testUnderReplicationWithDecomIndex1() throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index c13c3b7da8..b48532d0fe 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -859,7 +859,7 @@ public class TestReplicationManager {
   }
 
   @Test
-  public void testCreateThrottledReplicateContainerCommand()
+  public void testSendThrottledReplicateContainerCommand()
       throws CommandTargetOverloadedException, NodeNotFoundException,
       NotLeaderException {
     Map<DatanodeDetails, Integer> sourceNodes = new HashMap<>();
@@ -869,11 +869,16 @@ public class TestReplicationManager {
       sourceNodes.put(MockDatanodeDetails.randomDatanodeDetails(), i * 5);
     }
 
-    Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(),
-            eq(SCMCommandProto.Type.replicateContainerCommand)))
+    Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+            eq(SCMCommandProto.Type.replicateContainerCommand),
+            eq(SCMCommandProto.Type.reconstructECContainersCommand)))
         .thenAnswer(invocation -> {
+          Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
           DatanodeDetails dn = invocation.getArgument(0);
-          return sourceNodes.get(dn);
+          counts.put(SCMCommandProto.Type.replicateContainerCommand,
+              sourceNodes.get(dn));
+          counts.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+          return counts;
         });
 
     ContainerInfo container = ReplicationTestUtil.createContainerInfo(
@@ -892,27 +897,126 @@ public class TestReplicationManager {
   }
 
   @Test(expected = CommandTargetOverloadedException.class)
-  public void testCreateThrottledReplicateContainerCommandThrowsWhenNoSources()
+  public void testSendThrottledReplicateContainerCommandThrowsWhenNoSources()
       throws CommandTargetOverloadedException, NodeNotFoundException,
       NotLeaderException {
+    // Reconstruction commands also count toward the limit, so set things up
+    // so that the nodes are at the limit caused by 1 reconstruction command
+    // and the remaining replication commands
     int limit = replicationManager.getConfig().getDatanodeReplicationLimit();
-    Map<DatanodeDetails, Integer> sourceNodes = new HashMap<>();
+    int reconstructionWeight = replicationManager.getConfig()
+        .getReconstructionCommandWeight();
+    int reconstructionCount = 1;
+    int replicationCount = limit - reconstructionCount * reconstructionWeight;
+    List<DatanodeDetails> sourceNodes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
-      sourceNodes.put(MockDatanodeDetails.randomDatanodeDetails(), limit + 1);
+      sourceNodes.add(MockDatanodeDetails.randomDatanodeDetails());
     }
 
-    Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(),
-            eq(SCMCommandProto.Type.replicateContainerCommand)))
+    Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+            eq(SCMCommandProto.Type.replicateContainerCommand),
+            eq(SCMCommandProto.Type.reconstructECContainersCommand)))
         .thenAnswer(invocation -> {
-          DatanodeDetails dn = invocation.getArgument(0);
-          return sourceNodes.get(dn);
+          Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+          counts.put(SCMCommandProto.Type.replicateContainerCommand,
+              replicationCount);
+          counts.put(SCMCommandProto.Type.reconstructECContainersCommand,
+              reconstructionCount);
+          return counts;
         });
 
     DatanodeDetails destination = MockDatanodeDetails.randomDatanodeDetails();
     ContainerInfo container = ReplicationTestUtil.createContainerInfo(
         repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
     replicationManager.sendThrottledReplicationCommand(
-            container, new ArrayList<>(sourceNodes.keySet()), destination, 0);
+            container, sourceNodes, destination, 0);
+  }
+
+  @Test
+  public void testSendThrottledReconstructionCommand()
+      throws CommandTargetOverloadedException, NodeNotFoundException,
+      NotLeaderException {
+    Map<DatanodeDetails, Integer> targetNodes = new HashMap<>();
+    DatanodeDetails cmdTarget = MockDatanodeDetails.randomDatanodeDetails();
+    targetNodes.put(cmdTarget, 0);
+    targetNodes.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+
+    Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+            eq(SCMCommandProto.Type.replicateContainerCommand),
+            eq(SCMCommandProto.Type.reconstructECContainersCommand)))
+        .thenAnswer(invocation -> {
+          Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+          DatanodeDetails dn = invocation.getArgument(0);
+          counts.put(SCMCommandProto.Type.replicateContainerCommand,
+              targetNodes.get(dn));
+          counts.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+          return counts;
+        });
+
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+
+    ReconstructECContainersCommand command = createReconstructionCommand(
+        container, new ArrayList<>(targetNodes.keySet()));
+
+    replicationManager.sendThrottledReconstructionCommand(container, command);
+
+    Assertions.assertEquals(1, commandsSent.size());
+    Pair<UUID, SCMCommand<?>> cmd = commandsSent.iterator().next();
+    Assertions.assertEquals(cmdTarget.getUuid(), cmd.getLeft());
+  }
+
+  @Test(expected = CommandTargetOverloadedException.class)
+  public void testSendThrottledReconstructionCommandThrowsWhenNoTargets()
+      throws CommandTargetOverloadedException, NodeNotFoundException,
+      NotLeaderException {
+    int limit = replicationManager.getConfig().getDatanodeReplicationLimit();
+    int reconstructionWeight = replicationManager.getConfig()
+        .getReconstructionCommandWeight();
+
+    // We want to test that Replication commands also count toward the limit,
+    // and also that the weight is applied to the reconstruction count.
+    // Using the values below will set the targets at their limit.
+    int reconstructionCount = 2;
+    int replicationCount = limit - reconstructionCount * reconstructionWeight;
+
+    List<DatanodeDetails> targets = new ArrayList<>();
+    targets.add(MockDatanodeDetails.randomDatanodeDetails());
+    targets.add(MockDatanodeDetails.randomDatanodeDetails());
+
+    Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+            eq(SCMCommandProto.Type.replicateContainerCommand),
+            eq(SCMCommandProto.Type.reconstructECContainersCommand)))
+        .thenAnswer(invocation -> {
+          Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+          counts.put(SCMCommandProto.Type.replicateContainerCommand,
+              replicationCount);
+          counts.put(SCMCommandProto.Type.reconstructECContainersCommand,
+              reconstructionCount);
+          return counts;
+        });
+
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+    ReconstructECContainersCommand command = createReconstructionCommand(
+        container, targets);
+    replicationManager.sendThrottledReconstructionCommand(container, command);
+  }
+
+  private ReconstructECContainersCommand createReconstructionCommand(
+      ContainerInfo containerInfo, List<DatanodeDetails> targets) {
+    List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> sources
+        = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      sources.add(
+          new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
+              MockDatanodeDetails.randomDatanodeDetails(), i));
+    }
+    byte[] missingIndexes = new byte[]{4, 5};
+
+    return new ReconstructECContainersCommand(
+        containerInfo.getContainerID(), sources,
+        targets, missingIndexes, (ECReplicationConfig) repConfig);
   }
 
   @Test
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index f3838c00f5..6a789cb92a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -1039,6 +1039,14 @@ public class TestSCMNodeManager {
         node1, SCMCommandProto.Type.replicateContainerCommand));
     Assertions.assertEquals(16, nodeManager.getTotalDatanodeCommandCount(
         node1, SCMCommandProto.Type.closeContainerCommand));
+    Map<SCMCommandProto.Type, Integer> counts =
+        nodeManager.getTotalDatanodeCommandCounts(node1,
+            SCMCommandProto.Type.replicateContainerCommand,
+            SCMCommandProto.Type.closeContainerCommand);
+    Assertions.assertEquals(0,
+        counts.get(SCMCommandProto.Type.replicateContainerCommand));
+    Assertions.assertEquals(16,
+        counts.get(SCMCommandProto.Type.closeContainerCommand));
   }
 
   @Test
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index f3e082cf43..5ce8153695 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -47,6 +47,7 @@ import 
org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -482,6 +483,12 @@ public class ReplicationNodeManagerMock implements 
NodeManager {
     return 0;
   }
 
+  @Override
+  public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+      DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType) {
+    return Collections.emptyMap();
+  }
+
   @Override
   public void onMessage(CommandForDatanode commandForDatanode,
                         EventPublisher publisher) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to