This is an automated email from the ASF dual-hosted git repository.
siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new faa19906f6 HDDS-9592. Replication Manager: Save UNHEALTHY replicas
with highest BCSID for a QUASI_CLOSED container (#5794)
faa19906f6 is described below
commit faa19906f664f3a68ccd3b1b9d6347dced279605
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Wed Dec 20 11:54:06 2023 +0530
HDDS-9592. Replication Manager: Save UNHEALTHY replicas with highest BCSID
for a QUASI_CLOSED container (#5794)
---
.../replication/ContainerHealthResult.java | 9 +
.../replication/ECUnderReplicationHandler.java | 2 +-
.../LegacyRatisContainerReplicaCount.java | 9 +-
.../replication/LegacyReplicationManager.java | 10 +-
.../replication/MisReplicationHandler.java | 2 +-
.../replication/RatisContainerReplicaCount.java | 64 ++++--
.../replication/RatisUnderReplicationHandler.java | 109 ++++++++++-
.../container/replication/ReplicationManager.java | 4 +-
.../replication/ReplicationManagerUtil.java | 78 +++++---
.../health/VulnerableUnhealthyReplicasHandler.java | 102 ++++++++++
.../hdds/scm/node/DatanodeAdminMonitorImpl.java | 16 +-
.../TestRatisUnderReplicationHandler.java | 70 +++++++
.../replication/TestReplicationManager.java | 59 ++++++
.../replication/TestReplicationManagerUtil.java | 94 ++++++++-
.../TestVulnerableUnhealthyReplicasHandler.java | 217 +++++++++++++++++++++
.../scm/node/DatanodeAdminMonitorTestUtil.java | 2 +-
.../hdds/scm/node/TestDatanodeAdminMonitor.java | 75 +++++++
17 files changed, 864 insertions(+), 58 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index a2262cdafd..0abe8f6ea3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -113,6 +113,7 @@ public class ContainerHealthResult {
private boolean hasUnReplicatedOfflineIndexes = false;
private boolean offlineIndexesOkAfterPending = false;
private int requeueCount = 0;
+ private boolean hasVulnerableUnhealthy = false;
public UnderReplicatedHealthResult(ContainerInfo containerInfo,
int remainingRedundancy, boolean dueToOutOfService,
@@ -269,6 +270,14 @@ public class ContainerHealthResult {
return isMissing;
}
+ public void setHasVulnerableUnhealthy(boolean hasVulnerableUnhealthy) {
+ this.hasVulnerableUnhealthy = hasVulnerableUnhealthy;
+ }
+
+ public boolean hasVulnerableUnhealthy() {
+ return hasVulnerableUnhealthy;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder("UnderReplicatedHealthResult{")
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index daae24f7f2..07d38c05da 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -128,7 +128,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
container.containerID(), replicas);
ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
- ReplicationManagerUtil.getExcludedAndUsedNodes(
+ ReplicationManagerUtil.getExcludedAndUsedNodes(container,
new ArrayList<>(replicas), Collections.emptySet(), pendingOps,
replicationManager);
List<DatanodeDetails> excludedNodes
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyRatisContainerReplicaCount.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyRatisContainerReplicaCount.java
index f708ae1ead..f491e2bd6f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyRatisContainerReplicaCount.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyRatisContainerReplicaCount.java
@@ -22,6 +22,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import java.util.List;
import java.util.Set;
@@ -130,6 +131,12 @@ public class LegacyRatisContainerReplicaCount extends
public boolean isSufficientlyReplicatedForOffline(DatanodeDetails datanode,
NodeManager nodeManager) {
return super.isSufficientlyReplicated() &&
- super.getVulnerableUnhealthyReplicas(nodeManager).isEmpty();
+ super.getVulnerableUnhealthyReplicas(dn -> {
+ try {
+ return nodeManager.getNodeStatus(dn);
+ } catch (NodeNotFoundException e) {
+ return null;
+ }
+ }).isEmpty();
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 07a8f730ec..04862e0d31 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -558,7 +558,15 @@ public class LegacyReplicationManager {
* match the container's Sequence ID.
*/
List<ContainerReplica> vulnerableUnhealthy =
- replicaSet.getVulnerableUnhealthyReplicas(nodeManager);
+ replicaSet.getVulnerableUnhealthyReplicas(dn -> {
+ try {
+ return nodeManager.getNodeStatus(dn);
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Exception for datanode {} while getting vulnerable
replicas for container {}, with all " +
+ "replicas {}.", dn, container, replicas, e);
+ return null;
+ }
+ });
if (!vulnerableUnhealthy.isEmpty()) {
report.incrementAndSample(HealthState.UNDER_REPLICATED,
container.containerID());
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 70b2a44427..636b0e9589 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -148,7 +148,7 @@ public abstract class MisReplicationHandler implements
.collect(Collectors.toMap(Function.identity(),
sources::contains)));
ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes
- = ReplicationManagerUtil.getExcludedAndUsedNodes(
+ = ReplicationManagerUtil.getExcludedAndUsedNodes(container,
new ArrayList(replicas), replicasToBeReplicated,
Collections.emptyList(), replicationManager);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
index bec3b1090e..d23934184e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
import
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
import java.util.ArrayList;
import java.util.Collections;
@@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
@@ -423,9 +424,48 @@ public class RatisContainerReplicaCount implements
ContainerReplicaCount {
return isSufficientlyReplicated();
}
+ /**
+ * Checks if all replicas (except UNHEALTHY) on in-service nodes are in the
+ * same health state as the container. This is similar to what
+ * {@link ContainerReplicaCount#isHealthy()} does. The difference is in how
+ * both methods treat UNHEALTHY replicas.
+ * <p>
+ * This method is the interface between the decommissioning flow and
+ * Replication Manager. Callers can use it to check whether replicas of a
+ * container are in the same state as the container before a datanode is
+ * taken offline.
+ * </p>
+ * <p>
+ * Note that this method's purpose is to only compare the replica state with
+ * the container state. It does not check if the container has sufficient
+ * number of replicas - that is the job of {@link ContainerReplicaCount
+ * #isSufficientlyReplicatedForOffline(DatanodeDetails, NodeManager)}.
+ * </p>
+ * @return true if the container is healthy enough, which is determined by
+ * various checks
+ */
@Override
public boolean isHealthyEnoughForOffline() {
- return isHealthy();
+ long countInService = getReplicas().stream()
+ .filter(r -> r.getDatanodeDetails().getPersistedOpState() ==
IN_SERVICE)
+ .count();
+ if (countInService == 0) {
+ /*
+ Having no in-service nodes is unexpected and SCM shouldn't allow this
+ to happen in the first place. Return false here just to be safe.
+ */
+ return false;
+ }
+
+ HddsProtos.LifeCycleState containerState = getContainer().getState();
+ return (containerState == HddsProtos.LifeCycleState.CLOSED
+ || containerState == HddsProtos.LifeCycleState.QUASI_CLOSED)
+ && getReplicas().stream()
+ .filter(r -> r.getDatanodeDetails().getPersistedOpState() ==
IN_SERVICE)
+ .filter(r -> r.getState() !=
+ ContainerReplicaProto.State.UNHEALTHY)
+ .allMatch(r -> ReplicationManager.compareState(
+ containerState, r.getState()));
}
/**
@@ -435,14 +475,14 @@ public class RatisContainerReplicaCount implements
ContainerReplicaCount {
* to save at least one copy of each such UNHEALTHY replica. This method
* finds such UNHEALTHY replicas.
*
- * @param nodeManager an instance of NodeManager
+ * @param nodeStatusFn a function used to check the {@link NodeStatus} of a
node,
+ * accepting a {@link DatanodeDetails} and returning {@link NodeStatus}
* @return List of UNHEALTHY replicas with the greatest Sequence ID that
* need to be replicated to other nodes. Empty list if this container is not
* QUASI_CLOSED, doesn't have a mix of healthy and UNHEALTHY replicas, or
* if there are no replicas that need to be saved.
*/
- List<ContainerReplica> getVulnerableUnhealthyReplicas(
- NodeManager nodeManager) {
+ public List<ContainerReplica>
getVulnerableUnhealthyReplicas(Function<DatanodeDetails, NodeStatus>
nodeStatusFn) {
if (container.getState() != HddsProtos.LifeCycleState.QUASI_CLOSED) {
// this method is only relevant for QUASI_CLOSED containers
return Collections.emptyList();
@@ -456,7 +496,7 @@ public class RatisContainerReplicaCount implements
ContainerReplicaCount {
}
if (replica.getSequenceId() == container.getSequenceId()) {
- if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY) {
+ if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY &&
!replica.isEmpty()) {
unhealthyReplicas.add(replica);
} else if (replica.getState() ==
ContainerReplicaProto.State.QUASI_CLOSED) {
@@ -474,20 +514,16 @@ public class RatisContainerReplicaCount implements
ContainerReplicaCount {
unhealthyReplicas.removeIf(
replica -> {
- try {
- return !nodeManager.getNodeStatus(replica.getDatanodeDetails())
- .isHealthy();
- } catch (NodeNotFoundException e) {
- return true;
- }
+ NodeStatus status = nodeStatusFn.apply(replica.getDatanodeDetails());
+ return status == null || !status.isHealthy();
});
/*
- At this point, the list of unhealthyReplicas contains all UNHEALTHY
+ At this point, the list of unhealthyReplicas contains all UNHEALTHY
non-empty
replicas with the greatest Sequence ID that are on healthy Datanodes.
Note that this also includes multiple copies of the same UNHEALTHY
replica, that is, replicas with the same Origin ID. We need to consider
the fact that replicas can be uniquely unhealthy. That is, 2 UNHEALTHY
- replicas will difference Origin ID need not be exact copies of each other.
+ replicas with different Origin ID need not be exact copies of each other.
Replicas that don't have at least one instance (multiple instances of a
replica will have the same Origin ID) on an IN_SERVICE node are
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 98c19d16ff..4a823fb8ee 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -98,6 +99,14 @@ public class RatisUnderReplicationHandler
new RatisContainerReplicaCount(containerInfo, replicas, pendingOps,
minHealthyForMaintenance, false);
+ if (result instanceof ContainerHealthResult.UnderReplicatedHealthResult) {
+ ContainerHealthResult.UnderReplicatedHealthResult
+ underReplicatedResult =
(ContainerHealthResult.UnderReplicatedHealthResult) result;
+ if (underReplicatedResult.hasVulnerableUnhealthy()) {
+ return handleVulnerableUnhealthyReplicas(withUnhealthy, pendingOps);
+ }
+ }
+
// verify that this container is still under replicated and we don't have
// sufficient replication after considering pending adds
RatisContainerReplicaCount replicaCount =
@@ -151,6 +160,104 @@ public class RatisUnderReplicationHandler
return commandsSent;
}
+ /**
+ * Sends a replicate command for each replica specified in
+ * vulnerableUnhealthy.
+ * @param replicaCount RatisContainerReplicaCount for this container
+ * @param pendingOps List of pending ops
+ * @return number of replicate commands sent
+ */
+ private int handleVulnerableUnhealthyReplicas(RatisContainerReplicaCount
replicaCount,
+ List<ContainerReplicaOp> pendingOps) throws NotLeaderException,
CommandTargetOverloadedException, SCMException {
+ ContainerInfo container = replicaCount.getContainer();
+ List<ContainerReplica> vulnerableUnhealthy =
replicaCount.getVulnerableUnhealthyReplicas(dn -> {
+ try {
+ return replicationManager.getNodeStatus(dn);
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Exception for datanode {} while handling vulnerable replicas
for container {}, with all replicas" +
+ " {}.", dn, container, replicaCount.getReplicas(), e);
+ return null;
+ }
+ });
+ LOG.info("Handling vulnerable UNHEALTHY replicas {} for container {}.",
vulnerableUnhealthy, container);
+
+ int pendingAdds = 0;
+ for (ContainerReplicaOp op : pendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+ pendingAdds++;
+ }
+ }
+ if (pendingAdds >= vulnerableUnhealthy.size()) {
+ LOG.debug("There are {} pending adds for container {}, while the number
of UNHEALTHY replicas is {}.",
+ pendingAdds, container.containerID(), vulnerableUnhealthy.size());
+ return 0;
+ }
+
+ /*
+ Since we're replicating UNHEALTHY replicas, it's possible that replication
keeps on failing. Shuffling gives
+ other replicas a chance to be replicated since there's a limit on
in-flight adds.
+ */
+ Collections.shuffle(vulnerableUnhealthy);
+ return replicateEachSource(replicaCount, vulnerableUnhealthy, pendingOps);
+ }
+
+ /**
+ * Replicates each of the ContainerReplica specified in sources to new
+ * Datanodes. Will not consider Datanodes hosting existing replicas and
+ * Datanodes pending adds as targets. Note that this method simply skips
+ * a replica if its datanode is overloaded with commands, throwing an
+ * exception once all sources have been looked at.
+ * @param replicaCount RatisContainerReplicaCount for this container
+ * @param sources List containing replicas, each will be replicated
+ */
+ private int replicateEachSource(RatisContainerReplicaCount replicaCount,
List<ContainerReplica> sources,
+ List<ContainerReplicaOp> pendingOps) throws NotLeaderException,
SCMException, CommandTargetOverloadedException {
+ List<ContainerReplica> allReplicas = replicaCount.getReplicas();
+ ContainerInfo container = replicaCount.getContainer();
+
+ /*
+ We use the placement policy to get a target Datanode to which a vulnerable
replica will be replicated. In
+ placement policy terms, a 'used node' is a Datanode which has a legit
replica of this container. An 'excluded
+ node' is a Datanode that should not be considered to host a replica of
this container, but other Datanodes in this
+ Datanode's rack are available. So, Datanodes of any vulnerable replicas
should be excluded nodes while Datanodes
+ of other replicas, including UNHEALTHY replicas that are not pending
delete (because they have unique origin),
+ should be used nodes.
+ */
+ ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+ ReplicationManagerUtil.getExcludedAndUsedNodes(container, allReplicas,
Collections.emptySet(), pendingOps,
+ replicationManager);
+
+ CommandTargetOverloadedException firstException = null;
+ int numCommandsSent = 0;
+ for (ContainerReplica replica : sources) {
+ // find a target for each source and send replicate command
+ final List<DatanodeDetails> target =
+ ReplicationManagerUtil.getTargetDatanodes(placementPolicy, 1,
excludedAndUsedNodes.getUsedNodes(),
+ excludedAndUsedNodes.getExcludedNodes(), currentContainerSize,
container);
+ int count = 0;
+ try {
+ count = sendReplicationCommands(container,
ImmutableList.of(replica.getDatanodeDetails()), target);
+ } catch (CommandTargetOverloadedException e) {
+ LOG.info("Exception while replicating {} to target {} for container
{}.", replica, target, container, e);
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
+
+ if (count == 1) {
+ // a command was sent to target, so it needs to be in the used nodes
list because it's pending an add
+ excludedAndUsedNodes.getUsedNodes().add(target.get(0));
+ }
+ numCommandsSent += count;
+ }
+
+ if (firstException != null) {
+ throw firstException;
+ }
+
+ return numCommandsSent;
+ }
+
private void removeUnhealthyReplicaIfPossible(ContainerInfo containerInfo,
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps)
throws NotLeaderException {
@@ -337,7 +444,7 @@ public class RatisUnderReplicationHandler
replicaCount.getContainer().containerID(), replicaCount.getReplicas());
ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
- ReplicationManagerUtil.getExcludedAndUsedNodes(
+
ReplicationManagerUtil.getExcludedAndUsedNodes(replicaCount.getContainer(),
replicaCount.getReplicas(), Collections.emptySet(), pendingOps,
replicationManager);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 3b9f66595f..979cff799f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -54,6 +54,7 @@ import
org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHand
import
org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedContainerHandler;
import
org.apache.hadoop.hdds.scm.container.replication.health.RatisReplicationCheckHandler;
import
org.apache.hadoop.hdds.scm.container.replication.health.RatisUnhealthyReplicationCheckHandler;
+import
org.apache.hadoop.hdds.scm.container.replication.health.VulnerableUnhealthyReplicasHandler;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -279,7 +280,8 @@ public class ReplicationManager implements SCMService {
.addNext(ratisReplicationCheckHandler)
.addNext(new ClosedWithUnhealthyReplicasHandler(this))
.addNext(ecMisReplicationCheckHandler)
- .addNext(new RatisUnhealthyReplicationCheckHandler());
+ .addNext(new RatisUnhealthyReplicationCheckHandler())
+ .addNext(new VulnerableUnhealthyReplicasHandler(this));
start();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index 076a81e69b..3dcd6aa23b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -116,6 +117,7 @@ public final class ReplicationManagerUtil {
* @return ExcludedAndUsedNodes object containing the excluded and used lists
*/
public static ExcludedAndUsedNodes getExcludedAndUsedNodes(
+ ContainerInfo container,
List<ContainerReplica> replicas,
Set<ContainerReplica> toBeRemoved,
List<ContainerReplicaOp> pendingReplicaOps,
@@ -123,12 +125,37 @@ public final class ReplicationManagerUtil {
List<DatanodeDetails> excludedNodes = new ArrayList<>();
List<DatanodeDetails> usedNodes = new ArrayList<>();
+ List<ContainerReplica> nonUniqueUnhealthy = null;
+ if (container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+ /*
+ An UNHEALTHY replica with unique origin node id of a QUASI_CLOSED
container should be a used node (not excluded
+ node) because we preserve it. The following code will find non-unique
UNHEALTHY replicas. Later in the method
+ this list will be used to determine whether an UNHEALTHY replica's DN
should be a used node or excluded node.
+ */
+ nonUniqueUnhealthy =
+ selectUnhealthyReplicasForDelete(container, new HashSet<>(replicas),
0, dn -> {
+ try {
+ return replicationManager.getNodeStatus(dn);
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Exception for {} while selecting used and excluded
nodes for container {}.", dn, container);
+ return null;
+ }
+ });
+ }
for (ContainerReplica r : replicas) {
if (r.getState() == ContainerReplicaProto.State.UNHEALTHY) {
- // Hosts with an Unhealthy replica cannot receive a new replica, but
- // they are not considered used as they will be removed later.
- excludedNodes.add(r.getDatanodeDetails());
- continue;
+ if (container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+ // any unique UNHEALTHY will get added as used nodes in the
catch-all at the end of the loop
+ if (nonUniqueUnhealthy != null && nonUniqueUnhealthy.contains(r)) {
+ excludedNodes.add(r.getDatanodeDetails());
+ continue;
+ }
+ } else {
+ // Hosts with an UNHEALTHY replica (of a non QUASI_CLOSED container)
cannot receive a new replica, but
+ // they are not considered used as they will be removed later.
+ excludedNodes.add(r.getDatanodeDetails());
+ continue;
+ }
}
if (toBeRemoved.contains(r)) {
// This node is currently present, but we plan to remove it so it is
not
@@ -195,22 +222,8 @@ public final class ReplicationManagerUtil {
}
}
- /**
- * This is intended to be call when a container is under replicated, but
there
- * are no spare nodes to create new replicas on, due to having too many
- * unhealthy replicas or quasi-closed replicas which cannot be closed due to
- * having a lagging sequence ID. The logic here will select a replica to
- * delete, or return null if there are none which can be safely deleted.
- *
- * @param containerInfo The container to select a replica to delete from
- * @param replicas The list of replicas for the container
- * @param pendingDeletes number pending deletes for this container
- * @return A replica to delete, or null if there are none which can be safely
- * deleted.
- */
- public static ContainerReplica selectUnhealthyReplicaForDelete(
- ContainerInfo containerInfo, Set<ContainerReplica> replicas,
- int pendingDeletes, Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
+ public static List<ContainerReplica>
selectUnhealthyReplicasForDelete(ContainerInfo containerInfo,
+ Set<ContainerReplica> replicas, int pendingDeletes,
Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
if (pendingDeletes > 0) {
LOG.debug("Container {} has {} pending deletes which will free nodes.",
containerInfo, pendingDeletes);
@@ -261,18 +274,39 @@ public final class ReplicationManagerUtil {
deleteCandidates.sort(
Comparator.comparingLong(ContainerReplica::getSequenceId));
if (containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED) {
- return deleteCandidates.size() > 0 ? deleteCandidates.get(0) : null;
+ return deleteCandidates.size() > 0 ? deleteCandidates : null;
}
if (containerInfo.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
List<ContainerReplica> nonUniqueOrigins =
findNonUniqueDeleteCandidates(replicas, deleteCandidates,
nodeStatusFn);
- return nonUniqueOrigins.size() > 0 ? nonUniqueOrigins.get(0) : null;
+ return nonUniqueOrigins.size() > 0 ? nonUniqueOrigins : null;
}
return null;
}
+ /**
+ * This is intended to be called when a container is under replicated, but
there
+ * are no spare nodes to create new replicas on, due to having too many
+ * unhealthy replicas or quasi-closed replicas which cannot be closed due to
+ * having a lagging sequence ID. The logic here will select a replica to
+ * delete, or return null if there are none which can be safely deleted.
+ *
+ * @param containerInfo The container to select a replica to delete from
+ * @param replicas The list of replicas for the container
+ * @param pendingDeletes number of pending deletes for this container
+ * @return A replica to delete, or null if there are none which can be safely
+ * deleted.
+ */
+ public static ContainerReplica selectUnhealthyReplicaForDelete(
+ ContainerInfo containerInfo, Set<ContainerReplica> replicas,
+ int pendingDeletes, Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
+ List<ContainerReplica> containerReplicas =
+ selectUnhealthyReplicasForDelete(containerInfo, replicas,
pendingDeletes, nodeStatusFn);
+ return containerReplicas != null ? containerReplicas.get(0) : null;
+ }
+
/**
* Given a list of all replicas (including deleteCandidates), finds and
* returns replicas which don't have unique origin node IDs. This method
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java
new file mode 100644
index 0000000000..21b2d8151d
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import
org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+
+/**
+ * A QUASI_CLOSED container may have some UNHEALTHY replicas with the
+ * same Sequence ID as the container. RM should try to maintain one
+ * copy of such replicas when there are no healthy replicas that
+ * match the container's Sequence ID.
+ */
+public class VulnerableUnhealthyReplicasHandler extends AbstractCheck {
+ public static final Logger LOG =
LoggerFactory.getLogger(VulnerableUnhealthyReplicasHandler.class);
+ private final ReplicationManager replicationManager;
+
+ public VulnerableUnhealthyReplicasHandler(ReplicationManager
replicationManager) {
+ this.replicationManager = replicationManager;
+ }
+
+ /**
+ * Checks if the container is QUASI_CLOSED and has some vulnerable UNHEALTHY
replicas that need to be replicated to
+ * other Datanodes. These replicas have the same sequence ID as the
container while other healthy replicas don't.
+ * If the node hosting such a replica is being taken offline, then the
replica may have to be replicated to another
+ * node.
+ * @param request ContainerCheckRequest object representing the container
+ * @return true if some vulnerable UNHEALTHY replicas were found, else false
+ */
+ @Override
+ public boolean handle(ContainerCheckRequest request) {
+ ContainerInfo container = request.getContainerInfo();
+ if (container.getReplicationType() != RATIS) {
+ // This handler is only for Ratis containers.
+ return false;
+ }
+ if (container.getState() != HddsProtos.LifeCycleState.QUASI_CLOSED) {
+ return false;
+ }
+ Set<ContainerReplica> replicas = request.getContainerReplicas();
+ LOG.debug("Checking whether container {} with replicas {} has vulnerable
UNHEALTHY replicas.", container, replicas);
+ RatisContainerReplicaCount replicaCount =
+ new RatisContainerReplicaCount(container, replicas,
request.getPendingOps(), request.getMaintenanceRedundancy(),
+ true);
+
+ List<ContainerReplica> vulnerableUnhealthy =
replicaCount.getVulnerableUnhealthyReplicas(dn -> {
+ try {
+ return replicationManager.getNodeStatus(dn);
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Exception for datanode {} while handling vulnerable replicas
for container {}, with all replicas" +
+ " {}.", dn, container, replicaCount.getReplicas(), e);
+ return null;
+ }
+ });
+
+ if (!vulnerableUnhealthy.isEmpty()) {
+ LOG.info("Found vulnerable UNHEALTHY replicas {} for container {}.",
vulnerableUnhealthy, container);
+ ReplicationManagerReport report = request.getReport();
+
report.incrementAndSample(ReplicationManagerReport.HealthState.UNDER_REPLICATED,
container.containerID());
+ if (!request.isReadOnly()) {
+ ContainerHealthResult.UnderReplicatedHealthResult underRepResult =
+ replicaCount.toUnderHealthResult();
+ underRepResult.setHasVulnerableUnhealthy(true);
+ request.getReplicationQueue().enqueue(underRepResult);
+ }
+ return true;
+ }
+
+ return false;
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index 455307c6be..a7423a79dc 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -362,19 +362,7 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
continue;
}
- boolean isHealthy;
- /*
- If LegacyReplicationManager is enabled, then use the
- isHealthyEnoughForOffline API. ReplicationManager doesn't support this
- API yet.
- */
- boolean legacyEnabled = conf.getBoolean("hdds.scm.replication.enable" +
- ".legacy", false);
- if (legacyEnabled) {
- isHealthy = replicaSet.isHealthyEnoughForOffline();
- } else {
- isHealthy = replicaSet.isHealthy();
- }
+ boolean isHealthy = replicaSet.isHealthyEnoughForOffline();
if (!isHealthy) {
if (LOG.isDebugEnabled()) {
unClosedIDs.add(cid);
@@ -391,6 +379,8 @@ public class DatanodeAdminMonitorImpl implements
DatanodeAdminMonitor {
// state, except for any which are unhealthy. As the container is
closed, we can check
// if it is sufficiently replicated using replicationManager, but this
only works if the
// legacy RM is not enabled.
+ boolean legacyEnabled = conf.getBoolean("hdds.scm.replication.enable" +
+ ".legacy", false);
boolean replicatedOK;
if (legacyEnabled) {
replicatedOK = replicaSet.isSufficientlyReplicatedForOffline(dn,
nodeManager);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index 17548bc5fe..dd7747e127 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -551,6 +551,76 @@ public class TestRatisUnderReplicationHandler {
command.getKey()));
}
+ /**
+ * A QUASI_CLOSED container may end up having UNHEALTHY replicas with the
correct sequence ID, while none of the
+ * healthy replicas have the correct sequence ID. If any of these UNHEALTHY
replicas is unique and is being taken
+ * offline, then it needs to be replicated to another DN for decommission to
progress. This test asserts that a
+ * replicate command is sent for one such replica.
+ */
+ @Test
+ public void testUnderReplicationWithVulnerableReplicas() throws IOException {
+ final long sequenceID = 20;
+ container =
ReplicationTestUtil.createContainerInfo(RATIS_REPLICATION_CONFIG, 1,
+ HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID);
+
+ final Set<ContainerReplica> replicas = new HashSet<>(4);
+ for (int i = 0; i < 3; i++) {
+ replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED,
+ sequenceID - 1));
+ }
+ final ContainerReplica unhealthyReplica =
createContainerReplica(container.containerID(), 0,
+ DECOMMISSIONING, State.UNHEALTHY, sequenceID);
+ replicas.add(unhealthyReplica);
+ UnderReplicatedHealthResult result = getUnderReplicatedHealthResult();
+ Mockito.when(result.hasVulnerableUnhealthy()).thenReturn(true);
+
+ final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands =
testProcessing(replicas, Collections.emptyList(),
+ result, 2, 1);
+ assertEquals(unhealthyReplica.getDatanodeDetails(),
commands.iterator().next().getKey());
+ }
+
+ /**
+ * In the push replication model, a replicate command is sent to the DN
hosting the replica, and that DN is
+ * expected to "push" the replica to another DN. If the DN hosting the
replica has too many commands already, an
+ * exception is thrown. This test asserts that other vulnerable UNHEALTHY
replicas are still handled when an
+ * exception is caught for one of the replicas. Also asserts that the first
thrown exception isn't lost and is
+ * actually rethrown once other replicas are processed, so that the
container can be re-queued.
+ */
+ @Test
+ public void
testUnderReplicationWithVulnerableReplicasAndTargetOverloadedException()
+ throws NotLeaderException, CommandTargetOverloadedException {
+ final long sequenceID = 20;
+ container =
ReplicationTestUtil.createContainerInfo(RATIS_REPLICATION_CONFIG, 1,
+ HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID);
+
+ final Set<ContainerReplica> replicas = new HashSet<>(5);
+ for (int i = 0; i < 3; i++) {
+ replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED,
+ sequenceID - 1));
+ }
+
+ /*
+ Create 2 unhealthy vulnerable replicas. An exception is thrown for one of
the replicas, but the other replica
+ should still be processed and 1 command should be sent.
+ */
+ final ContainerReplica unhealthyReplica =
createContainerReplica(container.containerID(), 0,
+ DECOMMISSIONING, State.UNHEALTHY, sequenceID);
+ final ContainerReplica unhealthyReplica2 =
createContainerReplica(container.containerID(), 0,
+ ENTERING_MAINTENANCE, State.UNHEALTHY, sequenceID);
+ replicas.add(unhealthyReplica);
+ replicas.add(unhealthyReplica2);
+ UnderReplicatedHealthResult result = getUnderReplicatedHealthResult();
+ Mockito.when(result.hasVulnerableUnhealthy()).thenReturn(true);
+ ReplicationTestUtil.mockRMSendThrottleReplicateCommand(replicationManager,
commandsSent, new AtomicBoolean(true));
+
+ RatisUnderReplicationHandler handler = new
RatisUnderReplicationHandler(policy, conf, replicationManager);
+ assertThrows(CommandTargetOverloadedException.class, () ->
handler.processAndSendCommands(replicas,
+ Collections.emptyList(), result, 2));
+ assertEquals(1, commandsSent.size());
+ DatanodeDetails dn = commandsSent.iterator().next().getKey();
+ assertTrue(unhealthyReplica.getDatanodeDetails().equals(dn) ||
unhealthyReplica2.getDatanodeDetails().equals(dn));
+ }
+
@Test
public void testOnlyQuasiClosedReplicaWithWrongSequenceIdIsAvailable()
throws IOException {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index a909377879..32463a5a6e 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -91,6 +92,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
/**
@@ -444,6 +446,63 @@ public class TestReplicationManager {
assertEquals(0, repQueue.overReplicatedQueueSize());
}
+ @Test
+ public void testQuasiClosedContainerWithVulnerableUnhealthyReplica()
+ throws IOException, NodeNotFoundException {
+ RatisReplicationConfig ratisRepConfig =
+ RatisReplicationConfig.getInstance(THREE);
+ long sequenceID = 10;
+ ContainerInfo container = createContainerInfo(ratisRepConfig, 1,
+ HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID);
+
+ // this method creates replicas with same origin id and zero sequence id
+ Set<ContainerReplica> replicas =
+ createReplicasWithSameOrigin(container.containerID(),
+ ContainerReplicaProto.State.QUASI_CLOSED, 0, 0, 0);
+ replicas.add(createContainerReplica(container.containerID(), 0,
+ IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, sequenceID));
+ ContainerReplica decommissioning =
+ createContainerReplica(container.containerID(), 0, DECOMMISSIONING,
+ ContainerReplicaProto.State.UNHEALTHY, sequenceID);
+ replicas.add(decommissioning);
+ storeContainerAndReplicas(container, replicas);
+ Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
+ .thenAnswer(invocation -> {
+ DatanodeDetails dn = invocation.getArgument(0);
+ if (dn.equals(decommissioning.getDatanodeDetails())) {
+ return new NodeStatus(DECOMMISSIONING,
HddsProtos.NodeState.HEALTHY);
+ }
+
+ return NodeStatus.inServiceHealthy();
+ });
+
+ replicationManager.processContainer(container, repQueue, repReport);
+ assertEquals(1, repReport.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ assertEquals(0, repReport.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ assertEquals(1, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+
+ Mockito.when(ratisPlacementPolicy.chooseDatanodes(anyList(), anyList(),
eq(null), eq(1), anyLong(),
+ anyLong())).thenAnswer(invocation ->
ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails()));
+
Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(DatanodeDetails.class),
any(), any()))
+ .thenAnswer(invocation -> {
+ Map<SCMCommandProto.Type, Integer> map = new HashMap<>();
+ map.put(SCMCommandProto.Type.replicateContainerCommand, 0);
+ map.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+ return map;
+ });
+ RatisUnderReplicationHandler handler =
+ new RatisUnderReplicationHandler(ratisPlacementPolicy, configuration,
replicationManager);
+
+ handler.processAndSendCommands(replicas, Collections.emptyList(),
repQueue.dequeueUnderReplicatedContainer(), 2);
+ assertEquals(1, commandsSent.size());
+ Pair<UUID, SCMCommand<?>> command = commandsSent.iterator().next();
+ assertEquals(SCMCommandProto.Type.replicateContainerCommand,
command.getValue().getType());
+ assertEquals(decommissioning.getDatanodeDetails().getUuid(),
command.getKey());
+ }
+
/**
* When there is Quasi Closed Replica with incorrect sequence id
* for a Closed container, it's treated as unhealthy and deleted.
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
index 3b81db7767..c68130e79e 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
@@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -37,6 +39,7 @@ import java.util.Set;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -55,7 +58,9 @@ public class TestReplicationManagerUtil {
@Test
public void testGetExcludedAndUsedNodes() throws NodeNotFoundException {
- ContainerID cid = ContainerID.valueOf(1L);
+ ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED,
+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+ ContainerID cid = container.containerID();
Set<ContainerReplica> replicas = new HashSet<>();
ContainerReplica good = createContainerReplica(cid, 0,
IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
@@ -108,7 +113,7 @@ public class TestReplicationManagerUtil {
});
ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
- ReplicationManagerUtil.getExcludedAndUsedNodes(
+ ReplicationManagerUtil.getExcludedAndUsedNodes(container,
new ArrayList<>(replicas), toBeRemoved, pending,
replicationManager);
@@ -131,4 +136,89 @@ public class TestReplicationManagerUtil {
.contains(pendingDelete));
}
+ @Test
+ public void testGetUsedAndExcludedNodesForQuasiClosedContainer() throws
NodeNotFoundException {
+ ContainerInfo container =
createContainer(HddsProtos.LifeCycleState.QUASI_CLOSED,
+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+ ContainerID cid = container.containerID();
+ Set<ContainerReplica> replicas = new HashSet<>();
+ ContainerReplica good = createContainerReplica(cid, 0, IN_SERVICE,
+ ContainerReplicaProto.State.QUASI_CLOSED, 1);
+ replicas.add(good);
+
+ ContainerReplica remove = createContainerReplica(cid, 0,
+ IN_SERVICE, ContainerReplicaProto.State.QUASI_CLOSED, 1);
+ replicas.add(remove);
+ Set<ContainerReplica> toBeRemoved = new HashSet<>();
+ toBeRemoved.add(remove);
+
+ // this replica should be on the used nodes list
+ ContainerReplica unhealthyWithUniqueOrigin = createContainerReplica(
+ cid, 0, IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, 1);
+ replicas.add(unhealthyWithUniqueOrigin);
+
+ // this one should be on the excluded nodes list
+ ContainerReplica unhealthyWithNonUniqueOrigin =
createContainerReplica(cid, 0, IN_SERVICE,
+ ContainerReplicaProto.State.UNHEALTHY, container.getNumberOfKeys(),
container.getUsedBytes(),
+ MockDatanodeDetails.randomDatanodeDetails(),
good.getOriginDatanodeId());
+ replicas.add(unhealthyWithNonUniqueOrigin);
+
+ ContainerReplica decommissioning =
+ createContainerReplica(cid, 0,
+ DECOMMISSIONING, ContainerReplicaProto.State.QUASI_CLOSED, 1);
+ replicas.add(decommissioning);
+
+ ContainerReplica maintenance =
+ createContainerReplica(cid, 0,
+ IN_MAINTENANCE, ContainerReplicaProto.State.QUASI_CLOSED, 1);
+ replicas.add(maintenance);
+
+ // Finally, add a pending add and delete. The add should go onto the used
+ // list and the delete added to the excluded nodes.
+ DatanodeDetails pendingAdd = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails pendingDelete =
MockDatanodeDetails.randomDatanodeDetails();
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ContainerReplicaOp.PendingOpType.ADD, pendingAdd, 0));
+ pending.add(ContainerReplicaOp.create(
+ ContainerReplicaOp.PendingOpType.DELETE, pendingDelete, 0));
+
+ Mockito.when(replicationManager.getNodeStatus(Mockito.any())).thenAnswer(
+ invocation -> {
+ final DatanodeDetails dn = invocation.getArgument(0);
+ for (ContainerReplica r : replicas) {
+ if (r.getDatanodeDetails().equals(dn)) {
+ return new NodeStatus(
+ r.getDatanodeDetails().getPersistedOpState(),
+ HddsProtos.NodeState.HEALTHY);
+ }
+ }
+ throw new NodeNotFoundException(dn.getUuidString());
+ });
+
+ ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+ ReplicationManagerUtil.getExcludedAndUsedNodes(container,
+ new ArrayList<>(replicas), toBeRemoved, pending,
+ replicationManager);
+
+ assertEquals(4, excludedAndUsedNodes.getUsedNodes().size());
+ assertTrue(excludedAndUsedNodes.getUsedNodes()
+ .contains(good.getDatanodeDetails()));
+ assertTrue(excludedAndUsedNodes.getUsedNodes()
+ .contains(maintenance.getDatanodeDetails()));
+ assertTrue(excludedAndUsedNodes.getUsedNodes()
+ .contains(pendingAdd));
+
assertTrue(excludedAndUsedNodes.getUsedNodes().contains(unhealthyWithUniqueOrigin.getDatanodeDetails()));
+
+ assertEquals(4, excludedAndUsedNodes.getExcludedNodes().size());
+ assertTrue(excludedAndUsedNodes.getExcludedNodes()
+ .contains(unhealthyWithNonUniqueOrigin.getDatanodeDetails()));
+ assertTrue(excludedAndUsedNodes.getExcludedNodes()
+ .contains(decommissioning.getDatanodeDetails()));
+ assertTrue(excludedAndUsedNodes.getExcludedNodes()
+ .contains(remove.getDatanodeDetails()));
+ assertTrue(excludedAndUsedNodes.getExcludedNodes()
+ .contains(pendingDelete));
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java
new file mode 100644
index 0000000000..72a89f0286
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link VulnerableUnhealthyReplicasHandler}.
+ */
+public class TestVulnerableUnhealthyReplicasHandler {
+ private ReplicationManager replicationManager;
+ private ReplicationConfig repConfig;
+ private ReplicationQueue repQueue;
+ private ContainerCheckRequest.Builder requestBuilder;
+ private ReplicationManagerReport report;
+ private VulnerableUnhealthyReplicasHandler handler;
+
+ @BeforeEach
+ public void setup() throws NodeNotFoundException {
+ replicationManager = Mockito.mock(ReplicationManager.class);
+ handler = new VulnerableUnhealthyReplicasHandler(replicationManager);
+ repConfig = RatisReplicationConfig.getInstance(THREE);
+ repQueue = new ReplicationQueue();
+ report = new ReplicationManagerReport();
+ requestBuilder = new ContainerCheckRequest.Builder()
+ .setReplicationQueue(repQueue)
+ .setMaintenanceRedundancy(2)
+ .setPendingOps(Collections.emptyList())
+ .setReport(report);
+
+
Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+ .thenReturn(NodeStatus.inServiceHealthy());
+ }
+
+ @Test
+ public void testReturnsFalseForECContainer() {
+ ContainerInfo container = createContainerInfo(new ECReplicationConfig(3,
2));
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
1, 2, 3, 4);
+ requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+ assertFalse(handler.handle(requestBuilder.build()));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void testReturnsFalseForClosedContainer() {
+ ContainerInfo container = createContainerInfo(repConfig, 1,
LifeCycleState.CLOSED);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
0, 0, 0);
+ requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+ assertFalse(handler.handle(requestBuilder.build()));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void testReturnsFalseForQuasiClosedContainerWithNoUnhealthyReplicas()
{
+ ContainerInfo container = createContainerInfo(repConfig, 1,
LifeCycleState.QUASI_CLOSED);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
State.QUASI_CLOSED, 0, 0, 0);
+ requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+ assertFalse(handler.handle(requestBuilder.build()));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void
testReturnsFalseForQuasiClosedContainerWithNoVulnerableReplicas() {
+ ContainerInfo container = createContainerInfo(repConfig, 1,
LifeCycleState.QUASI_CLOSED);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
0, 0, 0);
+ // create UNHEALTHY replica with unique origin id on an IN_SERVICE node
+ replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.UNHEALTHY));
+ requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+ assertFalse(handler.handle(requestBuilder.build()));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void testReturnsTrueForQuasiClosedContainerWithVulnerableReplica()
throws NodeNotFoundException {
+ long sequenceId = 10;
+ ContainerInfo container = createContainerInfo(repConfig, 1,
LifeCycleState.QUASI_CLOSED, sequenceId);
+ Set<ContainerReplica> replicas = new HashSet<>(4);
+ for (int i = 0; i < 3; i++) {
+ replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED,
+ container.getSequenceId() - 1));
+ }
+ // create UNHEALTHY replica with unique origin id on a DECOMMISSIONING node
+ ContainerReplica unhealthy =
+ createContainerReplica(container.containerID(), 0, DECOMMISSIONING,
State.UNHEALTHY, sequenceId);
+ replicas.add(unhealthy);
+
Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+ .thenAnswer(invocation -> {
+ DatanodeDetails dn = invocation.getArgument(0);
+ if (dn.equals(unhealthy.getDatanodeDetails())) {
+ return new NodeStatus(DECOMMISSIONING, HEALTHY);
+ }
+ return NodeStatus.inServiceHealthy();
+ });
+ requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+ assertTrue(handler.handle(requestBuilder.build()));
+ assertEquals(1, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void testReturnsFalseForVulnerableReplicaWithAnotherCopy() throws
NodeNotFoundException {
+ long sequenceId = 10;
+ ContainerInfo container = createContainerInfo(repConfig, 1,
LifeCycleState.QUASI_CLOSED, sequenceId);
+ Set<ContainerReplica> replicas = new HashSet<>(4);
+ for (int i = 0; i < 3; i++) {
+ replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED,
+ container.getSequenceId() - 1));
+ }
+ // create UNHEALTHY replica with a non-unique origin id on a
DECOMMISSIONING node
+ ContainerReplica unhealthy =
+ createContainerReplica(container.containerID(), 0, DECOMMISSIONING,
State.UNHEALTHY, sequenceId);
+ replicas.add(unhealthy);
+
Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+ .thenAnswer(invocation -> {
+ DatanodeDetails dn = invocation.getArgument(0);
+ if (dn.equals(unhealthy.getDatanodeDetails())) {
+ return new NodeStatus(DECOMMISSIONING, HEALTHY);
+ }
+ return NodeStatus.inServiceHealthy();
+ });
+ replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.UNHEALTHY,
+ container.getNumberOfKeys(), container.getUsedBytes(),
MockDatanodeDetails.randomDatanodeDetails(),
+ unhealthy.getOriginDatanodeId(), container.getSequenceId()));
+ requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+ assertFalse(handler.handle(requestBuilder.build()));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void testDoesNotEnqueueForReadOnlyRequest() throws
NodeNotFoundException {
+ long sequenceId = 10;
+ ContainerInfo container = createContainerInfo(repConfig, 1,
LifeCycleState.QUASI_CLOSED, sequenceId);
+ Set<ContainerReplica> replicas = new HashSet<>(4);
+ for (int i = 0; i < 3; i++) {
+ replicas.add(createContainerReplica(container.containerID(), 0,
IN_SERVICE, State.QUASI_CLOSED,
+ container.getSequenceId() - 1));
+ }
+ // create UNHEALTHY replica with unique origin id on a DECOMMISSIONING node
+ ContainerReplica unhealthy =
+ createContainerReplica(container.containerID(), 0, DECOMMISSIONING,
State.UNHEALTHY, sequenceId);
+ replicas.add(unhealthy);
+
Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+ .thenAnswer(invocation -> {
+ DatanodeDetails dn = invocation.getArgument(0);
+ if (dn.equals(unhealthy.getDatanodeDetails())) {
+ return new NodeStatus(DECOMMISSIONING, HEALTHY);
+ }
+ return NodeStatus.inServiceHealthy();
+ });
+ requestBuilder.setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setReadOnly(true);
+
+ assertTrue(handler.handle(requestBuilder.build()));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java
index 4433c0cb6f..4ff937f98c 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java
@@ -193,7 +193,7 @@ public final class DatanodeAdminMonitorTestUtil {
mockCheckContainerState(repManager, underReplicated);
}
- private static void mockCheckContainerState(ReplicationManager repManager,
boolean underReplicated)
+ static void mockCheckContainerState(ReplicationManager repManager, boolean
underReplicated)
throws ContainerNotFoundException {
Mockito.when(repManager.checkContainerStatus(Mockito.any(ContainerInfo.class),
Mockito.any(ReplicationManagerReport.class)))
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 4b389fbcf2..17107cfa95 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import
org.apache.hadoop.hdds.scm.container.replication.LegacyRatisContainerReplicaCount;
+import
org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
@@ -298,6 +299,80 @@ public class TestDatanodeAdminMonitor {
nodeManager.getNodeStatus(dn1).getOperationalState());
}
+ /**
+ * Situation: A QUASI_CLOSED container has an UNHEALTHY replica with the
+ * greatest BCSID, and three QUASI_CLOSED replicas with a smaller BCSID. The
+ * UNHEALTHY replica is on a decommissioning node, and there are no other
+ * copies of this replica, that is, replicas with the same Origin ID as
+ * this replica.
+ *
+ * Expectation: Decommissioning should not complete until the UNHEALTHY
+ * replica has been replicated to another node.
+ */
+ @Test
+ public void testDecommissionWaitsForUnhealthyReplicaToReplicateNewRM()
+ throws NodeNotFoundException, ContainerNotFoundException {
+ DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+ nodeManager.register(dn1,
+ new NodeStatus(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+ HddsProtos.NodeState.HEALTHY));
+
+ // create 3 QUASI_CLOSED replicas with containerID 1 and same origin ID
+ ContainerID containerID = ContainerID.valueOf(1);
+ Set<ContainerReplica> replicas =
+ ReplicationTestUtil.createReplicasWithSameOrigin(containerID,
+ State.QUASI_CLOSED, 0, 0, 0);
+
+ // the container's sequence id is greater than the healthy replicas'
+ ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+ RatisReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.THREE), containerID.getId(),
+ HddsProtos.LifeCycleState.QUASI_CLOSED,
+ replicas.iterator().next().getSequenceId() + 1);
+ // UNHEALTHY replica is on a unique origin and has same sequence id as
+ // the container
+ ContainerReplica unhealthy =
+ ReplicationTestUtil.createContainerReplica(containerID, 0,
+ dn1.getPersistedOpState(), State.UNHEALTHY,
+ container.getNumberOfKeys(), container.getUsedBytes(), dn1,
+ dn1.getUuid(), container.getSequenceId());
+ replicas.add(unhealthy);
+ nodeManager.setContainers(dn1, ImmutableSet.of(containerID));
+
+ Mockito.when(repManager.getContainerReplicaCount(Mockito.eq(containerID)))
+ .thenReturn(new RatisContainerReplicaCount(container, replicas,
+ Collections.emptyList(), 2, false));
+ DatanodeAdminMonitorTestUtil.mockCheckContainerState(repManager, true);
+
+ // start monitoring dn1
+ monitor.startMonitoring(dn1);
+ monitor.run();
+ assertEquals(1, monitor.getTrackedNodeCount());
+ assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+ nodeManager.getNodeStatus(dn1).getOperationalState());
+
+ // Running the monitor again causes it to remain DECOMMISSIONING
+ // as nothing has changed.
+ monitor.run();
+ assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+ nodeManager.getNodeStatus(dn1).getOperationalState());
+
+ // add a copy of the UNHEALTHY replica on a new node, dn1 should get
+ // decommissioned now
+ ContainerReplica copyOfUnhealthyOnNewNode = unhealthy.toBuilder()
+ .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails())
+ .build();
+ replicas.add(copyOfUnhealthyOnNewNode);
+ Mockito.when(repManager.getContainerReplicaCount(Mockito.eq(containerID)))
+ .thenReturn(new RatisContainerReplicaCount(container, replicas,
+ Collections.emptyList(), 2, false));
+ DatanodeAdminMonitorTestUtil.mockCheckContainerState(repManager, false);
+ monitor.run();
+ assertEquals(0, monitor.getTrackedNodeCount());
+ assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONED,
+ nodeManager.getNodeStatus(dn1).getOperationalState());
+ }
+
/**
* Consider a QUASI_CLOSED container with only UNHEALTHY replicas. If one
* of its nodes is decommissioned, the decommissioning should succeed.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]