This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6dd80eba2c HDDS-8233. ReplicationManager: Throttle delete container
commands from over-replication handlers (#4447)
6dd80eba2c is described below
commit 6dd80eba2c9f448d8985701af9647811aa920205
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Mar 22 15:31:16 2023 +0000
HDDS-8233. ReplicationManager: Throttle delete container commands from
over-replication handlers (#4447)
---
....java => CommandTargetOverloadedException.java} | 4 +-
.../replication/ECOverReplicationHandler.java | 27 ++++-
.../replication/ECUnderReplicationHandler.java | 2 +-
.../replication/MisReplicationHandler.java | 2 +-
.../replication/RatisOverReplicationHandler.java | 52 +++++++--
.../replication/RatisUnderReplicationHandler.java | 2 +-
.../container/replication/ReplicationManager.java | 123 +++++++++++++++------
.../container/replication/ReplicationTestUtil.java | 29 ++++-
.../replication/TestECMisReplicationHandler.java | 6 +-
.../replication/TestECOverReplicationHandler.java | 80 ++++++++++++--
.../replication/TestECUnderReplicationHandler.java | 6 +-
.../replication/TestMisReplicationHandler.java | 2 +-
.../TestRatisMisReplicationHandler.java | 6 +-
.../TestRatisOverReplicationHandler.java | 88 ++++++++++++++-
.../TestRatisUnderReplicationHandler.java | 2 +-
.../replication/TestReplicationManager.java | 57 ++++++++--
16 files changed, 401 insertions(+), 87 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AllSourcesOverloadedException.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/CommandTargetOverloadedException.java
similarity index 88%
rename from
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AllSourcesOverloadedException.java
rename to
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/CommandTargetOverloadedException.java
index 2b0b490c51..b83db03024 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AllSourcesOverloadedException.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/CommandTargetOverloadedException.java
@@ -22,9 +22,9 @@ import java.io.IOException;
/**
* Exception class used to indicate that all sources are overloaded.
*/
-public class AllSourcesOverloadedException extends IOException {
+public class CommandTargetOverloadedException extends IOException {
- public AllSourcesOverloadedException(String message) {
+ public CommandTargetOverloadedException(String message) {
super(message);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 4db51d5263..a7350dfc54 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -68,7 +68,7 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
public int processAndSendCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int remainingMaintenanceRedundancy)
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
ContainerInfo container = result.getContainerInfo();
// We are going to check for over replication, so we should filter out any
@@ -154,6 +154,7 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
replicaIndexCounts.put(r.getReplicaIndex(),
replicaIndexCounts.getOrDefault(r.getReplicaIndex(), 0) + 1);
}
+ CommandTargetOverloadedException firstException = null;
for (ContainerReplica r : replicasToRemove) {
int currentCount = replicaIndexCounts.getOrDefault(
r.getReplicaIndex(), 0);
@@ -162,16 +163,32 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
"for that index to zero. Candidate Replicas: {}", r, candidates);
continue;
}
- replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
- replicationManager.sendDeleteCommand(container, r.getReplicaIndex(),
- r.getDatanodeDetails(), true);
- commandsSent++;
+ try {
+ replicationManager.sendThrottledDeleteCommand(container,
+ r.getReplicaIndex(), r.getDatanodeDetails(), true);
+ replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
+ commandsSent++;
+ } catch (CommandTargetOverloadedException e) {
+ LOG.debug("Unable to send delete command for container {} replica " +
+ "index {} to {}",
+ container.getContainerID(), r.getReplicaIndex(),
+ r.getDatanodeDetails());
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
}
if (commandsSent == 0) {
LOG.warn("With the current state of available replicas {}, no" +
" commands were created to remove excess replicas.", replicas);
}
+ // If any of the "to remove" replicas were not able to be removed due to
+ // load on the datanodes, then throw the first exception we encountered.
+ // This will allow the container to be re-queued and tried again later.
+ if (firstException != null) {
+ throw firstException;
+ }
return commandsSent;
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 872835e444..dcdfb8d0a5 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -446,7 +446,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
private void createReplicateCommand(
ContainerInfo container, Iterator<DatanodeDetails> iterator,
ContainerReplica replica, ECContainerReplicaCount replicaCount)
- throws AllSourcesOverloadedException, NotLeaderException {
+ throws CommandTargetOverloadedException, NotLeaderException {
final boolean push = replicationManager.getConfig().isPush();
DatanodeDetails source = replica.getDatanodeDetails();
DatanodeDetails target = iterator.next();
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 3d013999b0..dd2dac5cf7 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -122,7 +122,7 @@ public abstract class MisReplicationHandler implements
ContainerInfo containerInfo,
Set<ContainerReplica> replicasToBeReplicated,
List<DatanodeDetails> targetDns)
- throws AllSourcesOverloadedException, NotLeaderException {
+ throws CommandTargetOverloadedException, NotLeaderException {
int commandsSent = 0;
int datanodeIdx = 0;
for (ContainerReplica replica : replicasToBeReplicated) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
index f1796f503e..55108ff6b8 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
@@ -248,7 +248,7 @@ public class RatisOverReplicationHandler
private int createCommands(
ContainerInfo containerInfo, List<ContainerReplica> replicas,
- int excess) throws NotLeaderException {
+ int excess) throws NotLeaderException, CommandTargetOverloadedException {
/*
Being in the over replication queue means we have enough replicas that
@@ -256,16 +256,31 @@ public class RatisOverReplicationHandler
deleted. This might make the container violate placement policy.
*/
int commandsSent = 0;
+ int initialExcess = excess;
+ CommandTargetOverloadedException firstOverloadedException = null;
List<ContainerReplica> replicasRemoved = new ArrayList<>();
for (ContainerReplica replica : replicas) {
if (excess == 0) {
- return commandsSent;
+ break;
}
if (!ReplicationManager.compareState(
containerInfo.getState(), replica.getState())) {
- replicationManager.sendDeleteCommand(containerInfo,
- replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
- commandsSent++;
+ // Delete commands are throttled, so they may fail to send. However, the
+ // replicas here are not in the same state as the container, so they
+ // must be deleted in preference to "healthy" replicas later. Therefore,
+ // if they fail to delete, we continue to mark them as deleted by
+ // reducing the excess so healthy replicas are not removed later in
+ // this method.
+ try {
+ replicationManager.sendThrottledDeleteCommand(containerInfo,
+ replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
+ commandsSent++;
+ } catch (CommandTargetOverloadedException e) {
+ LOG.debug("Unable to send delete command for a mis-matched state " +
+ "container {} to {} as it has too many pending delete commands",
+ containerInfo.containerID(), replica.getDatanodeDetails());
+ firstOverloadedException = e;
+ }
replicasRemoved.add(replica);
excess--;
}
@@ -281,17 +296,34 @@ public class RatisOverReplicationHandler
// iterate through replicas in deterministic order
for (ContainerReplica replica : replicas) {
if (excess == 0) {
- return commandsSent;
+ break;
}
if (super.isPlacementStatusActuallyEqualAfterRemove(replicaSet, replica,
containerInfo.getReplicationFactor().getNumber())) {
- replicationManager.sendDeleteCommand(containerInfo,
- replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
- commandsSent++;
- excess--;
+ try {
+ replicationManager.sendThrottledDeleteCommand(containerInfo,
+ replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
+ commandsSent++;
+ excess--;
+ } catch (CommandTargetOverloadedException e) {
+ LOG.debug("Unable to send delete command for container {} to {} as "
+
+ "it has too many pending delete commands",
+ containerInfo.containerID(), replica.getDatanodeDetails());
+ if (firstOverloadedException == null) {
+ firstOverloadedException = e;
+ }
+ }
}
}
+ // If we encountered an overloaded exception, and then did not send as many
+ // delete commands as the original excess number, then it means there must
+ // be some replicas we did not delete when we should have. In this case,
+ // throw the exception so that container is requeued and processed again
+ // later.
+ if (firstOverloadedException != null && commandsSent != initialExcess) {
+ throw firstOverloadedException;
+ }
return commandsSent;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 06485d4e71..04b6564579 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -241,7 +241,7 @@ public class RatisUnderReplicationHandler
private int sendReplicationCommands(
ContainerInfo containerInfo, List<DatanodeDetails> sources,
- List<DatanodeDetails> targets) throws AllSourcesOverloadedException,
+ List<DatanodeDetails> targets) throws CommandTargetOverloadedException,
NotLeaderException {
final boolean push = replicationManager.getConfig().isPush();
int commandsSent = 0;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index d72dca28bc..4c321bc2d9 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.PostConstruct;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
@@ -74,6 +75,7 @@ import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -177,6 +179,7 @@ public class ReplicationManager implements SCMService {
private final OverReplicatedProcessor overReplicatedProcessor;
private final HealthCheck containerCheckChain;
private final int datanodeReplicationLimit;
+ private final int datanodeDeleteLimit;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -228,6 +231,7 @@ public class ReplicationManager implements SCMService {
this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
this.ratisMaintenanceMinReplicas = rmConf.getMaintenanceReplicaMinimum();
this.datanodeReplicationLimit = rmConf.getDatanodeReplicationLimit();
+ this.datanodeDeleteLimit = rmConf.getDatanodeDeleteLimit();
ecUnderReplicationHandler = new ECUnderReplicationHandler(
ecContainerPlacement, conf, this);
@@ -468,42 +472,58 @@ public class ReplicationManager implements SCMService {
scmDeadlineEpochMs, datanodeDeadlineEpochMs);
}
+ /**
+ * Sends delete container command for the given container to the given
+ * datanode, provided that the datanode is not overloaded with delete
+ * container commands. If the datanode is overloaded, an exception will be
+ * thrown.
+ * @param container Container to be deleted
+ * @param replicaIndex Index of the container replica to be deleted
+ * @param datanode The datanode on which the replica should be deleted
+ * @param force true to force delete a container that is open or not empty
+ * @throws NotLeaderException when this SCM is not the leader
+ * @throws CommandTargetOverloadedException If the target datanode has too
+ * many pending commands.
+ */
+ public void sendThrottledDeleteCommand(final ContainerInfo container,
+ int replicaIndex, final DatanodeDetails datanode, boolean force)
+ throws NotLeaderException, CommandTargetOverloadedException {
+ List<Pair<Integer, DatanodeDetails>> datanodeWithCommandCount =
+ getAvailableDatanodes(Collections.singletonList(datanode),
+ Type.deleteContainerCommand, datanodeDeleteLimit);
+ if (datanodeWithCommandCount.isEmpty()) {
+ throw new CommandTargetOverloadedException("Cannot schedule a delete " +
+ "container command for container " + container.containerID() +
+ " on datanode " + datanode + " as it has too many pending delete " +
+ "commands");
+ }
+ sendDeleteCommand(container, replicaIndex, datanodeWithCommandCount.get(0)
+ .getRight(), force);
+ }
+
/**
* Create a ReplicateContainerCommand for the given container and to push the
* container to the target datanode. The list of sources are checked to ensure
* the datanode has sufficient capacity to accept the container command, and
* then the command is sent to the datanode with the fewest pending commands.
- * If all sources are overloaded, an AllSourcesOverloadedException is thrown.
- * @param containerID The containerID to be replicated
+ * If all sources are overloaded, a CommandTargetOverloadedException is
+ * thrown.
+ * @param containerInfo The container to be replicated
* @param sources The list of datanodes that can be used as sources
* @param target The target datanode where the container should be replicated
* @param replicaIndex The index of the container replica to be replicated
* @return A pair containing the datanode that the command was sent to, and
* the command created.
- * @throws AllSourcesOverloadedException
+ * @throws CommandTargetOverloadedException
*/
- public Pair<DatanodeDetails, SCMCommand<?>>
- createThrottledReplicationCommand(long containerID,
+ public void sendThrottledReplicationCommand(ContainerInfo containerInfo,
List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
- throws AllSourcesOverloadedException {
- List<Pair<Integer, DatanodeDetails>> sourceWithCmds = new ArrayList<>();
- for (DatanodeDetails source : sources) {
- try {
- int commandCount = nodeManager.getTotalDatanodeCommandCount(source,
- Type.replicateContainerCommand);
- if (commandCount >= datanodeReplicationLimit) {
- LOG.debug("Source {} has reached the maximum number of queued " +
- "replication commands ({})", source, datanodeReplicationLimit);
- continue;
- }
- sourceWithCmds.add(Pair.of(commandCount, source));
- } catch (NodeNotFoundException e) {
- LOG.error("Node {} not found in NodeManager. Should not happen",
- source, e);
- }
- }
+ throws CommandTargetOverloadedException, NotLeaderException {
+ long containerID = containerInfo.getContainerID();
+ List<Pair<Integer, DatanodeDetails>> sourceWithCmds =
getAvailableDatanodes(
+ sources, Type.replicateContainerCommand, datanodeReplicationLimit);
if (sourceWithCmds.isEmpty()) {
- throw new AllSourcesOverloadedException("No sources with capacity " +
+ throw new CommandTargetOverloadedException("No sources with capacity " +
"available for replication of container " + containerID + " to " +
target);
}
@@ -513,16 +533,42 @@ public class ReplicationManager implements SCMService {
ReplicateContainerCommand cmd =
ReplicateContainerCommand.toTarget(containerID, target);
cmd.setReplicaIndex(replicaIndex);
- return Pair.of(sourceWithCmds.get(0).getRight(), cmd);
+ sendDatanodeCommand(cmd, containerInfo, sourceWithCmds.get(0).getRight());
}
- public void sendThrottledReplicationCommand(ContainerInfo containerInfo,
- List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
- throws AllSourcesOverloadedException, NotLeaderException {
- Pair<DatanodeDetails, SCMCommand<?>> cmdPair =
- createThrottledReplicationCommand(containerInfo.getContainerID(),
- sources, target, replicaIndex);
- sendDatanodeCommand(cmdPair.getRight(), containerInfo, cmdPair.getLeft());
+ /**
+ * For the given datanodes and command type, lookup the current queue command
+ * count and return a list of datanodes with the current command count. If
+ * any datanode is at or beyond the limit, then it will not be included in the
+ * returned list.
+ * @param datanodes List of datanodes to check for available capacity
+ * @param commandType The Type of datanode command to check the capacity for.
+ * @param limit The limit of commands of that type.
+ * @return List of datanodes with the current command count that are not over
+ * the limit.
+ */
+ private List<Pair<Integer, DatanodeDetails>> getAvailableDatanodes(
+ List<DatanodeDetails> datanodes,
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type commandType,
+ int limit) {
+ List<Pair<Integer, DatanodeDetails>> datanodeWithCommandCount
+ = new ArrayList<>();
+ for (DatanodeDetails dn : datanodes) {
+ try {
+ int commandCount = nodeManager.getTotalDatanodeCommandCount(dn,
+ commandType);
+ if (commandCount >= limit) {
+ LOG.debug("Datanode {} has reached the maximum number of queued " +
+ "{} commands ({})", dn, commandType, limit);
+ continue;
+ }
+ datanodeWithCommandCount.add(Pair.of(commandCount, dn));
+ } catch (NodeNotFoundException e) {
+ LOG.error("Node {} not found in NodeManager. Should not happen",
+ dn, e);
+ }
+ }
+ return datanodeWithCommandCount;
}
/**
@@ -1147,6 +1193,21 @@ public class ReplicationManager implements SCMService {
return datanodeReplicationLimit;
}
+ @Config(key = "datanode.delete.container.limit",
+ type = ConfigType.INT,
+ defaultValue = "40",
+ tags = { SCM, DATANODE },
+ description = "A limit to restrict the total number of delete " +
+ "container commands queued on a datanode. Note this is intended " +
+ "to be a temporary config until we have a more dynamic way of " +
+ "limiting load"
+ )
+ private int datanodeDeleteLimit = 40;
+
+ public int getDatanodeDeleteLimit() {
+ return datanodeDeleteLimit;
+ }
+
public void setDatanodeReplicationLimit(int limit) {
this.datanodeReplicationLimit = limit;
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index 3345340291..fc6b9c143a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -325,11 +325,11 @@ public final class ReplicationTestUtil {
* @param mock Mock of ReplicationManager
* @param commandsSent Set to add the command to rather than sending it.
* @throws NotLeaderException
- * @throws AllSourcesOverloadedException
+ * @throws CommandTargetOverloadedException
*/
public static void mockRMSendThrottleReplicateCommand(ReplicationManager
mock,
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
- throws NotLeaderException, AllSourcesOverloadedException {
+ throws NotLeaderException, CommandTargetOverloadedException {
doAnswer((Answer<Void>) invocationOnMock -> {
List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
ContainerInfo containerInfo = invocationOnMock.getArgument(0);
@@ -386,4 +386,29 @@ public final class ReplicationTestUtil {
return null;
}).when(mock).sendDeleteCommand(any(), anyInt(), any(), anyBoolean());
}
+
+ /**
+ * Given a Mockito mock of ReplicationManager, this method will mock the
+ * sendThrottledDeleteCommand method so that it adds the command created to
+ * the commandsSent set.
+ * @param mock Mock of ReplicationManager
+ * @param commandsSent Set to add the command to rather than sending it.
+ * @throws NotLeaderException
+ */
+ public static void mockRMSendThrottledDeleteCommand(ReplicationManager mock,
+ Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
+ throws NotLeaderException, CommandTargetOverloadedException {
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ ContainerInfo containerInfo = invocationOnMock.getArgument(0);
+ int replicaIndex = invocationOnMock.getArgument(1);
+ DatanodeDetails target = invocationOnMock.getArgument(2);
+ boolean forceDelete = invocationOnMock.getArgument(3);
+ DeleteContainerCommand deleteCommand = new DeleteContainerCommand(
+ containerInfo.getContainerID(), forceDelete);
+ deleteCommand.setReplicaIndex(replicaIndex);
+ commandsSent.add(Pair.of(target, deleteCommand));
+ return null;
+ }).when(mock)
+ .sendThrottledDeleteCommand(any(), anyInt(), any(), anyBoolean());
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
index 4c27fce353..a660efa8b1 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
@@ -56,7 +56,7 @@ public class TestECMisReplicationHandler extends
TestMisReplicationHandler {
@BeforeEach
public void setup() throws NodeNotFoundException,
- AllSourcesOverloadedException, NotLeaderException {
+ CommandTargetOverloadedException, NotLeaderException {
ECReplicationConfig repConfig = new ECReplicationConfig(DATA, PARITY);
setup(repConfig);
}
@@ -172,14 +172,14 @@ public class TestECMisReplicationHandler extends
TestMisReplicationHandler {
@Test
public void testAllSourcesOverloaded() throws IOException {
ReplicationManager replicationManager = getReplicationManager();
- Mockito.doThrow(new AllSourcesOverloadedException("Overloaded"))
+ Mockito.doThrow(new CommandTargetOverloadedException("Overloaded"))
.when(replicationManager).sendThrottledReplicationCommand(any(),
anyList(), any(), anyInt());
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5));
- assertThrows(AllSourcesOverloadedException.class,
+ assertThrows(CommandTargetOverloadedException.class,
() -> testMisReplication(availableReplicas, Collections.emptyList(),
0, 1, 1));
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index 01d1c3a0d2..b69a1fbc43 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -40,10 +40,13 @@ import
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -51,6 +54,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
@@ -60,6 +64,9 @@ import static
org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
/**
* Tests the ECOverReplicationHandling functionality.
@@ -75,7 +82,8 @@ public class TestECOverReplicationHandler {
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
@BeforeEach
- public void setup() throws NodeNotFoundException, NotLeaderException {
+ public void setup() throws NodeNotFoundException, NotLeaderException,
+ CommandTargetOverloadedException {
staleNode = null;
replicationManager = Mockito.mock(ReplicationManager.class);
@@ -91,7 +99,7 @@ public class TestECOverReplicationHandler {
});
commandsSent = new HashSet<>();
- ReplicationTestUtil.mockRMSendDeleteCommand(replicationManager,
+ ReplicationTestUtil.mockRMSendThrottledDeleteCommand(replicationManager,
commandsSent);
nodeManager = new MockNodeManager(true, 10);
@@ -108,7 +116,7 @@ public class TestECOverReplicationHandler {
@Test
public void testNoOverReplication()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -119,7 +127,7 @@ public class TestECOverReplicationHandler {
@Test
public void testOverReplicationFixedByPendingDelete()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -137,7 +145,7 @@ public class TestECOverReplicationHandler {
@Test
public void testOverReplicationWithDecommissionIndexes()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -149,7 +157,7 @@ public class TestECOverReplicationHandler {
@Test
public void testOverReplicationWithStaleIndexes()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -167,7 +175,7 @@ public class TestECOverReplicationHandler {
@Test
public void testOverReplicationWithOpenReplica()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
@@ -187,7 +195,7 @@ public class TestECOverReplicationHandler {
*/
@Test
public void testOverReplicationButPolicyReturnsWrongIndexes()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
@@ -205,7 +213,7 @@ public class TestECOverReplicationHandler {
@Test
public void testOverReplicationWithOneSameIndexes()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -220,7 +228,7 @@ public class TestECOverReplicationHandler {
@Test
public void testOverReplicationWithMultiSameIndexes()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -244,7 +252,7 @@ public class TestECOverReplicationHandler {
*/
@Test
public void testOverReplicationWithUnderReplication()
- throws NotLeaderException {
+ throws NotLeaderException, CommandTargetOverloadedException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(
Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
@@ -267,10 +275,58 @@ public class TestECOverReplicationHandler {
Assert.assertEquals(1, ((DeleteContainerCommand)cmd).getReplicaIndex());
}
+  /**
+   * Exercises delete throttling in the EC over-replication handler. The
+   * mocked ReplicationManager rejects the first throttled delete with a
+   * CommandTargetOverloadedException and records every later one in
+   * commandsSent. processAndSendCommands must throw that exception, and
+   * exactly one delete command must still have been recorded.
+   */
+  @Test
+  public void testDeleteThrottling() throws IOException {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicas(
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 1),
+        Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3),
+        Pair.of(IN_SERVICE, 4),
+        Pair.of(IN_SERVICE, 5));
+
+    ContainerHealthResult.UnderReplicatedHealthResult healthResult =
+        new ContainerHealthResult.UnderReplicatedHealthResult(
+            container, 2, false, false, false);
+
+    // Flipped to false by the first throttled delete, so only that first
+    // call is rejected as overloaded; all later calls are recorded.
+    final AtomicBoolean rejectNext = new AtomicBoolean(true);
+    doAnswer((Answer<Void>) invocation -> {
+      if (rejectNext.getAndSet(false)) {
+        throw new CommandTargetOverloadedException("Test exception");
+      }
+      ContainerInfo info = invocation.getArgument(0);
+      int index = invocation.getArgument(1);
+      DatanodeDetails datanode = invocation.getArgument(2);
+      boolean force = invocation.getArgument(3);
+      DeleteContainerCommand delete =
+          new DeleteContainerCommand(info.getContainerID(), force);
+      delete.setReplicaIndex(index);
+      commandsSent.add(Pair.of(datanode, delete));
+      return null;
+    }).when(replicationManager)
+        .sendThrottledDeleteCommand(any(), anyInt(), any(), anyBoolean());
+
+    ECOverReplicationHandler handler =
+        new ECOverReplicationHandler(policy, replicationManager);
+
+    try {
+      handler.processAndSendCommands(replicas, ImmutableList.of(),
+          healthResult, 1);
+      Assertions.fail("Expected CommandTargetOverloadedException");
+    } catch (CommandTargetOverloadedException e) {
+      // This is expected.
+    }
+    Assert.assertEquals(1, commandsSent.size());
+  }
+
private void testOverReplicationWithIndexes(
Set<ContainerReplica> availableReplicas,
Map<Integer, Integer> index2excessNum,
- List<ContainerReplicaOp> pendingOps) throws NotLeaderException {
+ List<ContainerReplicaOp> pendingOps) throws NotLeaderException,
+ CommandTargetOverloadedException {
ECOverReplicationHandler ecORH =
new ECOverReplicationHandler(policy, replicationManager);
ContainerHealthResult.OverReplicatedHealthResult result =
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 2605469f21..867bdd15d9 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -94,7 +94,7 @@ public class TestECUnderReplicationHandler {
@BeforeEach
public void setup() throws NodeNotFoundException,
- AllSourcesOverloadedException, NotLeaderException {
+ CommandTargetOverloadedException, NotLeaderException {
nodeManager = new MockNodeManager(true, 10) {
@Override
public NodeStatus getNodeStatus(DatanodeDetails dd) {
@@ -237,11 +237,11 @@ public class TestECUnderReplicationHandler {
.createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5));
- doThrow(new AllSourcesOverloadedException("Overloaded"))
+ doThrow(new CommandTargetOverloadedException("Overloaded"))
.when(replicationManager).sendThrottledReplicationCommand(
any(), anyList(), any(), anyInt());
- Assertions.assertThrows(AllSourcesOverloadedException.class, () ->
+ Assertions.assertThrows(CommandTargetOverloadedException.class, () ->
testUnderReplicationWithMissingIndexes(
Lists.emptyList(), availableReplicas, 1, 0, policy));
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index 0af6e7008e..9350b72623 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -70,7 +70,7 @@ public abstract class TestMisReplicationHandler {
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
protected void setup(ReplicationConfig repConfig)
- throws NodeNotFoundException, AllSourcesOverloadedException,
+ throws NodeNotFoundException, CommandTargetOverloadedException,
NotLeaderException {
replicationManager = Mockito.mock(ReplicationManager.class);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
index 2e1bfadb42..d7a857acee 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
@@ -55,7 +55,7 @@ public class TestRatisMisReplicationHandler extends
TestMisReplicationHandler {
@BeforeEach
public void setup() throws NodeNotFoundException,
- AllSourcesOverloadedException, NotLeaderException {
+ CommandTargetOverloadedException, NotLeaderException {
RatisReplicationConfig repConfig = RatisReplicationConfig
.getInstance(ReplicationFactor.THREE);
setup(repConfig);
@@ -177,14 +177,14 @@ public class TestRatisMisReplicationHandler extends
TestMisReplicationHandler {
@Test
public void testAllSourcesOverloaded() throws IOException {
ReplicationManager replicationManager = getReplicationManager();
- Mockito.doThrow(new AllSourcesOverloadedException("Overloaded"))
+ Mockito.doThrow(new CommandTargetOverloadedException("Overloaded"))
.when(replicationManager).sendThrottledReplicationCommand(any(),
anyList(), any(), anyInt());
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
Pair.of(IN_SERVICE, 0));
- assertThrows(AllSourcesOverloadedException.class,
+ assertThrows(CommandTargetOverloadedException.class,
() -> testMisReplication(availableReplicas, Collections.emptyList(),
0, 1, 1));
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
index 0c8ea277af..c83c2a30d9 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -38,6 +39,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import org.slf4j.event.Level;
import java.io.IOException;
@@ -45,13 +47,20 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicasWithSameOrigin;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
/**
* Tests for {@link RatisOverReplicationHandler}.
@@ -65,7 +74,8 @@ public class TestRatisOverReplicationHandler {
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
@Before
- public void setup() throws NodeNotFoundException, NotLeaderException {
+ public void setup() throws NodeNotFoundException, NotLeaderException,
+ CommandTargetOverloadedException {
container = createContainer(HddsProtos.LifeCycleState.CLOSED,
RATIS_REPLICATION_CONFIG);
@@ -83,7 +93,7 @@ public class TestRatisOverReplicationHandler {
});
commandsSent = new HashSet<>();
- ReplicationTestUtil.mockRMSendDeleteCommand(replicationManager,
+ ReplicationTestUtil.mockRMSendThrottledDeleteCommand(replicationManager,
commandsSent);
GenericTestUtils.setLogLevel(RatisOverReplicationHandler.LOG, Level.DEBUG);
@@ -269,6 +279,80 @@ public class TestRatisOverReplicationHandler {
testProcessing(replicas, pendingOps, getOverReplicatedHealthResult(), 0);
}
+  /**
+   * Checks delete throttling when a QUASI_CLOSED replica is mixed in with
+   * CLOSED replicas. The mocked ReplicationManager throws
+   * CommandTargetOverloadedException for any throttled delete aimed at the
+   * QUASI_CLOSED replica's datanode. processAndSendCommands must throw
+   * that exception, and the single delete command that was recorded must
+   * target a different datanode.
+   */
+  @Test
+  public void testDeleteThrottlingMisMatchedReplica() throws IOException {
+    Set<ContainerReplica> closed = createReplicas(
+        container.containerID(), ContainerReplicaProto.State.CLOSED,
+        0, 0, 0, 0);
+
+    ContainerReplica quasiClosed = createContainerReplica(
+        container.containerID(), 0,
+        HddsProtos.NodeOperationalState.IN_SERVICE,
+        ContainerReplicaProto.State.QUASI_CLOSED);
+    DatanodeDetails quasiClosedNode = quasiClosed.getDatanodeDetails();
+
+    // Simulate an overloaded datanode for the quasi closed replica so it
+    // is never deleted. That lets the test confirm that only one of the
+    // CLOSED replicas is removed.
+    doThrow(CommandTargetOverloadedException.class)
+        .when(replicationManager)
+        .sendThrottledDeleteCommand(any(ContainerInfo.class), anyInt(),
+            eq(quasiClosedNode), anyBoolean());
+
+    Set<ContainerReplica> allReplicas = new HashSet<>();
+    allReplicas.add(quasiClosed);
+    allReplicas.addAll(closed);
+
+    RatisOverReplicationHandler overReplicationHandler =
+        new RatisOverReplicationHandler(policy, replicationManager);
+
+    try {
+      overReplicationHandler.processAndSendCommands(allReplicas,
+          Collections.emptyList(), getOverReplicatedHealthResult(), 2);
+      fail("Expected CommandTargetOverloadedException");
+    } catch (CommandTargetOverloadedException e) {
+      // Expected
+    }
+    Assert.assertEquals(1, commandsSent.size());
+    DatanodeDetails deleteTarget = commandsSent.iterator().next().getKey();
+    Assert.assertNotEquals(quasiClosedNode, deleteTarget);
+  }
+
+  /**
+   * Exercises delete throttling in the Ratis over-replication handler
+   * when every replica is CLOSED. The mocked ReplicationManager rejects
+   * the first throttled delete with a CommandTargetOverloadedException
+   * and records every later one in commandsSent. processAndSendCommands
+   * is expected to complete without throwing and to leave two delete
+   * commands recorded.
+   */
+  @Test
+  public void testDeleteThrottling() throws IOException {
+    Set<ContainerReplica> replicas = createReplicas(
+        container.containerID(), ContainerReplicaProto.State.CLOSED,
+        0, 0, 0, 0, 0);
+
+    // Flipped to false by the first throttled delete, so only that first
+    // call is rejected as overloaded; all later calls are recorded.
+    final AtomicBoolean rejectNext = new AtomicBoolean(true);
+    doAnswer((Answer<Void>) invocation -> {
+      if (rejectNext.getAndSet(false)) {
+        throw new CommandTargetOverloadedException("Test exception");
+      }
+      ContainerInfo info = invocation.getArgument(0);
+      int index = invocation.getArgument(1);
+      DatanodeDetails datanode = invocation.getArgument(2);
+      boolean force = invocation.getArgument(3);
+      DeleteContainerCommand delete =
+          new DeleteContainerCommand(info.getContainerID(), force);
+      delete.setReplicaIndex(index);
+      commandsSent.add(Pair.of(datanode, delete));
+      return null;
+    }).when(replicationManager)
+        .sendThrottledDeleteCommand(any(), anyInt(), any(), anyBoolean());
+
+    RatisOverReplicationHandler overReplicationHandler =
+        new RatisOverReplicationHandler(policy, replicationManager);
+
+    overReplicationHandler.processAndSendCommands(replicas,
+        Collections.emptyList(), getOverReplicatedHealthResult(), 2);
+    Assert.assertEquals(2, commandsSent.size());
+  }
+
/**
* Tests whether the specified expectNumCommands number of commands are
* created by the handler.
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index 5c37f5c106..f545809107 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -69,7 +69,7 @@ public class TestRatisUnderReplicationHandler {
@Before
public void setup() throws NodeNotFoundException,
- AllSourcesOverloadedException, NotLeaderException {
+ CommandTargetOverloadedException, NotLeaderException {
container = ReplicationTestUtil.createContainer(
HddsProtos.LifeCycleState.CLOSED, RATIS_REPLICATION_CONFIG);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index a6d5521db2..62737f15a6 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -858,7 +858,8 @@ public class TestReplicationManager {
@Test
public void testCreateThrottledReplicateContainerCommand()
- throws AllSourcesOverloadedException, NodeNotFoundException {
+ throws CommandTargetOverloadedException, NodeNotFoundException,
+ NotLeaderException {
Map<DatanodeDetails, Integer> sourceNodes = new HashMap<>();
DatanodeDetails cmdTarget = MockDatanodeDetails.randomDatanodeDetails();
sourceNodes.put(cmdTarget, 0);
@@ -873,20 +874,25 @@ public class TestReplicationManager {
return sourceNodes.get(dn);
});
+ ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+ repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
DatanodeDetails destination = MockDatanodeDetails.randomDatanodeDetails();
- Pair<DatanodeDetails, SCMCommand<?>> cmd = replicationManager
- .createThrottledReplicationCommand(
- 1L, new ArrayList<>(sourceNodes.keySet()), destination, 0);
- Assertions.assertEquals(cmdTarget, cmd.getLeft());
+ replicationManager.sendThrottledReplicationCommand(
+ container, new ArrayList<>(sourceNodes.keySet()), destination, 0);
+
+ Assertions.assertEquals(1, commandsSent.size());
+ Pair<UUID, SCMCommand<?>> cmd = commandsSent.iterator().next();
+ Assertions.assertEquals(cmdTarget.getUuid(), cmd.getLeft());
Assertions.assertEquals(destination,
((ReplicateContainerCommand) cmd.getRight()).getTargetDatanode());
Assertions.assertEquals(0,
((ReplicateContainerCommand) cmd.getRight()).getReplicaIndex());
}
- @Test(expected = AllSourcesOverloadedException.class)
+ @Test(expected = CommandTargetOverloadedException.class)
public void testCreateThrottledReplicateContainerCommandThrowsWhenNoSources()
- throws AllSourcesOverloadedException, NodeNotFoundException {
+ throws CommandTargetOverloadedException, NodeNotFoundException,
+ NotLeaderException {
int limit = replicationManager.getConfig().getDatanodeReplicationLimit();
Map<DatanodeDetails, Integer> sourceNodes = new HashMap<>();
for (int i = 0; i < 3; i++) {
@@ -901,8 +907,41 @@ public class TestReplicationManager {
});
DatanodeDetails destination = MockDatanodeDetails.randomDatanodeDetails();
- replicationManager.createThrottledReplicationCommand(
- 1L, new ArrayList<>(sourceNodes.keySet()), destination, 0);
+ ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+ repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+ replicationManager.sendThrottledReplicationCommand(
+ container, new ArrayList<>(sourceNodes.keySet()), destination, 0);
+ }
+
+  /**
+   * Sends a throttled delete while the mocked NodeManager reports zero
+   * queued deleteContainerCommand entries for every datanode, and checks
+   * that exactly one command is recorded in commandsSent.
+   */
+  @Test
+  public void testCreateThrottledDeleteContainerCommand()
+      throws CommandTargetOverloadedException, NodeNotFoundException,
+      NotLeaderException {
+    Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(),
+            eq(SCMCommandProto.Type.deleteContainerCommand)))
+        .thenAnswer(invocation -> 0);
+
+    DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+    replicationManager.sendThrottledDeleteCommand(container, 1, target, true);
+    // JUnit takes the expected value first, so a failure message labels
+    // the expected and actual sizes correctly.
+    Assert.assertEquals(1, commandsSent.size());
+  }
+
+  /**
+   * Stubs the NodeManager to report one more queued deleteContainerCommand
+   * than the configured datanode delete limit, and expects
+   * sendThrottledDeleteCommand to throw CommandTargetOverloadedException.
+   */
+  @Test(expected = CommandTargetOverloadedException.class)
+  public void testCreateThrottledDeleteContainerCommandThrowsWhenNoSources()
+      throws CommandTargetOverloadedException, NodeNotFoundException,
+      NotLeaderException {
+    DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+
+    // Report a queue that is already one command past the delete limit.
+    int deleteLimit =
+        replicationManager.getConfig().getDatanodeDeleteLimit();
+    Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(),
+            eq(SCMCommandProto.Type.deleteContainerCommand)))
+        .thenReturn(deleteLimit + 1);
+
+    replicationManager.sendThrottledDeleteCommand(container, 1, target, true);
}
@SafeVarargs
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]