This is an automated email from the ASF dual-hosted git repository.

siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new faa19906f6 HDDS-9592. Replication Manager: Save UNHEALTHY replicas 
with highest BCSID for a QUASI_CLOSED container (#5794)
faa19906f6 is described below

commit faa19906f664f3a68ccd3b1b9d6347dced279605
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Wed Dec 20 11:54:06 2023 +0530

    HDDS-9592. Replication Manager: Save UNHEALTHY replicas with highest BCSID 
for a QUASI_CLOSED container (#5794)
---
 .../replication/ContainerHealthResult.java         |   9 +
 .../replication/ECUnderReplicationHandler.java     |   2 +-
 .../LegacyRatisContainerReplicaCount.java          |   9 +-
 .../replication/LegacyReplicationManager.java      |  10 +-
 .../replication/MisReplicationHandler.java         |   2 +-
 .../replication/RatisContainerReplicaCount.java    |  64 ++++--
 .../replication/RatisUnderReplicationHandler.java  | 109 ++++++++++-
 .../container/replication/ReplicationManager.java  |   4 +-
 .../replication/ReplicationManagerUtil.java        |  78 +++++---
 .../health/VulnerableUnhealthyReplicasHandler.java | 102 ++++++++++
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |  16 +-
 .../TestRatisUnderReplicationHandler.java          |  70 +++++++
 .../replication/TestReplicationManager.java        |  59 ++++++
 .../replication/TestReplicationManagerUtil.java    |  94 ++++++++-
 .../TestVulnerableUnhealthyReplicasHandler.java    | 217 +++++++++++++++++++++
 .../scm/node/DatanodeAdminMonitorTestUtil.java     |   2 +-
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |  75 +++++++
 17 files changed, 864 insertions(+), 58 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index a2262cdafd..0abe8f6ea3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -113,6 +113,7 @@ public class ContainerHealthResult {
     private boolean hasUnReplicatedOfflineIndexes = false;
     private boolean offlineIndexesOkAfterPending = false;
     private int requeueCount = 0;
+    private boolean hasVulnerableUnhealthy = false;
 
     public UnderReplicatedHealthResult(ContainerInfo containerInfo,
         int remainingRedundancy, boolean dueToOutOfService,
@@ -269,6 +270,14 @@ public class ContainerHealthResult {
       return isMissing;
     }
 
+    public void setHasVulnerableUnhealthy(boolean hasVulnerableUnhealthy) {
+      this.hasVulnerableUnhealthy = hasVulnerableUnhealthy;
+    }
+
+    public boolean hasVulnerableUnhealthy() {
+      return hasVulnerableUnhealthy;
+    }
+
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder("UnderReplicatedHealthResult{")
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index daae24f7f2..07d38c05da 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -128,7 +128,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
         container.containerID(), replicas);
 
     ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
-        ReplicationManagerUtil.getExcludedAndUsedNodes(
+        ReplicationManagerUtil.getExcludedAndUsedNodes(container,
             new ArrayList<>(replicas), Collections.emptySet(), pendingOps,
             replicationManager);
     List<DatanodeDetails> excludedNodes
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyRatisContainerReplicaCount.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyRatisContainerReplicaCount.java
index f708ae1ead..f491e2bd6f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyRatisContainerReplicaCount.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyRatisContainerReplicaCount.java
@@ -22,6 +22,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 
 import java.util.List;
 import java.util.Set;
@@ -130,6 +131,12 @@ public class LegacyRatisContainerReplicaCount extends
   public boolean isSufficientlyReplicatedForOffline(DatanodeDetails datanode,
       NodeManager nodeManager) {
     return super.isSufficientlyReplicated() &&
-        super.getVulnerableUnhealthyReplicas(nodeManager).isEmpty();
+        super.getVulnerableUnhealthyReplicas(dn -> {
+          try {
+            return nodeManager.getNodeStatus(dn);
+          } catch (NodeNotFoundException e) {
+            return null;
+          }
+        }).isEmpty();
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 07a8f730ec..04862e0d31 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -558,7 +558,15 @@ public class LegacyReplicationManager {
          * match the container's Sequence ID.
          */
         List<ContainerReplica> vulnerableUnhealthy =
-            replicaSet.getVulnerableUnhealthyReplicas(nodeManager);
+            replicaSet.getVulnerableUnhealthyReplicas(dn -> {
+              try {
+                return nodeManager.getNodeStatus(dn);
+              } catch (NodeNotFoundException e) {
+                LOG.warn("Exception for datanode {} while getting vulnerable 
replicas for container {}, with all " +
+                    "replicas {}.", dn, container, replicas, e);
+                return null;
+              }
+            });
         if (!vulnerableUnhealthy.isEmpty()) {
           report.incrementAndSample(HealthState.UNDER_REPLICATED,
               container.containerID());
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 70b2a44427..636b0e9589 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -148,7 +148,7 @@ public abstract class MisReplicationHandler implements
             .collect(Collectors.toMap(Function.identity(), 
sources::contains)));
 
     ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes
-        = ReplicationManagerUtil.getExcludedAndUsedNodes(
+        = ReplicationManagerUtil.getExcludedAndUsedNodes(container,
             new ArrayList(replicas), replicasToBeReplicated,
             Collections.emptyList(), replicationManager);
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
index bec3b1090e..d23934184e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
@@ -423,9 +424,48 @@ public class RatisContainerReplicaCount implements 
ContainerReplicaCount {
     return isSufficientlyReplicated();
   }
 
+  /**
+   * Checks if all replicas (except UNHEALTHY) on in-service nodes are in the
+   * same health state as the container. This is similar to what
+   * {@link ContainerReplicaCount#isHealthy()} does. The difference is in how
+   * both methods treat UNHEALTHY replicas.
+   * <p>
+   * This method is the interface between the decommissioning flow and
+   * Replication Manager. Callers can use it to check whether replicas of a
+   * container are in the same state as the container before a datanode is
+   * taken offline.
+   * </p>
+   * <p>
+   * Note that this method's purpose is to only compare the replica state with
+   * the container state. It does not check if the container has sufficient
+   * number of replicas - that is the job of {@link ContainerReplicaCount
+   * #isSufficientlyReplicatedForOffline(DatanodeDetails, NodeManager)}.
+   * @return true if the container is healthy enough, which is determined by
+   * various checks
+   * </p>
+   */
   @Override
   public boolean isHealthyEnoughForOffline() {
-    return isHealthy();
+    long countInService = getReplicas().stream()
+        .filter(r -> r.getDatanodeDetails().getPersistedOpState() == 
IN_SERVICE)
+        .count();
+    if (countInService == 0) {
+      /*
+      Having no in-service nodes is unexpected and SCM shouldn't allow this
+      to happen in the first place. Return false here just to be safe.
+      */
+      return false;
+    }
+
+    HddsProtos.LifeCycleState containerState = getContainer().getState();
+    return (containerState == HddsProtos.LifeCycleState.CLOSED
+        || containerState == HddsProtos.LifeCycleState.QUASI_CLOSED)
+        && getReplicas().stream()
+        .filter(r -> r.getDatanodeDetails().getPersistedOpState() == 
IN_SERVICE)
+        .filter(r -> r.getState() !=
+            ContainerReplicaProto.State.UNHEALTHY)
+        .allMatch(r -> ReplicationManager.compareState(
+            containerState, r.getState()));
   }
 
   /**
@@ -435,14 +475,14 @@ public class RatisContainerReplicaCount implements 
ContainerReplicaCount {
    * to save at least one copy of each such UNHEALTHY replica. This method
    * finds such UNHEALTHY replicas.
    *
-   * @param nodeManager an instance of NodeManager
+   * @param nodeStatusFn a function used to check the {@link NodeStatus} of a 
node,
+   * accepting a {@link DatanodeDetails} and returning {@link NodeStatus}
    * @return List of UNHEALTHY replicas with the greatest Sequence ID that
    * need to be replicated to other nodes. Empty list if this container is not
    * QUASI_CLOSED, doesn't have a mix of healthy and UNHEALTHY replicas, or
    * if there are no replicas that need to be saved.
    */
-  List<ContainerReplica> getVulnerableUnhealthyReplicas(
-      NodeManager nodeManager) {
+  public List<ContainerReplica> 
getVulnerableUnhealthyReplicas(Function<DatanodeDetails, NodeStatus> 
nodeStatusFn) {
     if (container.getState() != HddsProtos.LifeCycleState.QUASI_CLOSED) {
       // this method is only relevant for QUASI_CLOSED containers
       return Collections.emptyList();
@@ -456,7 +496,7 @@ public class RatisContainerReplicaCount implements 
ContainerReplicaCount {
       }
 
       if (replica.getSequenceId() == container.getSequenceId()) {
-        if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY) {
+        if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY && 
!replica.isEmpty()) {
           unhealthyReplicas.add(replica);
         } else if (replica.getState() ==
             ContainerReplicaProto.State.QUASI_CLOSED) {
@@ -474,20 +514,16 @@ public class RatisContainerReplicaCount implements 
ContainerReplicaCount {
 
     unhealthyReplicas.removeIf(
         replica -> {
-          try {
-            return !nodeManager.getNodeStatus(replica.getDatanodeDetails())
-                .isHealthy();
-          } catch (NodeNotFoundException e) {
-            return true;
-          }
+          NodeStatus status = nodeStatusFn.apply(replica.getDatanodeDetails());
+          return status == null || !status.isHealthy();
         });
     /*
-    At this point, the list of unhealthyReplicas contains all UNHEALTHY
+    At this point, the list of unhealthyReplicas contains all UNHEALTHY 
non-empty
     replicas with the greatest Sequence ID that are on healthy Datanodes.
     Note that this also includes multiple copies of the same UNHEALTHY
     replica, that is, replicas with the same Origin ID. We need to consider
     the fact that replicas can be uniquely unhealthy. That is, 2 UNHEALTHY
-    replicas will difference Origin ID need not be exact copies of each other.
+    replicas with different Origin ID need not be exact copies of each other.
 
     Replicas that don't have at least one instance (multiple instances of a
     replica will have the same Origin ID) on an IN_SERVICE node are
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 98c19d16ff..4a823fb8ee 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -98,6 +99,14 @@ public class RatisUnderReplicationHandler
         new RatisContainerReplicaCount(containerInfo, replicas, pendingOps,
             minHealthyForMaintenance, false);
 
+    if (result instanceof ContainerHealthResult.UnderReplicatedHealthResult) {
+      ContainerHealthResult.UnderReplicatedHealthResult
+          underReplicatedResult = 
(ContainerHealthResult.UnderReplicatedHealthResult) result;
+      if (underReplicatedResult.hasVulnerableUnhealthy()) {
+        return handleVulnerableUnhealthyReplicas(withUnhealthy, pendingOps);
+      }
+    }
+
     // verify that this container is still under replicated and we don't have
     // sufficient replication after considering pending adds
     RatisContainerReplicaCount replicaCount =
@@ -151,6 +160,104 @@ public class RatisUnderReplicationHandler
     return commandsSent;
   }
 
+  /**
+   * Sends a replicate command for each replica specified in
+   * vulnerableUnhealthy.
+   * @param replicaCount RatisContainerReplicaCount for this container
+   * @param pendingOps List of pending ops
+   * @return number of replicate commands sent
+   */
+  private int handleVulnerableUnhealthyReplicas(RatisContainerReplicaCount 
replicaCount,
+      List<ContainerReplicaOp> pendingOps) throws NotLeaderException, 
CommandTargetOverloadedException, SCMException {
+    ContainerInfo container = replicaCount.getContainer();
+    List<ContainerReplica> vulnerableUnhealthy = 
replicaCount.getVulnerableUnhealthyReplicas(dn -> {
+      try {
+        return replicationManager.getNodeStatus(dn);
+      } catch (NodeNotFoundException e) {
+        LOG.warn("Exception for datanode {} while handling vulnerable replicas 
for container {}, with all replicas" +
+            " {}.", dn, container, replicaCount.getReplicas(), e);
+        return null;
+      }
+    });
+    LOG.info("Handling vulnerable UNHEALTHY replicas {} for container {}.", 
vulnerableUnhealthy, container);
+
+    int pendingAdds = 0;
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        pendingAdds++;
+      }
+    }
+    if (pendingAdds >= vulnerableUnhealthy.size()) {
+      LOG.debug("There are {} pending adds for container {}, while the number 
of UNHEALTHY replicas is {}.",
+          pendingAdds, container.containerID(), vulnerableUnhealthy.size());
+      return 0;
+    }
+
+    /*
+    Since we're replicating UNHEALTHY replicas, it's possible that replication 
keeps on failing. Shuffling gives
+    other replicas a chance to be replicated since there's a limit on 
in-flight adds.
+    */
+    Collections.shuffle(vulnerableUnhealthy);
+    return replicateEachSource(replicaCount, vulnerableUnhealthy, pendingOps);
+  }
+
+  /**
+   * Replicates each of the ContainerReplica specified in sources to new
+   * Datanodes. Will not consider Datanodes hosting existing replicas and
+   * Datanodes pending adds as targets. Note that this method simply skips
+   * a replica if its datanode is overloaded with commands, throwing an
+   * exception once all sources have been looked at.
+   * @param replicaCount RatisContainerReplicaCount for this container
+   * @param sources List containing replicas, each will be replicated
+   */
+  private int replicateEachSource(RatisContainerReplicaCount replicaCount, 
List<ContainerReplica> sources,
+      List<ContainerReplicaOp> pendingOps) throws NotLeaderException, 
SCMException, CommandTargetOverloadedException {
+    List<ContainerReplica> allReplicas = replicaCount.getReplicas();
+    ContainerInfo container = replicaCount.getContainer();
+
+    /*
+    We use the placement policy to get a target Datanode to which a vulnerable 
replica will be replicated. In
+    placement policy terms, a 'used node' is a Datanode which has a legit 
replica of this container. An 'excluded
+    node' is a Datanode that should not be considered to host a replica of 
this container, but other Datanodes in this
+    Datanode's rack are available. So, Datanodes of any vulnerable replicas 
should be excluded nodes while Datanodes
+    of other replicas, including UNHEALTHY replicas that are not pending 
delete (because they have unique origin),
+    should be used nodes.
+    */
+    ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+        ReplicationManagerUtil.getExcludedAndUsedNodes(container, allReplicas, 
Collections.emptySet(), pendingOps,
+            replicationManager);
+
+    CommandTargetOverloadedException firstException = null;
+    int numCommandsSent = 0;
+    for (ContainerReplica replica : sources) {
+      // find a target for each source and send replicate command
+      final List<DatanodeDetails> target =
+          ReplicationManagerUtil.getTargetDatanodes(placementPolicy, 1, 
excludedAndUsedNodes.getUsedNodes(),
+              excludedAndUsedNodes.getExcludedNodes(), currentContainerSize, 
container);
+      int count = 0;
+      try {
+        count = sendReplicationCommands(container, 
ImmutableList.of(replica.getDatanodeDetails()), target);
+      } catch (CommandTargetOverloadedException e) {
+        LOG.info("Exception while replicating {} to target {} for container 
{}.", replica, target, container, e);
+        if (firstException == null) {
+          firstException = e;
+        }
+      }
+
+      if (count == 1) {
+        // a command was sent to target, so it needs to be in the used nodes 
list because it's pending an add
+        excludedAndUsedNodes.getUsedNodes().add(target.get(0));
+      }
+      numCommandsSent += count;
+    }
+
+    if (firstException != null) {
+      throw firstException;
+    }
+
+    return numCommandsSent;
+  }
+
   private void removeUnhealthyReplicaIfPossible(ContainerInfo containerInfo,
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps)
       throws NotLeaderException {
@@ -337,7 +444,7 @@ public class RatisUnderReplicationHandler
         replicaCount.getContainer().containerID(), replicaCount.getReplicas());
 
     ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
-        ReplicationManagerUtil.getExcludedAndUsedNodes(
+        
ReplicationManagerUtil.getExcludedAndUsedNodes(replicaCount.getContainer(),
             replicaCount.getReplicas(), Collections.emptySet(), pendingOps,
             replicationManager);
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 3b9f66595f..979cff799f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -54,6 +54,7 @@ import 
org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHand
 import 
org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedContainerHandler;
 import 
org.apache.hadoop.hdds.scm.container.replication.health.RatisReplicationCheckHandler;
 import 
org.apache.hadoop.hdds.scm.container.replication.health.RatisUnhealthyReplicationCheckHandler;
+import 
org.apache.hadoop.hdds.scm.container.replication.health.VulnerableUnhealthyReplicasHandler;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -279,7 +280,8 @@ public class ReplicationManager implements SCMService {
         .addNext(ratisReplicationCheckHandler)
         .addNext(new ClosedWithUnhealthyReplicasHandler(this))
         .addNext(ecMisReplicationCheckHandler)
-        .addNext(new RatisUnhealthyReplicationCheckHandler());
+        .addNext(new RatisUnhealthyReplicationCheckHandler())
+        .addNext(new VulnerableUnhealthyReplicasHandler(this));
     start();
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index 076a81e69b..3dcd6aa23b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -116,6 +117,7 @@ public final class ReplicationManagerUtil {
    * @return ExcludedAndUsedNodes object containing the excluded and used lists
    */
   public static ExcludedAndUsedNodes getExcludedAndUsedNodes(
+      ContainerInfo container,
       List<ContainerReplica> replicas,
       Set<ContainerReplica> toBeRemoved,
       List<ContainerReplicaOp> pendingReplicaOps,
@@ -123,12 +125,37 @@ public final class ReplicationManagerUtil {
     List<DatanodeDetails> excludedNodes = new ArrayList<>();
     List<DatanodeDetails> usedNodes = new ArrayList<>();
 
+    List<ContainerReplica> nonUniqueUnhealthy = null;
+    if (container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      /*
+      An UNHEALTHY replica with unique origin node id of a QUASI_CLOSED 
container should be a used node (not excluded
+      node) because we preserve it. The following code will find non-unique 
UNHEALTHY replicas. Later in the method
+      this list will be used to determine whether an UNHEALTHY replica's DN 
should be a used node or excluded node.
+      */
+      nonUniqueUnhealthy =
+          selectUnhealthyReplicasForDelete(container, new HashSet<>(replicas), 
0, dn -> {
+            try {
+              return replicationManager.getNodeStatus(dn);
+            } catch (NodeNotFoundException e) {
+              LOG.warn("Exception for {} while selecting used and excluded 
nodes for container {}.", dn, container);
+              return null;
+            }
+          });
+    }
     for (ContainerReplica r : replicas) {
       if (r.getState() == ContainerReplicaProto.State.UNHEALTHY) {
-        // Hosts with an Unhealthy replica cannot receive a new replica, but
-        // they are not considered used as they will be removed later.
-        excludedNodes.add(r.getDatanodeDetails());
-        continue;
+        if (container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+          // any unique UNHEALTHY will get added as used nodes in the 
catch-all at the end of the loop
+          if (nonUniqueUnhealthy != null && nonUniqueUnhealthy.contains(r)) {
+            excludedNodes.add(r.getDatanodeDetails());
+            continue;
+          }
+        } else {
+          // Hosts with an UNHEALTHY replica (of a non QUASI_CLOSED container) 
cannot receive a new replica, but
+          // they are not considered used as they will be removed later.
+          excludedNodes.add(r.getDatanodeDetails());
+          continue;
+        }
       }
       if (toBeRemoved.contains(r)) {
         // This node is currently present, but we plan to remove it so it is 
not
@@ -195,22 +222,8 @@ public final class ReplicationManagerUtil {
     }
   }
 
-  /**
-   * This is intended to be call when a container is under replicated, but 
there
-   * are no spare nodes to create new replicas on, due to having too many
-   * unhealthy replicas or quasi-closed replicas which cannot be closed due to
-   * having a lagging sequence ID. The logic here will select a replica to
-   * delete, or return null if there are none which can be safely deleted.
-   *
-   * @param containerInfo The container to select a replica to delete from
-   * @param replicas The list of replicas for the container
-   * @param pendingDeletes number pending deletes for this container
-   * @return A replica to delete, or null if there are none which can be safely
-   *         deleted.
-   */
-  public static ContainerReplica selectUnhealthyReplicaForDelete(
-      ContainerInfo containerInfo, Set<ContainerReplica> replicas,
-      int pendingDeletes, Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
+  public static List<ContainerReplica> 
selectUnhealthyReplicasForDelete(ContainerInfo containerInfo,
+      Set<ContainerReplica> replicas, int pendingDeletes, 
Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
     if (pendingDeletes > 0) {
       LOG.debug("Container {} has {} pending deletes which will free nodes.",
           containerInfo, pendingDeletes);
@@ -261,18 +274,39 @@ public final class ReplicationManagerUtil {
     deleteCandidates.sort(
         Comparator.comparingLong(ContainerReplica::getSequenceId));
     if (containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED) {
-      return deleteCandidates.size() > 0 ? deleteCandidates.get(0) : null;
+      return deleteCandidates.size() > 0 ? deleteCandidates : null;
     }
 
     if (containerInfo.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
       List<ContainerReplica> nonUniqueOrigins =
           findNonUniqueDeleteCandidates(replicas, deleteCandidates,
               nodeStatusFn);
-      return nonUniqueOrigins.size() > 0 ? nonUniqueOrigins.get(0) : null;
+      return nonUniqueOrigins.size() > 0 ? nonUniqueOrigins : null;
     }
     return null;
   }
 
+  /**
+   * This is intended to be called when a container is under replicated, but 
there
+   * are no spare nodes to create new replicas on, due to having too many
+   * unhealthy replicas or quasi-closed replicas which cannot be closed due to
+   * having a lagging sequence ID. The logic here will select a replica to
+   * delete, or return null if there are none which can be safely deleted.
+   *
+   * @param containerInfo The container to select a replica to delete from
+   * @param replicas The list of replicas for the container
+   * @param pendingDeletes number pending deletes for this container
+   * @return A replica to delete, or null if there are none which can be safely
+   *         deleted.
+   */
+  public static ContainerReplica selectUnhealthyReplicaForDelete(
+      ContainerInfo containerInfo, Set<ContainerReplica> replicas,
+      int pendingDeletes, Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
+    List<ContainerReplica> containerReplicas =
+        selectUnhealthyReplicasForDelete(containerInfo, replicas, 
pendingDeletes, nodeStatusFn);
+    return containerReplicas != null ? containerReplicas.get(0) : null;
+  }
+
   /**
    * Given a list of all replicas (including deleteCandidates), finds and
    * returns replicas which don't have unique origin node IDs. This method
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java
new file mode 100644
index 0000000000..21b2d8151d
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import 
org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+
+/**
+ * A QUASI_CLOSED container may have some UNHEALTHY replicas with the
+ * same Sequence ID as the container. RM should try to maintain one
+ * copy of such replicas when there are no healthy replicas that
+ * match the container's Sequence ID.
+ */
+public class VulnerableUnhealthyReplicasHandler extends AbstractCheck {
+  public static final Logger LOG = 
LoggerFactory.getLogger(VulnerableUnhealthyReplicasHandler.class);
+  private final ReplicationManager replicationManager;
+
+  public VulnerableUnhealthyReplicasHandler(ReplicationManager 
replicationManager) {
+    this.replicationManager = replicationManager;
+  }
+
+  /**
+   * Checks if a QUASI_CLOSED container has some vulnerable UNHEALTHY 
replicas that need to be replicated to
+   * other Datanodes. These replicas have the same sequence ID as the 
container while other healthy replicas don't.
+   * If the node hosting such a replica is being taken offline, then the 
replica may have to be replicated to another
+   * node.
+   * @param request ContainerCheckRequest object representing the container
+   * @return true if some vulnerable UNHEALTHY replicas were found, else false
+   */
+  @Override
+  public boolean handle(ContainerCheckRequest request) {
+    ContainerInfo container = request.getContainerInfo();
+    if (container.getReplicationType() != RATIS) {
+      // This handler is only for Ratis containers.
+      return false;
+    }
+    if (container.getState() != HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      return false;
+    }
+    Set<ContainerReplica> replicas = request.getContainerReplicas();
+    LOG.debug("Checking whether container {} with replicas {} has vulnerable 
UNHEALTHY replicas.", container, replicas);
+    RatisContainerReplicaCount replicaCount =
+        new RatisContainerReplicaCount(container, replicas, 
request.getPendingOps(), request.getMaintenanceRedundancy(),
+            true);
+
+    List<ContainerReplica> vulnerableUnhealthy = 
replicaCount.getVulnerableUnhealthyReplicas(dn -> {
+      try {
+        return replicationManager.getNodeStatus(dn);
+      } catch (NodeNotFoundException e) {
+        LOG.warn("Exception for datanode {} while handling vulnerable replicas 
for container {}, with all replicas" +
+            " {}.", dn, container, replicaCount.getReplicas(), e);
+        return null;
+      }
+    });
+
+    if (!vulnerableUnhealthy.isEmpty()) {
+      LOG.info("Found vulnerable UNHEALTHY replicas {} for container {}.", 
vulnerableUnhealthy, container);
+      ReplicationManagerReport report = request.getReport();
+      
report.incrementAndSample(ReplicationManagerReport.HealthState.UNDER_REPLICATED,
 container.containerID());
+      if (!request.isReadOnly()) {
+        ContainerHealthResult.UnderReplicatedHealthResult underRepResult =
+            replicaCount.toUnderHealthResult();
+        underRepResult.setHasVulnerableUnhealthy(true);
+        request.getReplicationQueue().enqueue(underRepResult);
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index 455307c6be..a7423a79dc 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -362,19 +362,7 @@ public class DatanodeAdminMonitorImpl implements 
DatanodeAdminMonitor {
           continue;
         }
 
-        boolean isHealthy;
-        /*
-        If LegacyReplicationManager is enabled, then use the
-        isHealthyEnoughForOffline API. ReplicationManager doesn't support this
-        API yet.
-         */
-        boolean legacyEnabled = conf.getBoolean("hdds.scm.replication.enable" +
-            ".legacy", false);
-        if (legacyEnabled) {
-          isHealthy = replicaSet.isHealthyEnoughForOffline();
-        } else {
-          isHealthy = replicaSet.isHealthy();
-        }
+        boolean isHealthy = replicaSet.isHealthyEnoughForOffline();
         if (!isHealthy) {
           if (LOG.isDebugEnabled()) {
             unClosedIDs.add(cid);
@@ -391,6 +379,8 @@ public class DatanodeAdminMonitorImpl implements 
DatanodeAdminMonitor {
         // state, except for any which are unhealthy. As the container is 
closed, we can check
         // if it is sufficiently replicated using replicationManager, but this 
only works if the
         // legacy RM is not enabled.
+        boolean legacyEnabled = conf.getBoolean("hdds.scm.replication.enable" +
+            ".legacy", false);
         boolean replicatedOK;
         if (legacyEnabled) {
           replicatedOK = replicaSet.isSufficientlyReplicatedForOffline(dn, 
nodeManager);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index 17548bc5fe..dd7747e127 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -551,6 +551,76 @@ public class TestRatisUnderReplicationHandler {
             command.getKey()));
   }
 
+  /**
+   * A QUASI_CLOSED container may end up having UNHEALTHY replicas with the 
correct sequence ID, while none of the
+   * healthy replicas have the correct sequence ID. If any of these UNHEALTHY 
replicas is unique and is being taken
+   * offline, then it needs to be replicated to another DN for decommission to 
progress. This test asserts that a
+   * replicate command is sent for one such replica.
+   */
+  @Test
+  public void testUnderReplicationWithVulnerableReplicas() throws IOException {
+    final long sequenceID = 20;
+    container = 
ReplicationTestUtil.createContainerInfo(RATIS_REPLICATION_CONFIG, 1,
+        HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID);
+
+    final Set<ContainerReplica> replicas = new HashSet<>(4);
+    for (int i = 0; i < 3; i++) {
+      replicas.add(createContainerReplica(container.containerID(), 0, 
IN_SERVICE, State.QUASI_CLOSED,
+          sequenceID - 1));
+    }
+    final ContainerReplica unhealthyReplica = 
createContainerReplica(container.containerID(), 0,
+            DECOMMISSIONING, State.UNHEALTHY, sequenceID);
+    replicas.add(unhealthyReplica);
+    UnderReplicatedHealthResult result = getUnderReplicatedHealthResult();
+    Mockito.when(result.hasVulnerableUnhealthy()).thenReturn(true);
+
+    final Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = 
testProcessing(replicas, Collections.emptyList(),
+        result, 2, 1);
+    assertEquals(unhealthyReplica.getDatanodeDetails(), 
commands.iterator().next().getKey());
+  }
+
+  /**
+   * In the push replication model, a replicate command is sent to the DN 
hosting the replica, and that DN is
+   * expected to "push" the replica to another DN. If the DN hosting the 
replica has too many commands already, an
+   * exception is thrown. This test asserts that other vulnerable UNHEALTHY 
replicas are still handled when an
+   * exception is caught for one of the replicas. Also asserts that the first 
thrown exception isn't lost and is
+   * actually rethrown once other replicas are processed, so that the 
container can be re-queued.
+   */
+  @Test
+  public void 
testUnderReplicationWithVulnerableReplicasAndTargetOverloadedException()
+      throws NotLeaderException, CommandTargetOverloadedException {
+    final long sequenceID = 20;
+    container = 
ReplicationTestUtil.createContainerInfo(RATIS_REPLICATION_CONFIG, 1,
+        HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID);
+
+    final Set<ContainerReplica> replicas = new HashSet<>(5);
+    for (int i = 0; i < 3; i++) {
+      replicas.add(createContainerReplica(container.containerID(), 0, 
IN_SERVICE, State.QUASI_CLOSED,
+          sequenceID - 1));
+    }
+
+    /*
+    Create 2 unhealthy vulnerable replicas. An exception is thrown for one of 
the replicas, but the other replica
+    should still be processed and 1 command should be sent.
+     */
+    final ContainerReplica unhealthyReplica = 
createContainerReplica(container.containerID(), 0,
+        DECOMMISSIONING, State.UNHEALTHY, sequenceID);
+    final ContainerReplica unhealthyReplica2 = 
createContainerReplica(container.containerID(), 0,
+        ENTERING_MAINTENANCE, State.UNHEALTHY, sequenceID);
+    replicas.add(unhealthyReplica);
+    replicas.add(unhealthyReplica2);
+    UnderReplicatedHealthResult result = getUnderReplicatedHealthResult();
+    Mockito.when(result.hasVulnerableUnhealthy()).thenReturn(true);
+    ReplicationTestUtil.mockRMSendThrottleReplicateCommand(replicationManager, 
commandsSent, new AtomicBoolean(true));
+
+    RatisUnderReplicationHandler handler = new 
RatisUnderReplicationHandler(policy, conf, replicationManager);
+    assertThrows(CommandTargetOverloadedException.class, () -> 
handler.processAndSendCommands(replicas,
+        Collections.emptyList(), result, 2));
+    assertEquals(1, commandsSent.size());
+    DatanodeDetails dn = commandsSent.iterator().next().getKey();
+    assertTrue(unhealthyReplica.getDatanodeDetails().equals(dn) || 
unhealthyReplica2.getDatanodeDetails().equals(dn));
+  }
+
   @Test
   public void testOnlyQuasiClosedReplicaWithWrongSequenceIdIsAvailable()
       throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index a909377879..32463a5a6e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -91,6 +92,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 
 /**
@@ -444,6 +446,63 @@ public class TestReplicationManager {
     assertEquals(0, repQueue.overReplicatedQueueSize());
   }
 
+  @Test
+  public void testQuasiClosedContainerWithVulnerableUnhealthyReplica()
+      throws IOException, NodeNotFoundException {
+    RatisReplicationConfig ratisRepConfig =
+        RatisReplicationConfig.getInstance(THREE);
+    long sequenceID = 10;
+    ContainerInfo container = createContainerInfo(ratisRepConfig, 1,
+        HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID);
+
+    // this method creates replicas with same origin id and zero sequence id
+    Set<ContainerReplica> replicas =
+        createReplicasWithSameOrigin(container.containerID(),
+            ContainerReplicaProto.State.QUASI_CLOSED, 0, 0, 0);
+    replicas.add(createContainerReplica(container.containerID(), 0,
+        IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, sequenceID));
+    ContainerReplica decommissioning =
+        createContainerReplica(container.containerID(), 0, DECOMMISSIONING,
+            ContainerReplicaProto.State.UNHEALTHY, sequenceID);
+    replicas.add(decommissioning);
+    storeContainerAndReplicas(container, replicas);
+    Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dn = invocation.getArgument(0);
+          if (dn.equals(decommissioning.getDatanodeDetails())) {
+            return new NodeStatus(DECOMMISSIONING, 
HddsProtos.NodeState.HEALTHY);
+          }
+
+          return NodeStatus.inServiceHealthy();
+        });
+
+    replicationManager.processContainer(container, repQueue, repReport);
+    assertEquals(1, repReport.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+    assertEquals(1, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+
+    Mockito.when(ratisPlacementPolicy.chooseDatanodes(anyList(), anyList(), 
eq(null), eq(1), anyLong(),
+        anyLong())).thenAnswer(invocation -> 
ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails()));
+    
Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(DatanodeDetails.class),
 any(), any()))
+        .thenAnswer(invocation -> {
+          Map<SCMCommandProto.Type, Integer> map = new HashMap<>();
+          map.put(SCMCommandProto.Type.replicateContainerCommand, 0);
+          map.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+          return map;
+        });
+    RatisUnderReplicationHandler handler =
+        new RatisUnderReplicationHandler(ratisPlacementPolicy, configuration, 
replicationManager);
+
+    handler.processAndSendCommands(replicas, Collections.emptyList(), 
repQueue.dequeueUnderReplicatedContainer(), 2);
+    assertEquals(1, commandsSent.size());
+    Pair<UUID, SCMCommand<?>> command = commandsSent.iterator().next();
+    assertEquals(SCMCommandProto.Type.replicateContainerCommand, 
command.getValue().getType());
+    assertEquals(decommissioning.getDatanodeDetails().getUuid(), 
command.getKey());
+  }
+
   /**
    * When there is Quasi Closed Replica with incorrect sequence id
    * for a Closed container, it's treated as unhealthy and deleted.
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
index 3b81db7767..c68130e79e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerUtil.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -37,6 +39,7 @@ import java.util.Set;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -55,7 +58,9 @@ public class TestReplicationManagerUtil {
 
   @Test
   public void testGetExcludedAndUsedNodes() throws NodeNotFoundException {
-    ContainerID cid = ContainerID.valueOf(1L);
+    ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED,
+        
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+    ContainerID cid = container.containerID();
     Set<ContainerReplica> replicas = new HashSet<>();
     ContainerReplica good = createContainerReplica(cid, 0,
         IN_SERVICE, ContainerReplicaProto.State.CLOSED, 1);
@@ -108,7 +113,7 @@ public class TestReplicationManagerUtil {
         });
 
     ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
-        ReplicationManagerUtil.getExcludedAndUsedNodes(
+        ReplicationManagerUtil.getExcludedAndUsedNodes(container,
             new ArrayList<>(replicas), toBeRemoved, pending,
             replicationManager);
 
@@ -131,4 +136,89 @@ public class TestReplicationManagerUtil {
         .contains(pendingDelete));
   }
 
+  @Test
+  public void testGetUsedAndExcludedNodesForQuasiClosedContainer() throws 
NodeNotFoundException {
+    ContainerInfo container = 
createContainer(HddsProtos.LifeCycleState.QUASI_CLOSED,
+        
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+    ContainerID cid = container.containerID();
+    Set<ContainerReplica> replicas = new HashSet<>();
+    ContainerReplica good = createContainerReplica(cid, 0, IN_SERVICE,
+        ContainerReplicaProto.State.QUASI_CLOSED, 1);
+    replicas.add(good);
+
+    ContainerReplica remove = createContainerReplica(cid, 0,
+        IN_SERVICE, ContainerReplicaProto.State.QUASI_CLOSED, 1);
+    replicas.add(remove);
+    Set<ContainerReplica> toBeRemoved = new HashSet<>();
+    toBeRemoved.add(remove);
+
+    // this replica should be on the used nodes list
+    ContainerReplica unhealthyWithUniqueOrigin = createContainerReplica(
+        cid, 0, IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, 1);
+    replicas.add(unhealthyWithUniqueOrigin);
+
+    // this one should be on the excluded nodes list
+    ContainerReplica unhealthyWithNonUniqueOrigin = 
createContainerReplica(cid, 0, IN_SERVICE,
+        ContainerReplicaProto.State.UNHEALTHY, container.getNumberOfKeys(), 
container.getUsedBytes(),
+        MockDatanodeDetails.randomDatanodeDetails(), 
good.getOriginDatanodeId());
+    replicas.add(unhealthyWithNonUniqueOrigin);
+
+    ContainerReplica decommissioning =
+        createContainerReplica(cid, 0,
+            DECOMMISSIONING, ContainerReplicaProto.State.QUASI_CLOSED, 1);
+    replicas.add(decommissioning);
+
+    ContainerReplica maintenance =
+        createContainerReplica(cid, 0,
+            IN_MAINTENANCE, ContainerReplicaProto.State.QUASI_CLOSED, 1);
+    replicas.add(maintenance);
+
+    // Finally, add a pending add and delete. The add should go onto the used
+    // list and the delete added to the excluded nodes.
+    DatanodeDetails pendingAdd = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails pendingDelete = 
MockDatanodeDetails.randomDatanodeDetails();
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.ADD, pendingAdd, 0));
+    pending.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.DELETE, pendingDelete, 0));
+
+    Mockito.when(replicationManager.getNodeStatus(Mockito.any())).thenAnswer(
+        invocation -> {
+          final DatanodeDetails dn = invocation.getArgument(0);
+          for (ContainerReplica r : replicas) {
+            if (r.getDatanodeDetails().equals(dn)) {
+              return new NodeStatus(
+                  r.getDatanodeDetails().getPersistedOpState(),
+                  HddsProtos.NodeState.HEALTHY);
+            }
+          }
+          throw new NodeNotFoundException(dn.getUuidString());
+        });
+
+    ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
+        ReplicationManagerUtil.getExcludedAndUsedNodes(container,
+            new ArrayList<>(replicas), toBeRemoved, pending,
+            replicationManager);
+
+    assertEquals(4, excludedAndUsedNodes.getUsedNodes().size());
+    assertTrue(excludedAndUsedNodes.getUsedNodes()
+        .contains(good.getDatanodeDetails()));
+    assertTrue(excludedAndUsedNodes.getUsedNodes()
+        .contains(maintenance.getDatanodeDetails()));
+    assertTrue(excludedAndUsedNodes.getUsedNodes()
+        .contains(pendingAdd));
+    
assertTrue(excludedAndUsedNodes.getUsedNodes().contains(unhealthyWithUniqueOrigin.getDatanodeDetails()));
+
+    assertEquals(4, excludedAndUsedNodes.getExcludedNodes().size());
+    assertTrue(excludedAndUsedNodes.getExcludedNodes()
+        .contains(unhealthyWithNonUniqueOrigin.getDatanodeDetails()));
+    assertTrue(excludedAndUsedNodes.getExcludedNodes()
+        .contains(decommissioning.getDatanodeDetails()));
+    assertTrue(excludedAndUsedNodes.getExcludedNodes()
+        .contains(remove.getDatanodeDetails()));
+    assertTrue(excludedAndUsedNodes.getExcludedNodes()
+        .contains(pendingDelete));
+  }
+
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java
new file mode 100644
index 0000000000..72a89f0286
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link VulnerableUnhealthyReplicasHandler}.
+ */
+public class TestVulnerableUnhealthyReplicasHandler {
+  private ReplicationManager replicationManager;
+  private ReplicationConfig repConfig;
+  private ReplicationQueue repQueue;
+  private ContainerCheckRequest.Builder requestBuilder;
+  private ReplicationManagerReport report;
+  private VulnerableUnhealthyReplicasHandler handler;
+
+  @BeforeEach
+  public void setup() throws NodeNotFoundException {
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    handler = new VulnerableUnhealthyReplicasHandler(replicationManager);
+    repConfig = RatisReplicationConfig.getInstance(THREE);
+    repQueue = new ReplicationQueue();
+    report = new ReplicationManagerReport();
+    requestBuilder = new ContainerCheckRequest.Builder()
+        .setReplicationQueue(repQueue)
+        .setMaintenanceRedundancy(2)
+        .setPendingOps(Collections.emptyList())
+        .setReport(report);
+
+    
Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+        .thenReturn(NodeStatus.inServiceHealthy());
+  }
+
+  @Test
+  public void testReturnsFalseForECContainer() {
+    ContainerInfo container = createContainerInfo(new ECReplicationConfig(3, 
2));
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(), 
1, 2, 3, 4);
+    requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+    assertFalse(handler.handle(requestBuilder.build()));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void testReturnsFalseForClosedContainer() {
+    ContainerInfo container = createContainerInfo(repConfig, 1, 
LifeCycleState.CLOSED);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(), 
0, 0, 0);
+    requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+    assertFalse(handler.handle(requestBuilder.build()));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void testReturnsFalseForQuasiClosedContainerWithNoUnhealthyReplicas() 
{
+    ContainerInfo container = createContainerInfo(repConfig, 1, 
LifeCycleState.QUASI_CLOSED);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(), 
State.QUASI_CLOSED, 0, 0, 0);
+    requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+    assertFalse(handler.handle(requestBuilder.build()));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void 
testReturnsFalseForQuasiClosedContainerWithNoVulnerableReplicas() {
+    ContainerInfo container = createContainerInfo(repConfig, 1, 
LifeCycleState.QUASI_CLOSED);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(), 
0, 0, 0);
+    // create UNHEALTHY replica with unique origin id on an IN_SERVICE node
+    replicas.add(createContainerReplica(container.containerID(), 0, 
IN_SERVICE, State.UNHEALTHY));
+    requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+    assertFalse(handler.handle(requestBuilder.build()));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void testReturnsTrueForQuasiClosedContainerWithVulnerableReplica() 
throws NodeNotFoundException {
+    long sequenceId = 10;
+    ContainerInfo container = createContainerInfo(repConfig, 1, 
LifeCycleState.QUASI_CLOSED, sequenceId);
+    Set<ContainerReplica> replicas = new HashSet<>(4);
+    for (int i = 0; i < 3; i++) {
+      replicas.add(createContainerReplica(container.containerID(), 0, 
IN_SERVICE, State.QUASI_CLOSED,
+          container.getSequenceId() - 1));
+    }
+    // create UNHEALTHY replica with unique origin id on a DECOMMISSIONING node
+    ContainerReplica unhealthy =
+        createContainerReplica(container.containerID(), 0, DECOMMISSIONING, 
State.UNHEALTHY, sequenceId);
+    replicas.add(unhealthy);
+    
Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dn = invocation.getArgument(0);
+          if (dn.equals(unhealthy.getDatanodeDetails())) {
+            return new NodeStatus(DECOMMISSIONING, HEALTHY);
+          }
+          return NodeStatus.inServiceHealthy();
+        });
+    requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+    assertTrue(handler.handle(requestBuilder.build()));
+    assertEquals(1, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void testReturnsFalseForVulnerableReplicaWithAnotherCopy() throws 
NodeNotFoundException {
+    long sequenceId = 10;
+    ContainerInfo container = createContainerInfo(repConfig, 1, 
LifeCycleState.QUASI_CLOSED, sequenceId);
+    Set<ContainerReplica> replicas = new HashSet<>(4);
+    for (int i = 0; i < 3; i++) {
+      replicas.add(createContainerReplica(container.containerID(), 0, 
IN_SERVICE, State.QUASI_CLOSED,
+          container.getSequenceId() - 1));
+    }
+    // create UNHEALTHY replica with a non-unique origin id on a 
DECOMMISSIONING node
+    ContainerReplica unhealthy =
+        createContainerReplica(container.containerID(), 0, DECOMMISSIONING, 
State.UNHEALTHY, sequenceId);
+    replicas.add(unhealthy);
+    
Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dn = invocation.getArgument(0);
+          if (dn.equals(unhealthy.getDatanodeDetails())) {
+            return new NodeStatus(DECOMMISSIONING, HEALTHY);
+          }
+          return NodeStatus.inServiceHealthy();
+        });
+    replicas.add(createContainerReplica(container.containerID(), 0, 
IN_SERVICE, State.UNHEALTHY,
+        container.getNumberOfKeys(), container.getUsedBytes(), 
MockDatanodeDetails.randomDatanodeDetails(),
+        unhealthy.getOriginDatanodeId(), container.getSequenceId()));
+    requestBuilder.setContainerReplicas(replicas).setContainerInfo(container);
+
+    assertFalse(handler.handle(requestBuilder.build()));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void testDoesNotEnqueueForReadOnlyRequest() throws 
NodeNotFoundException {
+    long sequenceId = 10;
+    ContainerInfo container = createContainerInfo(repConfig, 1, 
LifeCycleState.QUASI_CLOSED, sequenceId);
+    Set<ContainerReplica> replicas = new HashSet<>(4);
+    for (int i = 0; i < 3; i++) {
+      replicas.add(createContainerReplica(container.containerID(), 0, 
IN_SERVICE, State.QUASI_CLOSED,
+          container.getSequenceId() - 1));
+    }
+    // create UNHEALTHY replica with unique origin id on a DECOMMISSIONING node
+    ContainerReplica unhealthy =
+        createContainerReplica(container.containerID(), 0, DECOMMISSIONING, 
State.UNHEALTHY, sequenceId);
+    replicas.add(unhealthy);
+    
Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dn = invocation.getArgument(0);
+          if (dn.equals(unhealthy.getDatanodeDetails())) {
+            return new NodeStatus(DECOMMISSIONING, HEALTHY);
+          }
+          return NodeStatus.inServiceHealthy();
+        });
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setReadOnly(true);
+
+    assertTrue(handler.handle(requestBuilder.build()));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java
index 4433c0cb6f..4ff937f98c 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java
@@ -193,7 +193,7 @@ public final class DatanodeAdminMonitorTestUtil {
     mockCheckContainerState(repManager, underReplicated);
   }
 
-  private static void mockCheckContainerState(ReplicationManager repManager, 
boolean underReplicated)
+  static void mockCheckContainerState(ReplicationManager repManager, boolean 
underReplicated)
       throws ContainerNotFoundException {
     
Mockito.when(repManager.checkContainerStatus(Mockito.any(ContainerInfo.class),
             Mockito.any(ReplicationManagerReport.class)))
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 4b389fbcf2..17107cfa95 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.replication.LegacyRatisContainerReplicaCount;
+import 
org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
@@ -298,6 +299,80 @@ public class TestDatanodeAdminMonitor {
         nodeManager.getNodeStatus(dn1).getOperationalState());
   }
 
+  /**
+   * Situation: A QUASI_CLOSED container has an UNHEALTHY replica with the
+   * greatest BCSID, and three QUASI_CLOSED replicas with a smaller BCSID. The
+   * UNHEALTHY replica is on a decommissioning node, and there are no other
+   * copies of this replica, that is, replicas with the same Origin ID as
+   * this replica.
+   *
+   * Expectation: Decommissioning should not complete until the UNHEALTHY
+   * replica has been replicated to another node.
+   */
+  @Test
+  public void testDecommissionWaitsForUnhealthyReplicaToReplicateNewRM()
+      throws NodeNotFoundException, ContainerNotFoundException {
+    DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+    nodeManager.register(dn1,
+        new NodeStatus(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+            HddsProtos.NodeState.HEALTHY));
+
+    // create 3 QUASI_CLOSED replicas with containerID 1 and same origin ID
+    ContainerID containerID = ContainerID.valueOf(1);
+    Set<ContainerReplica> replicas =
+        ReplicationTestUtil.createReplicasWithSameOrigin(containerID,
+            State.QUASI_CLOSED, 0, 0, 0);
+
+    // the container's sequence id is greater than the healthy replicas'
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        RatisReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.THREE), containerID.getId(),
+        HddsProtos.LifeCycleState.QUASI_CLOSED,
+        replicas.iterator().next().getSequenceId() + 1);
+    // UNHEALTHY replica is on a unique origin and has same sequence id as
+    // the container
+    ContainerReplica unhealthy =
+        ReplicationTestUtil.createContainerReplica(containerID, 0,
+            dn1.getPersistedOpState(), State.UNHEALTHY,
+            container.getNumberOfKeys(), container.getUsedBytes(), dn1,
+            dn1.getUuid(), container.getSequenceId());
+    replicas.add(unhealthy);
+    nodeManager.setContainers(dn1, ImmutableSet.of(containerID));
+
+    Mockito.when(repManager.getContainerReplicaCount(Mockito.eq(containerID)))
+        .thenReturn(new RatisContainerReplicaCount(container, replicas,
+            Collections.emptyList(), 2, false));
+    DatanodeAdminMonitorTestUtil.mockCheckContainerState(repManager, true);
+
+    // start monitoring dn1
+    monitor.startMonitoring(dn1);
+    monitor.run();
+    assertEquals(1, monitor.getTrackedNodeCount());
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        nodeManager.getNodeStatus(dn1).getOperationalState());
+
+    // Running the monitor again causes it to remain DECOMMISSIONING
+    // as nothing has changed.
+    monitor.run();
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        nodeManager.getNodeStatus(dn1).getOperationalState());
+
+    // add a copy of the UNHEALTHY replica on a new node, dn1 should get
+    // decommissioned now
+    ContainerReplica copyOfUnhealthyOnNewNode = unhealthy.toBuilder()
+        .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails())
+        .build();
+    replicas.add(copyOfUnhealthyOnNewNode);
+    Mockito.when(repManager.getContainerReplicaCount(Mockito.eq(containerID)))
+        .thenReturn(new RatisContainerReplicaCount(container, replicas,
+            Collections.emptyList(), 2, false));
+    DatanodeAdminMonitorTestUtil.mockCheckContainerState(repManager, false);
+    monitor.run();
+    assertEquals(0, monitor.getTrackedNodeCount());
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONED,
+        nodeManager.getNodeStatus(dn1).getOperationalState());
+  }
+
   /**
    * Consider a QUASI_CLOSED container with only UNHEALTHY replicas. If one
    * of its nodes is decommissioned, the decommissioning should succeed.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to