This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new da79b9a8b9 HDDS-8504. ReplicationManager: Pass used and excluded node
separately for Under and Mis-Replication (#4694)
da79b9a8b9 is described below
commit da79b9a8b9703ecef32479329aa4163b9f0d35ae
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu May 11 16:40:07 2023 +0100
HDDS-8504. ReplicationManager: Pass used and excluded node separately for
Under and Mis-Replication (#4694)
---
.../replication/ECUnderReplicationHandler.java | 60 +++++-----
.../replication/MisReplicationHandler.java | 20 ++--
.../replication/RatisUnderReplicationHandler.java | 29 ++---
.../replication/ReplicationManagerUtil.java | 98 +++++++++++++++
.../container/replication/ReplicationTestUtil.java | 3 +-
.../replication/TestECUnderReplicationHandler.java | 29 +++--
.../TestRatisUnderReplicationHandler.java | 80 ++++++++++++-
.../replication/TestReplicationManagerUtil.java | 133 +++++++++++++++++++++
8 files changed, 383 insertions(+), 69 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index e54102b2e6..e6d998a4a1 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -124,19 +124,15 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
return 0;
}
- // don't place reconstructed replicas on exclude nodes, since they already
- // have replicas
- List<DatanodeDetails> excludedNodes = replicas.stream()
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList());
- // DNs that are already waiting to receive replicas cannot be targets
- excludedNodes.addAll(
- pendingOps.stream()
- .filter(containerReplicaOp -> containerReplicaOp.getOpType() ==
- ContainerReplicaOp.PendingOpType.ADD)
- .map(ContainerReplicaOp::getTarget)
- .collect(Collectors.toList()));
+ ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+ ReplicationManagerUtil.getExcludedAndUsedNodes(
+ new ArrayList<>(replicas), Collections.emptySet(), pendingOps,
+ replicationManager);
+ List<DatanodeDetails> excludedNodes
+ = excludedAndUsedNodes.getExcludedNodes();
excludedNodes.addAll(replicationManager.getExcludedNodes());
+ List<DatanodeDetails> usedNodes
+ = excludedAndUsedNodes.getUsedNodes();
final ContainerID id = container.containerID();
int commandsSent = 0;
@@ -162,14 +158,14 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
IOException firstException = null;
try {
commandsSent += processMissingIndexes(replicaCount, sources,
- availableSourceNodes, excludedNodes);
+ availableSourceNodes, excludedNodes, usedNodes);
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
firstException = e;
}
try {
commandsSent += processDecommissioningIndexes(replicaCount, sources,
- availableSourceNodes, excludedNodes);
+ availableSourceNodes, excludedNodes, usedNodes);
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
if (firstException == null) {
@@ -178,7 +174,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
}
try {
commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
- excludedNodes);
+ excludedNodes, usedNodes);
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
if (firstException == null) {
@@ -272,7 +268,8 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
ECContainerReplicaCount replicaCount, Map<Integer,
Pair<ContainerReplica, NodeStatus>> sources,
List<DatanodeDetails> availableSourceNodes,
- List<DatanodeDetails> excludedNodes) throws IOException {
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> usedNodes) throws IOException {
ContainerInfo container = replicaCount.getContainer();
ECReplicationConfig repConfig =
(ECReplicationConfig)container.getReplicationConfig();
@@ -286,7 +283,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
int expectedTargets = missingIndexes.size();
final List<DatanodeDetails> selectedDatanodes =
ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
- expectedTargets, null, excludedNodes, currentContainerSize,
+ expectedTargets, usedNodes, excludedNodes, currentContainerSize,
container);
// If we got less targets than missing indexes, we need to prune the
@@ -297,7 +294,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
missingIndexes.size()).clear();
}
if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
- excludedNodes.addAll(selectedDatanodes);
+ usedNodes.addAll(selectedDatanodes);
// TODO - what are we adding all the selected nodes to available
// sources?
availableSourceNodes.addAll(selectedDatanodes);
@@ -357,18 +354,19 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
ECContainerReplicaCount replicaCount,
Map<Integer, Pair<ContainerReplica, NodeStatus>> sources,
List<DatanodeDetails> availableSourceNodes,
- List<DatanodeDetails> excludedNodes) throws IOException {
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> usedNodes)
+ throws IOException {
ContainerInfo container = replicaCount.getContainer();
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
int commandsSent = 0;
if (decomIndexes.size() > 0) {
final List<DatanodeDetails> selectedDatanodes =
ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
- decomIndexes.size(), null, excludedNodes, currentContainerSize,
- container);
+ decomIndexes.size(), usedNodes, excludedNodes,
+ currentContainerSize, container);
if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
- excludedNodes.addAll(selectedDatanodes);
+ usedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
// In this case we need to do one to one copy.
CommandTargetOverloadedException overloadedException = null;
@@ -382,11 +380,11 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
ContainerReplica sourceReplica = source.getLeft();
if (!iterator.hasNext()) {
LOG.warn("Couldn't find enough targets. Available source"
- + " nodes: {}, the target nodes: {}, excluded nodes: {}"
- + " and the decommission indexes: {}",
+ + " nodes: {}, the target nodes: {}, excluded nodes: {},"
+ + " usedNodes: {}, and the decommission indexes: {}",
sources.values().stream()
.map(Pair::getLeft).collect(Collectors.toSet()),
- selectedDatanodes, excludedNodes, decomIndexes);
+ selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
break;
}
try {
@@ -430,7 +428,8 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
private int processMaintenanceOnlyIndexes(
ECContainerReplicaCount replicaCount,
Map<Integer, Pair<ContainerReplica, NodeStatus>> sources,
- List<DatanodeDetails> excludedNodes) throws IOException {
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> usedNodes)
+ throws IOException {
Set<Integer> maintIndexes = replicaCount.maintenanceOnlyIndexes(true);
if (maintIndexes.isEmpty()) {
return 0;
@@ -444,9 +443,9 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
return 0;
}
List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
- containerPlacement, maintIndexes.size(), null, excludedNodes,
+ containerPlacement, maintIndexes.size(), usedNodes, excludedNodes,
currentContainerSize, container);
- excludedNodes.addAll(targets);
+ usedNodes.addAll(targets);
Iterator<DatanodeDetails> iterator = targets.iterator();
int commandsSent = 0;
@@ -466,11 +465,12 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
ContainerReplica sourceReplica = source.getLeft();
if (!iterator.hasNext()) {
LOG.warn("Couldn't find enough targets. Available source"
- + " nodes: {}, target nodes: {}, excluded nodes: {} and"
+ + " nodes: {}, target nodes: {}, excluded nodes: {},"
+ + " usedNodes: {} and"
+ " maintenance indexes: {}",
sources.values().stream()
.map(Pair::getLeft).collect(Collectors.toSet()),
- targets, excludedNodes, maintIndexes);
+ targets, excludedNodes, usedNodes, maintIndexes);
break;
}
try {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index b5590f2985..d41f80d604 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -137,22 +138,23 @@ public abstract class MisReplicationHandler implements
container.getContainerID());
return 0;
}
-
Set<ContainerReplica> sources = filterSources(replicas);
Set<ContainerReplica> replicasToBeReplicated = containerPlacement
.replicasToCopyToFixMisreplication(replicas.stream()
.collect(Collectors.toMap(Function.identity(),
sources::contains)));
- usedDns = replicas.stream().filter(r ->
!replicasToBeReplicated.contains(r))
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList());
- List<DatanodeDetails> excludedDns = replicasToBeReplicated.stream()
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList());
+
+ ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes
+ = ReplicationManagerUtil.getExcludedAndUsedNodes(
+ new ArrayList(replicas), replicasToBeReplicated,
+ Collections.emptyList(), replicationManager);
+
int requiredNodes = replicasToBeReplicated.size();
List<DatanodeDetails> targetDatanodes = ReplicationManagerUtil
- .getTargetDatanodes(containerPlacement, requiredNodes, usedDns,
- excludedDns, currentContainerSize, container);
+ .getTargetDatanodes(containerPlacement, requiredNodes,
+ excludedAndUsedNodes.getUsedNodes(),
+ excludedAndUsedNodes.getExcludedNodes(), currentContainerSize,
+ container);
List<DatanodeDetails> availableSources = sources.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 3ae30bea78..ae14c20f47 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
@@ -264,26 +265,20 @@ public class RatisUnderReplicationHandler
LOG.debug("Need {} target datanodes for container {}. Current " +
"replicas: {}.", replicaCount.additionalReplicaNeeded(),
replicaCount.getContainer().containerID(), replicaCount.getReplicas());
- // DNs that already have replicas cannot be targets and should be excluded
- final List<DatanodeDetails> excludeList =
- replicaCount.getReplicas().stream()
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList());
- // DNs that are already waiting to receive replicas cannot be targets
- final List<DatanodeDetails> pendingReplication =
- pendingOps.stream()
- .filter(containerReplicaOp -> containerReplicaOp.getOpType() ==
- ContainerReplicaOp.PendingOpType.ADD)
- .map(ContainerReplicaOp::getTarget)
- .collect(Collectors.toList());
- LOG.debug("Excluding DNs. excludeList: {}, size: {}. pendingReplication: "
+
- "{}, size: {}.", excludeList, excludeList.size(),
- pendingReplication, pendingReplication.size());
- excludeList.addAll(pendingReplication);
+ ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+ ReplicationManagerUtil.getExcludedAndUsedNodes(
+ replicaCount.getReplicas(), Collections.emptySet(), pendingOps,
+ replicationManager);
+
+ List<DatanodeDetails> excluded = excludedAndUsedNodes.getExcludedNodes();
+ List<DatanodeDetails> used = excludedAndUsedNodes.getUsedNodes();
+
+ LOG.debug("UsedList: {}, size {}. ExcludeList: {}, size: {}. ",
+ used, used.size(), excluded, excluded.size());
return ReplicationManagerUtil.getTargetDatanodes(placementPolicy,
- replicaCount.additionalReplicaNeeded(), null, excludeList,
+ replicaCount.additionalReplicaNeeded(), used, excluded,
currentContainerSize, replicaCount.getContainer());
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index 12b1eecf61..6757b059da 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -18,14 +18,20 @@
package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
/**
* Utility class for ReplicationManager.
@@ -90,4 +96,96 @@ public final class ReplicationManagerUtil {
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
+ /**
+ * Given a list of replicas and a set of nodes to be removed, returns an
+ * object containing two lists. One is a list of nodes that should be excluded
+ * from being selected as targets for new replicas. The other is a list of
+ * nodes that are currently used by the container and the placement policy
+ * can consider for rack placement
+ * @param replicas List of existing replicas
+ * @param toBeRemoved Set of nodes containing replicas that are to be removed
+ * @param pendingReplicaOps List of pending replica operations
+ * @param replicationManager ReplicationManager instance to get NodeStatus
+ * @return ExcludedAndUsedNodes object containing the excluded and used lists
+ */
+ public static ExcludedAndUsedNodes getExcludedAndUsedNodes(
+ List<ContainerReplica> replicas,
+ Set<ContainerReplica> toBeRemoved,
+ List<ContainerReplicaOp> pendingReplicaOps,
+ ReplicationManager replicationManager) {
+ List<DatanodeDetails> excludedNodes = new ArrayList<>();
+ List<DatanodeDetails> usedNodes = new ArrayList<>();
+
+ for (ContainerReplica r : replicas) {
+ if (r.getState() == ContainerReplicaProto.State.UNHEALTHY) {
+ // Hosts with an Unhealthy replica cannot receive a new replica, but
+ // they are not considered used as they will be removed later.
+ excludedNodes.add(r.getDatanodeDetails());
+ continue;
+ }
+ if (toBeRemoved.contains(r)) {
+ // This node is currently present, but we plan to remove it so it is not
+ // considered used, but must be excluded
+ excludedNodes.add(r.getDatanodeDetails());
+ continue;
+ }
+ try {
+ NodeStatus nodeStatus =
+ replicationManager.getNodeStatus(r.getDatanodeDetails());
+ if (nodeStatus.isDecommission()) {
+ // Decommission nodes are going to go away and their replicas need to
+ // be replaced. Therefore we mark them excluded.
+ // Maintenance nodes should return to the cluster, so they would still
+ // be considered used (handled in the catch-all at the end of the loop
+ // ).
+ excludedNodes.add(r.getDatanodeDetails());
+ continue;
+ }
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Node {} not found in node manager.", r.getDatanodeDetails());
+ // This should not happen, but if it does, just add the node to the
+ // exclude list
+ excludedNodes.add(r.getDatanodeDetails());
+ continue;
+ }
+ // If we get here, this is a used node
+ usedNodes.add(r.getDatanodeDetails());
+ }
+ for (ContainerReplicaOp pending : pendingReplicaOps) {
+ if (pending.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+ // If we are adding a replica, then it is scheduled to become a used node
+ }
+ if (pending.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ // If there are any ops pending delete, we cannot use the node, but they
+ // are not considered used as they will be removed later.
+ excludedNodes.add(pending.getTarget());
+ }
+ }
+ return new ExcludedAndUsedNodes(excludedNodes, usedNodes);
+ }
+
+
+ /**
+ * Simple class to hold the excluded and used nodes lists.
+ */
+ public static class ExcludedAndUsedNodes {
+ private List<DatanodeDetails> excludedNodes;
+ private List<DatanodeDetails> usedNodes;
+
+ public ExcludedAndUsedNodes(List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> usedNodes) {
+ this.excludedNodes = excludedNodes;
+ this.usedNodes = usedNodes;
+ }
+
+ public List<DatanodeDetails> getExcludedNodes() {
+ return excludedNodes;
+ }
+
+ public List<DatanodeDetails> getUsedNodes() {
+ return usedNodes;
+ }
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index 9d520d69ac..5da2c8e54b 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -284,7 +284,8 @@ public final class ReplicationTestUtil {
if (nodesRequiredToChoose > 1) {
throw new IllegalArgumentException("Only one node is allowed");
}
- if (excludedNodes.contains(nodeToReturn)) {
+ if (excludedNodes.contains(nodeToReturn)
+ || usedNodes.contains(nodeToReturn)) {
throw new SCMException("Insufficient Nodes available to choose",
SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index af486d257d..bd91bfce85 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -61,6 +61,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singleton;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
@@ -165,10 +166,14 @@ public class TestECUnderReplicationHandler {
ArgumentCaptor<List<DatanodeDetails>> captor =
ArgumentCaptor.forClass(List.class);
- Mockito.when(ecPlacementPolicy.chooseDatanodes(captor.capture(),
+ // The used list is modified after it is passed to the placement policy,
+ // so a plain Captor won't work.
+ AtomicReference<List<DatanodeDetails>> usedList = new AtomicReference<>();
+ Mockito.when(ecPlacementPolicy.chooseDatanodes(any(), captor.capture(),
any(), anyInt(), anyLong(), anyLong())
).thenAnswer(invocationOnMock -> {
- int numNodes = invocationOnMock.getArgument(2);
+ usedList.set(new ArrayList<>(invocationOnMock.getArgument(0)));
+ int numNodes = invocationOnMock.getArgument(3);
List<DatanodeDetails> targets = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
targets.add(MockDatanodeDetails.randomDatanodeDetails());
@@ -180,6 +185,11 @@ public class TestECUnderReplicationHandler {
replicas, Collections.emptyList(), result, 2);
Assertions.assertTrue(captor.getValue().contains(excludedByRM));
+ Assertions.assertEquals(3, usedList.get().size());
+ for (ContainerReplica r : replicas) {
+ Assertions.assertTrue(
+ usedList.get().contains(r.getDatanodeDetails()));
+ }
}
@Test
@@ -793,10 +803,10 @@ public class TestECUnderReplicationHandler {
Pair.of(IN_MAINTENANCE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5));
- Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), Mockito.isNull(),
- anyInt(), anyLong(), anyLong()))
+ Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
+ Mockito.isNull(), anyInt(), anyLong(), anyLong()))
.thenAnswer(invocationOnMock -> {
- int numNodes = invocationOnMock.getArgument(2);
+ int numNodes = invocationOnMock.getArgument(3);
List<DatanodeDetails> targets = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
targets.add(MockDatanodeDetails.randomDatanodeDetails());
@@ -839,12 +849,13 @@ public class TestECUnderReplicationHandler {
containing that DN. Ensures the test will fail if excludeNodes does not
contain the DN pending ADD.
*/
- Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), Mockito.isNull(),
- anyInt(), anyLong(), anyLong()))
+ Mockito.when(ecPlacementPolicy.chooseDatanodes(anyList(), anyList(),
+ Mockito.isNull(), anyInt(), anyLong(), anyLong()))
.thenAnswer(invocationOnMock -> {
- List<DatanodeDetails> excludeList = invocationOnMock.getArgument(0);
+ List<DatanodeDetails> usedList = invocationOnMock.getArgument(0);
+ List<DatanodeDetails> excludeList = invocationOnMock.getArgument(1);
List<DatanodeDetails> targets = new ArrayList<>(1);
- if (excludeList.contains(dn)) {
+ if (usedList.contains(dn) || excludeList.contains(dn)) {
targets.add(MockDatanodeDetails.randomDatanodeDetails());
} else {
targets.add(dn);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index ae0be5e63a..fe0caf8810 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -41,9 +41,12 @@ import
org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -52,10 +55,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
/**
* Tests for {@link RatisUnderReplicationHandler}.
@@ -92,7 +99,7 @@ public class TestRatisUnderReplicationHandler {
DatanodeDetails, and NodeState as HEALTHY.
*/
Mockito.when(
- replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+ replicationManager.getNodeStatus(any(DatanodeDetails.class)))
.thenAnswer(invocationOnMock -> {
DatanodeDetails dn = invocationOnMock.getArgument(0);
return new NodeStatus(dn.getPersistedOpState(),
@@ -301,11 +308,78 @@ public class TestRatisUnderReplicationHandler {
// Ensure that the replica with SEQ=2 is the only source sent
Mockito.verify(replicationManager).sendThrottledReplicationCommand(
- Mockito.any(ContainerInfo.class),
+ any(ContainerInfo.class),
Mockito.eq(Collections.singletonList(valid.getDatanodeDetails())),
- Mockito.any(DatanodeDetails.class), anyInt());
+ any(DatanodeDetails.class), anyInt());
}
+ @Test
+ public void testCorrectUsedAndExcludedNodesPassed() throws IOException {
+ PlacementPolicy mockPolicy = Mockito.mock(PlacementPolicy.class);
+ Mockito.when(mockPolicy.chooseDatanodes(any(), any(), any(),
+ anyInt(), anyLong(), anyLong()))
+ .thenReturn(Collections.singletonList(
+ MockDatanodeDetails.randomDatanodeDetails()));
+
+ ArgumentCaptor<List<DatanodeDetails>> usedNodesCaptor =
+ ArgumentCaptor.forClass(List.class);
+
+ ArgumentCaptor<List<DatanodeDetails>> excludedNodesCaptor =
+ ArgumentCaptor.forClass(List.class);
+
+ RatisUnderReplicationHandler handler =
+ new RatisUnderReplicationHandler(mockPolicy, conf, replicationManager);
+
+ Set<ContainerReplica> replicas = new HashSet<>();
+ ContainerReplica good = createContainerReplica(container.containerID(), 0,
+ IN_SERVICE, State.CLOSED, 1);
+ replicas.add(good);
+
+ ContainerReplica unhealthy = createContainerReplica(
+ container.containerID(), 0, IN_SERVICE, State.UNHEALTHY, 1);
+ replicas.add(unhealthy);
+
+ ContainerReplica decommissioning =
+ createContainerReplica(container.containerID(), 0,
+ DECOMMISSIONING, State.CLOSED, 1);
+ replicas.add(decommissioning);
+
+ ContainerReplica maintenance =
+ createContainerReplica(container.containerID(), 0,
+ IN_MAINTENANCE, State.CLOSED, 1);
+ replicas.add(maintenance);
+
+ List<ContainerReplicaOp> pendingOps = new ArrayList<>();
+ DatanodeDetails pendingAdd = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails pendingRemove =
MockDatanodeDetails.randomDatanodeDetails();
+ pendingOps.add(ContainerReplicaOp.create(
+ ContainerReplicaOp.PendingOpType.ADD, pendingAdd, 0));
+ pendingOps.add(ContainerReplicaOp.create(
+ ContainerReplicaOp.PendingOpType.DELETE, pendingRemove, 0));
+
+ handler.processAndSendCommands(replicas, pendingOps,
+ getUnderReplicatedHealthResult(), 2);
+
+
+ Mockito.verify(mockPolicy, times(1)).chooseDatanodes(
+ usedNodesCaptor.capture(), excludedNodesCaptor.capture(), any(),
+ anyInt(), anyLong(), anyLong());
+
+ List<DatanodeDetails> usedNodes = usedNodesCaptor.getValue();
+ List<DatanodeDetails> excludedNodes = excludedNodesCaptor.getValue();
+
+ Assertions.assertTrue(usedNodes.contains(good.getDatanodeDetails()));
+
Assertions.assertTrue(usedNodes.contains(maintenance.getDatanodeDetails()));
+ Assertions.assertTrue(usedNodes.contains(pendingAdd));
+
+ Assertions.assertTrue(excludedNodes.contains(
+ unhealthy.getDatanodeDetails()));
+ Assertions.assertTrue(excludedNodes.contains(
+ decommissioning.getDatanodeDetails()));
+ Assertions.assertTrue(excludedNodes.contains(pendingRemove));
+ }
+
+
/**
* Tests whether the specified expectNumCommands number of commands are
* created by the handler.
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
new file mode 100644
index 0000000000..1ffbfec39a
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
+
+/**
+ * Tests for ReplicationManagerUtil.
+ */
+public class TestReplicationManagerUtil {
+
+ private ReplicationManager replicationManager;
+
+ @Before
+ public void setup() {
+ replicationManager = Mockito.mock(ReplicationManager.class);
+ }
+
+ @Test
+ public void testGetExcludedAndUsedNodes() throws NodeNotFoundException {
+ ContainerID cid = ContainerID.valueOf(1L);
+ Set<ContainerReplica> replicas = new HashSet<>();
+ ContainerReplica good = createContainerReplica(cid, 0,
+ IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
+ replicas.add(good);
+
+ ContainerReplica remove = createContainerReplica(cid, 0,
+ IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
+ replicas.add(remove);
+
+ ContainerReplica unhealthy = createContainerReplica(
+ cid, 0, IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, 1);
+ replicas.add(unhealthy);
+
+ ContainerReplica decommissioning =
+ createContainerReplica(cid, 0,
+ DECOMMISSIONING, ContainerReplicaProto.State.CLOSED, 1);
+ replicas.add(decommissioning);
+
+ ContainerReplica maintenance =
+ createContainerReplica(cid, 0,
+ IN_MAINTENANCE, ContainerReplicaProto.State.CLOSED, 1);
+ replicas.add(maintenance);
+
+ // Take one of the replicas and set it to be removed. It should be on the
+ // excluded list rather than the used list.
+ Set<ContainerReplica> toBeRemoved = new HashSet<>();
+ toBeRemoved.add(remove);
+
+ // Finally, add a pending add and delete. The add should go onto the used
+ // list and the delete added to the excluded nodes.
+ DatanodeDetails pendingAdd = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails pendingDelete =
MockDatanodeDetails.randomDatanodeDetails();
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ContainerReplicaOp.PendingOpType.ADD, pendingAdd, 0));
+ pending.add(ContainerReplicaOp.create(
+ ContainerReplicaOp.PendingOpType.DELETE, pendingDelete, 0));
+
+ Mockito.when(replicationManager.getNodeStatus(Mockito.any())).thenAnswer(
+ invocation -> {
+ final DatanodeDetails dn = invocation.getArgument(0);
+ for (ContainerReplica r : replicas) {
+ if (r.getDatanodeDetails().equals(dn)) {
+ return new NodeStatus(
+ r.getDatanodeDetails().getPersistedOpState(),
+ HddsProtos.NodeState.HEALTHY);
+ }
+ }
+ throw new NodeNotFoundException(dn.getUuidString());
+ });
+
+ ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+ ReplicationManagerUtil.getExcludedAndUsedNodes(
+ new ArrayList<>(replicas), toBeRemoved, pending,
+ replicationManager);
+
+ Assertions.assertEquals(3, excludedAndUsedNodes.getUsedNodes().size());
+ Assertions.assertTrue(excludedAndUsedNodes.getUsedNodes()
+ .contains(good.getDatanodeDetails()));
+ Assertions.assertTrue(excludedAndUsedNodes.getUsedNodes()
+ .contains(maintenance.getDatanodeDetails()));
+ Assertions.assertTrue(excludedAndUsedNodes.getUsedNodes()
+ .contains(pendingAdd));
+
+ Assertions.assertEquals(4, excludedAndUsedNodes.getExcludedNodes().size());
+ Assertions.assertTrue(excludedAndUsedNodes.getExcludedNodes()
+ .contains(unhealthy.getDatanodeDetails()));
+ Assertions.assertTrue(excludedAndUsedNodes.getExcludedNodes()
+ .contains(decommissioning.getDatanodeDetails()));
+ Assertions.assertTrue(excludedAndUsedNodes.getExcludedNodes()
+ .contains(remove.getDatanodeDetails()));
+ Assertions.assertTrue(excludedAndUsedNodes.getExcludedNodes()
+ .contains(pendingDelete));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]