This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 91227327b1 HDDS-8309. ReplicationManager: Basic Throttling of EC
Reconstruction commands (#4496)
91227327b1 is described below
commit 91227327b1831824e4f82b31989ceff5188a8fb8
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Mar 30 15:40:06 2023 +0100
HDDS-8309. ReplicationManager: Basic Throttling of EC Reconstruction
commands (#4496)
---
.../replication/ECUnderReplicationHandler.java | 8 +-
.../container/replication/ReplicationManager.java | 104 ++++++++++++-----
.../apache/hadoop/hdds/scm/node/NodeManager.java | 19 +++
.../hadoop/hdds/scm/node/SCMNodeManager.java | 31 +++++
.../hadoop/hdds/scm/container/MockNodeManager.java | 6 +
.../hdds/scm/container/SimpleMockNodeManager.java | 7 ++
.../container/replication/ReplicationTestUtil.java | 22 ++++
.../replication/TestECUnderReplicationHandler.java | 17 +++
.../replication/TestReplicationManager.java | 128 +++++++++++++++++++--
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 8 ++
.../testutils/ReplicationNodeManagerMock.java | 7 ++
11 files changed, 309 insertions(+), 48 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index dcdfb8d0a5..3c13dfe6b6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -315,10 +315,8 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
sourceDatanodesWithIndex, selectedDatanodes,
int2byte(missingIndexes),
repConfig);
- replicationManager.sendDatanodeCommand(reconstructionCommand,
- container, selectedDatanodes.get(0));
- // For each index that is going to be reconstructed with this command,
- // adjust the replica count to reflect the pending operation.
+ replicationManager.sendThrottledReconstructionCommand(
+ container, reconstructionCommand);
for (int i = 0; i < missingIndexes.size(); i++) {
adjustPendingOps(
replicaCount, selectedDatanodes.get(i), missingIndexes.get(i));
@@ -470,7 +468,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
}
private void adjustPendingOps(ECContainerReplicaCount replicaCount,
- DatanodeDetails target, int replicaIndex) {
+ DatanodeDetails target, int replicaIndex) {
replicaCount.addPendingOp(new ContainerReplicaOp(
ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex,
Long.MAX_VALUE));
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 4891e0dc10..23b226d978 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.PostConstruct;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
@@ -75,7 +74,6 @@ import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -179,6 +177,7 @@ public class ReplicationManager implements SCMService {
private final OverReplicatedProcessor overReplicatedProcessor;
private final HealthCheck containerCheckChain;
private final int datanodeReplicationLimit;
+ private final int reconstructionCommandWeight;
private final int datanodeDeleteLimit;
/**
@@ -231,6 +230,7 @@ public class ReplicationManager implements SCMService {
this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
this.ratisMaintenanceMinReplicas = rmConf.getMaintenanceReplicaMinimum();
this.datanodeReplicationLimit = rmConf.getDatanodeReplicationLimit();
+ this.reconstructionCommandWeight = rmConf.getReconstructionCommandWeight();
this.datanodeDeleteLimit = rmConf.getDatanodeDeleteLimit();
ecUnderReplicationHandler = new ECUnderReplicationHandler(
@@ -484,17 +484,20 @@ public class ReplicationManager implements SCMService {
public void sendThrottledDeleteCommand(final ContainerInfo container,
int replicaIndex, final DatanodeDetails datanode, boolean force)
throws NotLeaderException, CommandTargetOverloadedException {
- List<Pair<Integer, DatanodeDetails>> datanodeWithCommandCount =
- getAvailableDatanodes(Collections.singletonList(datanode),
- Type.deleteContainerCommand, datanodeDeleteLimit);
- if (datanodeWithCommandCount.isEmpty()) {
- throw new CommandTargetOverloadedException("Cannot schedule a delete " +
- "container command for container " + container.containerID() +
- " on datanode " + datanode + " as it has too many pending delete " +
- "commands");
+ try {
+ int commandCount = nodeManager.getTotalDatanodeCommandCount(datanode,
+ Type.deleteContainerCommand);
+ if (commandCount >= datanodeDeleteLimit) {
+ throw new CommandTargetOverloadedException("Cannot schedule a delete "
+
+ "container command for container " + container.containerID() +
+ " on datanode " + datanode + " as it has too many pending delete "
+
+ "commands (" + commandCount + ")");
+ }
+ sendDeleteCommand(container, replicaIndex, datanode, force);
+ } catch (NodeNotFoundException e) {
+ throw new IllegalArgumentException("Datanode " + datanode + " not " +
+ "found in NodeManager. Should not happen");
}
- sendDeleteCommand(container, replicaIndex, datanodeWithCommandCount.get(0)
- .getRight(), force);
}
/**
@@ -516,8 +519,8 @@ public class ReplicationManager implements SCMService {
List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
throws CommandTargetOverloadedException, NotLeaderException {
long containerID = containerInfo.getContainerID();
- List<Pair<Integer, DatanodeDetails>> sourceWithCmds =
getAvailableDatanodes(
- sources, Type.replicateContainerCommand, datanodeReplicationLimit);
+ List<Pair<Integer, DatanodeDetails>> sourceWithCmds =
+ getAvailableDatanodesForReplication(sources);
if (sourceWithCmds.isEmpty()) {
throw new CommandTargetOverloadedException("No sources with capacity " +
"available for replication of container " + containerID + " to " +
@@ -532,33 +535,53 @@ public class ReplicationManager implements SCMService {
sendDatanodeCommand(cmd, containerInfo, sourceWithCmds.get(0).getRight());
}
+ public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
+ ReconstructECContainersCommand command)
+ throws CommandTargetOverloadedException, NotLeaderException {
+ List<DatanodeDetails> targets = command.getTargetDatanodes();
+ List<Pair<Integer, DatanodeDetails>> targetWithCmds =
+ getAvailableDatanodesForReplication(targets);
+ if (targetWithCmds.isEmpty()) {
+ throw new CommandTargetOverloadedException("No target with capacity " +
+ "available for reconstruction of " + containerInfo.getContainerID());
+ }
+ // Put the least loaded target first
+ targetWithCmds.sort(Comparator.comparingInt(Pair::getLeft));
+ sendDatanodeCommand(command, containerInfo,
+ targetWithCmds.get(0).getRight());
+ }
+
/**
- * For the given datanodes and command type, lookup the current queue command
- * count and return a list of datanodes with the current command count. If
- * any datanode is at or beyond the limit, then it will not be included in
the
+ * For the given datanodes, lookup the current queued command count for
+ * replication and reconstruction and return a list of datanodes with the
+ * total queued count which are less than the limit.
+ * If any datanode is at or beyond the limit, then it will not be included in
the
* returned list.
* @param datanodes List of datanodes to check for available capacity
- * @param commandType The Type of datanode command to check the capacity for.
- * @param limit The limit of commands of that type.
* @return List of datanodes with the current command count that are not over
* the limit.
*/
- private List<Pair<Integer, DatanodeDetails>> getAvailableDatanodes(
- List<DatanodeDetails> datanodes,
- StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type commandType,
- int limit) {
+ private List<Pair<Integer, DatanodeDetails>>
+ getAvailableDatanodesForReplication(List<DatanodeDetails> datanodes) {
List<Pair<Integer, DatanodeDetails>> datanodeWithCommandCount
= new ArrayList<>();
for (DatanodeDetails dn : datanodes) {
try {
- int commandCount = nodeManager.getTotalDatanodeCommandCount(dn,
- commandType);
- if (commandCount >= limit) {
+ Map<Type, Integer> counts = nodeManager.getTotalDatanodeCommandCounts(
+ dn, Type.replicateContainerCommand,
+ Type.reconstructECContainersCommand);
+ int replicateCount = counts.get(Type.replicateContainerCommand);
+ int reconstructCount = counts.get(Type.reconstructECContainersCommand);
+ int totalCount = replicateCount
+ + reconstructCount * reconstructionCommandWeight;
+ if (totalCount >= datanodeReplicationLimit) {
LOG.debug("Datanode {} has reached the maximum number of queued " +
- "{} commands ({})", dn, commandType, limit);
+ "commands, replication: {}, reconstruction: {} * {})",
+ dn, replicateCount, reconstructCount,
+ reconstructionCommandWeight);
continue;
}
- datanodeWithCommandCount.add(Pair.of(commandCount, dn));
+ datanodeWithCommandCount.add(Pair.of(totalCount, dn));
} catch (NodeNotFoundException e) {
LOG.error("Node {} not found in NodeManager. Should not happen",
dn, e);
@@ -1160,9 +1183,9 @@ public class ReplicationManager implements SCMService {
defaultValue = "20",
tags = { SCM, DATANODE },
description = "A limit to restrict the total number of replication " +
- "commands queued on a datanode. Note this is intended to be a " +
- " temporary config until we have a more dynamic way of limiting " +
- "load."
+ "and reconstruction commands queued on a datanode. Note this is " +
+ "intended to be a temporary config until we have a more dynamic " +
+ "way of limiting load."
)
private int datanodeReplicationLimit = 20;
@@ -1170,6 +1193,21 @@ public class ReplicationManager implements SCMService {
return datanodeReplicationLimit;
}
+ @Config(key = "datanode.reconstruction.weight",
+ type = ConfigType.INT,
+ defaultValue = "3",
+ tags = { SCM, DATANODE },
+ description = "When counting the number of replication commands on a "
+
+ "datanode, the number of reconstruction commands is multiplied " +
+ "by this weight to ensure reconstruction commands use more of " +
+ "the capacity, as they are more expensive to process."
+ )
+ private int reconstructionCommandWeight = 3;
+
+ public int getReconstructionCommandWeight() {
+ return reconstructionCommandWeight;
+ }
+
@Config(key = "datanode.delete.container.limit",
type = ConfigType.INT,
defaultValue = "40",
@@ -1240,6 +1278,10 @@ public class ReplicationManager implements SCMService {
+ " set to " + datanodeTimeoutOffset + " and must be <"
+ " event.timeout, which is set to " + eventTimeout);
}
+ if (reconstructionCommandWeight <= 0) {
+ throw new IllegalArgumentException("reconstructionCommandWeight is"
+ + " set to " + reconstructionCommandWeight + " and must be > 0");
+ }
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 8d228b54e5..3601c3230d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -331,6 +331,25 @@ public interface NodeManager extends
StorageContainerNodeProtocol,
int getTotalDatanodeCommandCount(DatanodeDetails datanodeDetails,
SCMCommandProto.Type cmdType) throws NodeNotFoundException;
+ /**
+ * Get the total number of pending commands of the given types on the given
+ * datanode. For each command, this includes both the number of commands
+ * queued in SCM which will be sent to the datanode on the next heartbeat,
+ * and the number of commands reported by the datanode in the last heartbeat.
+ * If the datanode has not reported any information for the given command,
+ * zero is assumed.
+ * All commands are retrieved under a single read lock, so the counts are
+ * consistent.
+ * @param datanodeDetails The datanode to query.
+ * @param cmdType The list of command types to query.
+ * @return A Map of commandType to Integer with an entry for each command
type
+ * passed.
+ * @throws NodeNotFoundException
+ */
+ Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+ DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType)
+ throws NodeNotFoundException;
+
/**
* Get list of SCMCommands in the Command Queue for a particular Datanode.
* @param dnID - Datanode uuid.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 979ccc4b66..b8e1aa9ca4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -818,6 +818,37 @@ public class SCMNodeManager implements NodeManager {
}
}
+ /**
+ * Get the total number of pending commands of the given types on the given
+ * datanode. For each command, this includes both the number of commands
+ * queued in SCM which will be sent to the datanode on the next heartbeat,
+ * and the number of commands reported by the datanode in the last heartbeat.
+ * If the datanode has not reported any information for the given command,
+ * zero is assumed.
+ * All commands are retrieved under a single read lock, so the counts are
+ * consistent.
+ * @param datanodeDetails The datanode to query.
+ * @param cmdType The list of command types to query.
+ * @return A Map of commandType to Integer with an entry for each command
type
+ * passed.
+ * @throws NodeNotFoundException
+ */
+ @Override
+ public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+ DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType)
+ throws NodeNotFoundException {
+ Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+ readLock().lock();
+ try {
+ for (SCMCommandProto.Type type : cmdType) {
+ counts.put(type, getTotalDatanodeCommandCount(datanodeDetails, type));
+ }
+ return counts;
+ } finally {
+ readLock().unlock();
+ }
+ }
+
/**
* Returns the aggregated node stats.
*
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index c10575925e..27e3b1aeee 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -633,6 +633,12 @@ public class MockNodeManager implements NodeManager {
return 0;
}
+ @Override
+ public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+ DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType) {
+ return Collections.emptyMap();
+ }
+
/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 69fb5ffa25..6b09562d9d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -43,6 +43,7 @@ import
org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -334,6 +335,12 @@ public class SimpleMockNodeManager implements NodeManager {
return 0;
}
+ @Override
+ public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+ DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType) {
+ return Collections.emptyMap();
+ }
+
@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
return null;
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index fc6b9c143a..d0b1f6322a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -344,6 +345,27 @@ public final class ReplicationTestUtil {
Mockito.any(DatanodeDetails.class), anyInt());
}
+ /**
+ * Given a Mockito mock of ReplicationManager, this method will mock the
+ * sendThrottledReconstructionCommand method so that it adds the command
+ * created to the commandsSent set.
+ * @param mock Mock of ReplicationManager
+ * @param commandsSent Set to add the command to rather than sending it.
+ * @throws NotLeaderException
+ * @throws CommandTargetOverloadedException
+ */
+ public static void mockSendThrottledReconstructionCommand(
+ ReplicationManager mock,
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+ throws NotLeaderException, CommandTargetOverloadedException {
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ ReconstructECContainersCommand cmd = invocationOnMock.getArgument(1);
+ commandsSent.add(Pair.of(cmd.getTargetDatanodes().get(0), cmd));
+ return null;
+ }).when(mock).sendThrottledReconstructionCommand(
+ Mockito.any(ContainerInfo.class), Mockito.any());
+ }
+
/**
* Given a Mockito mock of ReplicationManager, this method will mock the
* sendDatanodeCommand method so that it adds the command created to the
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 867bdd15d9..6ea963ab14 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -120,6 +120,8 @@ public class TestECUnderReplicationHandler {
replicationManager, commandsSent);
ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
replicationManager, commandsSent);
+ ReplicationTestUtil.mockSendThrottledReconstructionCommand(
+ replicationManager, commandsSent);
conf = SCMTestUtils.getConf();
repConfig = new ECReplicationConfig(DATA, PARITY);
@@ -169,6 +171,21 @@ public class TestECUnderReplicationHandler {
availableReplicas, 0, 0, policy);
}
+ @Test
+ public void testThrowsWhenTargetsOverloaded() throws IOException {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
+
+ doThrow(new CommandTargetOverloadedException("Overloaded"))
+ .when(replicationManager).sendThrottledReconstructionCommand(
+ any(), any());
+
+ Assertions.assertThrows(CommandTargetOverloadedException.class, () ->
+ testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+ availableReplicas, 0, 0, policy));
+ }
+
@Test
public void testUnderReplicationWithDecomIndex1() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index c13c3b7da8..b48532d0fe 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -859,7 +859,7 @@ public class TestReplicationManager {
}
@Test
- public void testCreateThrottledReplicateContainerCommand()
+ public void testSendThrottledReplicateContainerCommand()
throws CommandTargetOverloadedException, NodeNotFoundException,
NotLeaderException {
Map<DatanodeDetails, Integer> sourceNodes = new HashMap<>();
@@ -869,11 +869,16 @@ public class TestReplicationManager {
sourceNodes.put(MockDatanodeDetails.randomDatanodeDetails(), i * 5);
}
- Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(),
- eq(SCMCommandProto.Type.replicateContainerCommand)))
+ Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+ eq(SCMCommandProto.Type.replicateContainerCommand),
+ eq(SCMCommandProto.Type.reconstructECContainersCommand)))
.thenAnswer(invocation -> {
+ Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
DatanodeDetails dn = invocation.getArgument(0);
- return sourceNodes.get(dn);
+ counts.put(SCMCommandProto.Type.replicateContainerCommand,
+ sourceNodes.get(dn));
+ counts.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+ return counts;
});
ContainerInfo container = ReplicationTestUtil.createContainerInfo(
@@ -892,27 +897,126 @@ public class TestReplicationManager {
}
@Test(expected = CommandTargetOverloadedException.class)
- public void testCreateThrottledReplicateContainerCommandThrowsWhenNoSources()
+ public void testSendThrottledReplicateContainerCommandThrowsWhenNoSources()
throws CommandTargetOverloadedException, NodeNotFoundException,
NotLeaderException {
+ // Reconstruction commands also count toward the limit, so set things up
+ // so that the nodes are at the limit caused by 1 reconstruction command
+ // and the remaining replication commands
int limit = replicationManager.getConfig().getDatanodeReplicationLimit();
- Map<DatanodeDetails, Integer> sourceNodes = new HashMap<>();
+ int reconstructionWeight = replicationManager.getConfig()
+ .getReconstructionCommandWeight();
+ int reconstructionCount = 1;
+ int replicationCount = limit - reconstructionCount * reconstructionWeight;
+ List<DatanodeDetails> sourceNodes = new ArrayList<>();
for (int i = 0; i < 3; i++) {
- sourceNodes.put(MockDatanodeDetails.randomDatanodeDetails(), limit + 1);
+ sourceNodes.add(MockDatanodeDetails.randomDatanodeDetails());
}
- Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(),
- eq(SCMCommandProto.Type.replicateContainerCommand)))
+ Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+ eq(SCMCommandProto.Type.replicateContainerCommand),
+ eq(SCMCommandProto.Type.reconstructECContainersCommand)))
.thenAnswer(invocation -> {
- DatanodeDetails dn = invocation.getArgument(0);
- return sourceNodes.get(dn);
+ Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+ counts.put(SCMCommandProto.Type.replicateContainerCommand,
+ replicationCount);
+ counts.put(SCMCommandProto.Type.reconstructECContainersCommand,
+ reconstructionCount);
+ return counts;
});
DatanodeDetails destination = MockDatanodeDetails.randomDatanodeDetails();
ContainerInfo container = ReplicationTestUtil.createContainerInfo(
repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
replicationManager.sendThrottledReplicationCommand(
- container, new ArrayList<>(sourceNodes.keySet()), destination, 0);
+ container, sourceNodes, destination, 0);
+ }
+
+ @Test
+ public void testSendThrottledReconstructionCommand()
+ throws CommandTargetOverloadedException, NodeNotFoundException,
+ NotLeaderException {
+ Map<DatanodeDetails, Integer> targetNodes = new HashMap<>();
+ DatanodeDetails cmdTarget = MockDatanodeDetails.randomDatanodeDetails();
+ targetNodes.put(cmdTarget, 0);
+ targetNodes.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+
+ Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+ eq(SCMCommandProto.Type.replicateContainerCommand),
+ eq(SCMCommandProto.Type.reconstructECContainersCommand)))
+ .thenAnswer(invocation -> {
+ Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+ DatanodeDetails dn = invocation.getArgument(0);
+ counts.put(SCMCommandProto.Type.replicateContainerCommand,
+ targetNodes.get(dn));
+ counts.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+ return counts;
+ });
+
+ ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+ repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+
+ ReconstructECContainersCommand command = createReconstructionCommand(
+ container, new ArrayList<>(targetNodes.keySet()));
+
+ replicationManager.sendThrottledReconstructionCommand(container, command);
+
+ Assertions.assertEquals(1, commandsSent.size());
+ Pair<UUID, SCMCommand<?>> cmd = commandsSent.iterator().next();
+ Assertions.assertEquals(cmdTarget.getUuid(), cmd.getLeft());
+ }
+
+ @Test(expected = CommandTargetOverloadedException.class)
+ public void testSendThrottledReconstructionCommandThrowsWhenNoTargets()
+ throws CommandTargetOverloadedException, NodeNotFoundException,
+ NotLeaderException {
+ int limit = replicationManager.getConfig().getDatanodeReplicationLimit();
+ int reconstructionWeight = replicationManager.getConfig()
+ .getReconstructionCommandWeight();
+
+ // We want to test that Replication commands also count toward the limit,
+ // and also that the weight is applied to the reconstruction count.
+ // Using the values below will set the targets at their limit.
+ int reconstructionCount = 2;
+ int replicationCount = limit - reconstructionCount * reconstructionWeight;
+
+ List<DatanodeDetails> targets = new ArrayList<>();
+ targets.add(MockDatanodeDetails.randomDatanodeDetails());
+ targets.add(MockDatanodeDetails.randomDatanodeDetails());
+
+ Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+ eq(SCMCommandProto.Type.replicateContainerCommand),
+ eq(SCMCommandProto.Type.reconstructECContainersCommand)))
+ .thenAnswer(invocation -> {
+ Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+ counts.put(SCMCommandProto.Type.replicateContainerCommand,
+ replicationCount);
+ counts.put(SCMCommandProto.Type.reconstructECContainersCommand,
+ reconstructionCount);
+ return counts;
+ });
+
+ ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+ repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+ ReconstructECContainersCommand command = createReconstructionCommand(
+ container, targets);
+ replicationManager.sendThrottledReconstructionCommand(container, command);
+ }
+
+ private ReconstructECContainersCommand createReconstructionCommand(
+ ContainerInfo containerInfo, List<DatanodeDetails> targets) {
+ List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> sources
+ = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ sources.add(
+ new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
+ MockDatanodeDetails.randomDatanodeDetails(), i));
+ }
+ byte[] missingIndexes = new byte[]{4, 5};
+
+ return new ReconstructECContainersCommand(
+ containerInfo.getContainerID(), sources,
+ targets, missingIndexes, (ECReplicationConfig) repConfig);
}
@Test
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index f3838c00f5..6a789cb92a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -1039,6 +1039,14 @@ public class TestSCMNodeManager {
node1, SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(16, nodeManager.getTotalDatanodeCommandCount(
node1, SCMCommandProto.Type.closeContainerCommand));
+ Map<SCMCommandProto.Type, Integer> counts =
+ nodeManager.getTotalDatanodeCommandCounts(node1,
+ SCMCommandProto.Type.replicateContainerCommand,
+ SCMCommandProto.Type.closeContainerCommand);
+ Assertions.assertEquals(0,
+ counts.get(SCMCommandProto.Type.replicateContainerCommand));
+ Assertions.assertEquals(16,
+ counts.get(SCMCommandProto.Type.closeContainerCommand));
}
@Test
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index f3e082cf43..5ce8153695 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -47,6 +47,7 @@ import
org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -482,6 +483,12 @@ public class ReplicationNodeManagerMock implements
NodeManager {
return 0;
}
+ @Override
+ public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
+ DatanodeDetails datanodeDetails, SCMCommandProto.Type... cmdType) {
+ return Collections.emptyMap();
+ }
+
@Override
public void onMessage(CommandForDatanode commandForDatanode,
EventPublisher publisher) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]