This is an automated email from the ASF dual-hosted git repository.

siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fa480d014e HDDS-9177. A mis replicated EC container with UNHEALTHY 
replicas may not get resolved (#5218)
fa480d014e is described below

commit fa480d014ec260b8fe82e52e3f3bde3138d304b4
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Fri Aug 25 16:04:43 2023 +0530

    HDDS-9177. A mis replicated EC container with UNHEALTHY replicas may not 
get resolved (#5218)
---
 .../replication/MisReplicationHandler.java         |   2 +
 .../container/replication/ReplicationManager.java  |   8 +-
 .../health/ClosedWithUnhealthyReplicasHandler.java |   2 +-
 .../health/ECMisReplicationCheckHandler.java       | 142 +++++++++++
 .../health/ECReplicationCheckHandler.java          |  68 +-----
 .../replication/TestReplicationManager.java        |  99 ++++++++
 .../health/TestECMisReplicationCheckHandler.java   | 265 +++++++++++++++++++++
 .../health/TestECReplicationCheckHandler.java      |  84 +------
 8 files changed, 520 insertions(+), 150 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 6dcec13c50..70b2a44427 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -140,6 +140,8 @@ public abstract class MisReplicationHandler implements
               container.getContainerID());
       return 0;
     }
+
+    LOG.debug("Handling mis replicated container {}.", container);
     Set<ContainerReplica> sources = filterSources(replicas);
     Set<ContainerReplica> replicasToBeReplicated = containerPlacement
             .replicasToCopyToFixMisreplication(replicas.stream()
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 4974407f36..f6d7f82ed9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 import org.apache.hadoop.hdds.scm.container.balancer.MoveManager;
+import 
org.apache.hadoop.hdds.scm.container.replication.health.ECMisReplicationCheckHandler;
 import 
org.apache.hadoop.hdds.scm.container.replication.health.MismatchedReplicasHandler;
 import 
org.apache.hadoop.hdds.scm.container.replication.health.ClosedWithUnhealthyReplicasHandler;
 import 
org.apache.hadoop.hdds.scm.container.replication.health.ClosingContainerHandler;
@@ -181,6 +182,7 @@ public class ReplicationManager implements SCMService {
   private final Clock clock;
   private final ContainerReplicaPendingOps containerReplicaPendingOps;
   private final ECReplicationCheckHandler ecReplicationCheckHandler;
+  private final ECMisReplicationCheckHandler ecMisReplicationCheckHandler;
   private final RatisReplicationCheckHandler ratisReplicationCheckHandler;
   private final EventPublisher eventPublisher;
   private final AtomicReference<ReplicationQueue> replicationQueue
@@ -238,8 +240,9 @@ public class ReplicationManager implements SCMService {
         TimeUnit.MILLISECONDS);
     this.containerReplicaPendingOps = replicaPendingOps;
     this.legacyReplicationManager = legacyReplicationManager;
-    this.ecReplicationCheckHandler =
-        new ECReplicationCheckHandler(ecContainerPlacement);
+    this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
+    this.ecMisReplicationCheckHandler =
+        new ECMisReplicationCheckHandler(ecContainerPlacement);
     this.ratisReplicationCheckHandler =
         new RatisReplicationCheckHandler(ratisContainerPlacement);
     this.nodeManager = nodeManager;
@@ -274,6 +277,7 @@ public class ReplicationManager implements SCMService {
         .addNext(ecReplicationCheckHandler)
         .addNext(ratisReplicationCheckHandler)
         .addNext(new ClosedWithUnhealthyReplicasHandler(this))
+        .addNext(ecMisReplicationCheckHandler)
         .addNext(new RatisUnhealthyReplicationCheckHandler());
     start();
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithUnhealthyReplicasHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithUnhealthyReplicasHandler.java
index 0ab60dfae2..b5dd9ed3c3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithUnhealthyReplicasHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithUnhealthyReplicasHandler.java
@@ -50,7 +50,7 @@ public class ClosedWithUnhealthyReplicasHandler extends 
AbstractCheck {
 
   /**
    * Handles a closed EC container with unhealthy replicas. Note that if we
-   * reach here, there is no over, under, or mis replication. This handler
+   * reach here, there is no over or under replication. This handler
    * will just send commands to delete the unhealthy replicas.
    *
    * <p>
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECMisReplicationCheckHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECMisReplicationCheckHandler.java
new file mode 100644
index 0000000000..e42eb6b93f
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECMisReplicationCheckHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+
+/**
+ * This class checks if an EC container is mis replicated. It expects that the
+ * container is not over or under replicated and has no excess UNHEALTHY
+ * replicas, because the handlers for those cases run earlier in the chain.
+ */
+public class ECMisReplicationCheckHandler extends AbstractCheck {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ECMisReplicationCheckHandler.class);
+
+  private final PlacementPolicy placementPolicy;
+
+  public ECMisReplicationCheckHandler(PlacementPolicy placementPolicy) {
+    this.placementPolicy = placementPolicy;
+  }
+
+  @Override
+  public boolean handle(ContainerCheckRequest request) {
+    if (request.getContainerInfo().getReplicationType() != EC) {
+      // This handler is only for EC containers.
+      return false;
+    }
+
+    ReplicationManagerReport report = request.getReport();
+    ContainerInfo container = request.getContainerInfo();
+    ContainerID containerID = container.containerID();
+    LOG.debug("Checking container {} for mis replication.", container);
+
+    ContainerHealthResult health = checkMisReplication(request);
+    if (health.getHealthState() ==
+        ContainerHealthResult.HealthState.MIS_REPLICATED) {
+      report.incrementAndSample(
+          ReplicationManagerReport.HealthState.MIS_REPLICATED, containerID);
+      ContainerHealthResult.MisReplicatedHealthResult misRepHealth
+          = ((ContainerHealthResult.MisReplicatedHealthResult) health);
+      if (!misRepHealth.isReplicatedOkAfterPending()) {
+        request.getReplicationQueue().enqueue(misRepHealth);
+      }
+      LOG.debug("Container {} is Mis Replicated. isReplicatedOkAfterPending "
+              + "is [{}]. Reason for mis replication is [{}].", container,
+          misRepHealth.isReplicatedOkAfterPending(),
+          misRepHealth.getMisReplicatedReason());
+      return true;
+    }
+
+    return false;
+  }
+
+  ContainerHealthResult checkMisReplication(ContainerCheckRequest request) {
+    ContainerInfo container = request.getContainerInfo();
+    Set<ContainerReplica> replicas = request.getContainerReplicas();
+
+    ContainerPlacementStatus placement = getPlacementStatus(replicas,
+        container.getReplicationConfig().getRequiredNodes(),
+        Collections.emptyList());
+    if (!placement.isPolicySatisfied()) {
+      /*
+      Check if policy is satisfied by considering pending ops. For example if
+      there are some pending adds that can fix mis replication then we don't
+      want to queue this container.
+       */
+      ContainerPlacementStatus placementAfterPending = getPlacementStatus(
+          replicas, container.getReplicationConfig().getRequiredNodes(),
+          request.getPendingOps());
+      return new ContainerHealthResult.MisReplicatedHealthResult(
+          container, placementAfterPending.isPolicySatisfied(),
+          placementAfterPending.misReplicatedReason());
+    }
+
+    return new ContainerHealthResult.HealthyResult(container);
+  }
+
+  /**
+   * Checks if the replica datanodes, adjusted by pending ops, meet the policy.
+   * @param replicas Set of containerReplica
+   * @param replicationFactor Expected Replication Factor of the container
+   * @param pendingOps Pending ADD and DELETE ops applied to the datanode set
+   * @return ContainerPlacementStatus indicating if the policy is met or not
+   */
+  private ContainerPlacementStatus getPlacementStatus(
+      Set<ContainerReplica> replicas, int replicationFactor,
+      List<ContainerReplicaOp> pendingOps) {
+
+    Set<DatanodeDetails> replicaDns = replicas
+        .stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toSet());
+
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        replicaDns.add(op.getTarget());
+      }
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        replicaDns.remove(op.getTarget());
+      }
+    }
+
+    return placementPolicy.validateContainerPlacement(
+        new ArrayList<>(replicaDns), replicationFactor);
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
index af99e4b2d6..f4677f8215 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
@@ -17,9 +17,6 @@
 package org.apache.hadoop.hdds.scm.container.replication.health;
 
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -31,11 +28,8 @@ import 
org.apache.hadoop.hdds.scm.container.replication.ECContainerReplicaCount;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 
@@ -49,12 +43,6 @@ public class ECReplicationCheckHandler extends AbstractCheck 
{
   private static final Logger LOG =
       LoggerFactory.getLogger(ECReplicationCheckHandler.class);
 
-  private final PlacementPolicy placementPolicy;
-
-  public ECReplicationCheckHandler(PlacementPolicy placementPolicy) {
-    this.placementPolicy = placementPolicy;
-  }
-
   @Override
   public boolean handle(ContainerCheckRequest request) {
     if (request.getContainerInfo().getReplicationType() != EC) {
@@ -114,25 +102,12 @@ public class ECReplicationCheckHandler extends 
AbstractCheck {
       LOG.debug("Container {} is Over Replicated. isReplicatedOkAfterPending "
           + "is [{}]", container, overHealth.isReplicatedOkAfterPending());
       return true;
-    } else if (health.getHealthState() ==
-        ContainerHealthResult.HealthState.MIS_REPLICATED) {
-      report.incrementAndSample(
-          ReplicationManagerReport.HealthState.MIS_REPLICATED, containerID);
-      ContainerHealthResult.MisReplicatedHealthResult misRepHealth
-          = ((ContainerHealthResult.MisReplicatedHealthResult) health);
-      if (!misRepHealth.isReplicatedOkAfterPending()) {
-        request.getReplicationQueue().enqueue(misRepHealth);
-      }
-      LOG.debug("Container {} is Mis Replicated. isReplicatedOkAfterPending "
-              + "is [{}]. Reason for mis replication is [{}].", container,
-          misRepHealth.isReplicatedOkAfterPending(),
-          misRepHealth.getMisReplicatedReason());
-      return true;
     }
+
     // Should not get here, but in case it does the container is not healthy,
     // but is also not under or over replicated.
-    LOG.warn("Container {} is not healthy but is not under, over or "
-        + " mis-replicated. Should not happen.", container);
+    LOG.warn("Container {} is not healthy but is not under or over replicated" 
+
+        ". Should not happen.", container);
     return false;
   }
 
@@ -179,44 +154,9 @@ public class ECReplicationCheckHandler extends 
AbstractCheck {
           .OverReplicatedHealthResult(container, overRepIndexes.size(),
           !replicaCount.isOverReplicated(true));
     }
-    ContainerPlacementStatus placement = getPlacementStatus(replicas,
-        container.getReplicationConfig().getRequiredNodes(),
-        Collections.emptyList());
-    if (!placement.isPolicySatisfied()) {
-      ContainerPlacementStatus placementAfterPending = getPlacementStatus(
-          replicas, container.getReplicationConfig().getRequiredNodes(),
-          request.getPendingOps());
-      return new ContainerHealthResult.MisReplicatedHealthResult(
-          container, placementAfterPending.isPolicySatisfied(),
-          placementAfterPending.misReplicatedReason());
-    }
+
     // No issues detected, so return healthy.
     return new ContainerHealthResult.HealthyResult(container);
   }
 
-  /**
-   * Given a set of ContainerReplica, transform it to a list of DatanodeDetails
-   * and then check if the list meets the container placement policy.
-   * @param replicas List of containerReplica
-   * @param replicationFactor Expected Replication Factor of the container
-   * @return ContainerPlacementStatus indicating if the policy is met or not
-   */
-  private ContainerPlacementStatus getPlacementStatus(
-      Set<ContainerReplica> replicas, int replicationFactor,
-      List<ContainerReplicaOp> pendingOps) {
-
-    Set<DatanodeDetails> replicaDns = replicas.stream()
-        .map(ContainerReplica::getDatanodeDetails)
-        .collect(Collectors.toSet());
-    for (ContainerReplicaOp op : pendingOps) {
-      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
-        replicaDns.add(op.getTarget());
-      }
-      if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
-        replicaDns.remove(op.getTarget());
-      }
-    }
-    return placementPolicy.validateContainerPlacement(
-        new ArrayList<>(replicaDns), replicationFactor);
-  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 8d142d761b..eddd813a04 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -47,6 +47,7 @@ import 
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Lists;
 import org.apache.ozone.test.TestClock;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.junit.Assert;
@@ -828,6 +829,104 @@ public class TestReplicationManager {
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
 
+  @Test
+  public void testMisReplicatedECContainer() throws IOException {
+    Mockito.when(ecPlacementPolicy.validateContainerPlacement(
+            anyList(), anyInt()))
+        .thenReturn(new ContainerPlacementStatusDefault(4, 5, 5));
+
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
+
+    replicationManager.processContainer(container, repQueue, repReport);
+    assertEquals(1, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+    assertEquals(1, repReport.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+    assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
+  /**
+   * Consider an EC container with 5 closed and 1 unhealthy replica.
+   * Assume this container is also mis replicated because it's on
+   * insufficient racks. For such EC containers, RM should first delete the
+   * unhealthy replica and then solve mis replication.
+   */
+  @Test
+  public void testMisReplicatedECContainerWithUnhealthyReplica()
+      throws ContainerNotFoundException {
+    Mockito.when(ecPlacementPolicy.validateContainerPlacement(
+            anyList(), anyInt()))
+        .thenReturn(new ContainerPlacementStatusDefault(5, 5, 5, 1,
+            Lists.newArrayList(2, 1, 1, 1, 1)));
+
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    Set<ContainerReplica> replicas =
+        addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4,
+            5);
+    ContainerReplica unhealthyReplica =
+        createContainerReplica(container.containerID(), 1, IN_SERVICE,
+            ContainerReplicaProto.State.UNHEALTHY);
+    replicas.add(unhealthyReplica);
+    storeContainerAndReplicas(container, replicas);
+
+    replicationManager.processContainer(container, repQueue, repReport);
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    /*
+     Does not get queued as over replicated because a delete command is sent
+     directly for the unhealthy replica.
+     */
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+    assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+    assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    assertEquals(1, repReport.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+    List<ContainerReplicaOp> ops =
+        containerReplicaPendingOps.getPendingOps(container.containerID());
+    Mockito.verify(nodeManager).addDatanodeCommand(any(), any());
+    Assertions.assertEquals(1, ops.size());
+    Assertions.assertEquals(ContainerReplicaOp.PendingOpType.DELETE,
+        ops.get(0).getOpType());
+    Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(),
+        ops.get(0).getTarget());
+    Assertions.assertEquals(1, ops.get(0).getReplicaIndex());
+    Assertions.assertEquals(1, replicationManager.getMetrics()
+        .getEcDeletionCmdsSentTotal());
+    Assertions.assertEquals(0, replicationManager.getMetrics()
+        .getDeletionCmdsSentTotal());
+
+    /*
+    Now, remove the unhealthy replica. This leaves 5 replicas on 4 racks,
+    which is mis replication. RM should queue this container as mis
+    replicated now.
+     */
+    replicas.remove(unhealthyReplica);
+    containerReplicaPendingOps.completeDeleteReplica(container.containerID(),
+        unhealthyReplica.getDatanodeDetails(), 1);
+    Mockito.when(ecPlacementPolicy.validateContainerPlacement(
+            anyList(), anyInt()))
+        .thenReturn(new ContainerPlacementStatusDefault(4, 5, 5, 1,
+            Lists.newArrayList(2, 1, 1, 1)));
+
+    repReport = new ReplicationManagerReport();
+    replicationManager.processContainer(container, repQueue, repReport);
+    assertEquals(1, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+    assertEquals(1, repReport.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+    assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
   @Test
   public void testUnderReplicationQueuePopulated() {
     // Make it always return mis-replicated. Only a perfectly replicated
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECMisReplicationCheckHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECMisReplicationCheckHandler.java
new file mode 100644
index 0000000000..3585b76d9b
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECMisReplicationCheckHandler.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+
+/**
+ * Unit tests for {@link ECMisReplicationCheckHandler}.
+ */
+public class TestECMisReplicationCheckHandler {
+  private ECMisReplicationCheckHandler handler;
+  private ECReplicationConfig repConfig;
+  private ReplicationQueue repQueue;
+  private int maintenanceRedundancy = 2;
+  private ContainerCheckRequest.Builder requestBuilder;
+  private ReplicationManagerReport report;
+  private PlacementPolicy placementPolicy;
+
+  @Before
+  public void setup() {
+    placementPolicy = Mockito.mock(PlacementPolicy.class);
+    Mockito.when(placementPolicy.validateContainerPlacement(
+            anyList(), anyInt()))
+        .thenReturn(new ContainerPlacementStatusDefault(5, 5, 5));
+    handler = new ECMisReplicationCheckHandler(placementPolicy);
+    repConfig = new ECReplicationConfig(3, 2);
+    repQueue = new ReplicationQueue();
+    report = new ReplicationManagerReport();
+    requestBuilder = new ContainerCheckRequest.Builder()
+        .setReplicationQueue(repQueue)
+        .setMaintenanceRedundancy(maintenanceRedundancy)
+        .setPendingOps(Collections.emptyList())
+        .setReport(report);
+  }
+
+  @Test
+  public void shouldReturnFalseForHealthyContainer() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+
+    ContainerHealthResult result = handler.checkMisReplication(request);
+    assertEquals(ContainerHealthResult.HealthState.HEALTHY,
+        result.getHealthState());
+
+    assertFalse(handler.handle(request));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void shouldReturnFalseForNonECContainer() {
+    ContainerInfo container =
+        createContainerInfo(RatisReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.THREE));
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+
+    assertFalse(handler.handle(request));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void shouldHandleMisReplicatedContainer() {
+    ContainerInfo container = createContainerInfo(repConfig);
+
+    // Placement policy is always violated
+    Mockito.when(placementPolicy.validateContainerPlacement(
+        Mockito.any(),
+        Mockito.anyInt()
+    )).thenAnswer(invocation ->
+        new ContainerPlacementStatusDefault(4, 5, 9));
+
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+        Pair.of(IN_SERVICE, 5));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .build();
+
+    ContainerHealthResult result = handler.checkMisReplication(request);
+    Assertions.assertEquals(ContainerHealthResult.HealthState.MIS_REPLICATED,
+        result.getHealthState());
+
+    assertTrue(handler.handle(request));
+    assertEquals(1, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+    assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+    assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+  }
+
+  /**
+   * If mis replication is fixed by a pending ADD, then the container should
+   * not be queued.
+   */
+  @Test
+  public void shouldReturnFalseForMisReplicatedContainerFixedByPending() {
+    ContainerInfo container = createContainerInfo(repConfig);
+
+    Mockito.when(placementPolicy.validateContainerPlacement(
+        Mockito.any(),
+        Mockito.anyInt()
+    )).thenAnswer(invocation -> {
+      List<DatanodeDetails> dns = invocation.getArgument(0);
+      // If the number of DNs is 5 or less make it be mis-replicated
+      if (dns.size() <= 5) {
+        return new ContainerPlacementStatusDefault(4, 5, 9);
+      } else {
+        return new ContainerPlacementStatusDefault(5, 5, 9);
+      }
+    });
+
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 1));
+
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+        Pair.of(IN_SERVICE, 5));
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending)
+        .build();
+
+    ContainerHealthResult result = handler.checkMisReplication(request);
+    Assertions.assertEquals(ContainerHealthResult.HealthState.MIS_REPLICATED,
+        result.getHealthState());
+
+    assertTrue(handler.handle(request));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+    assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+    assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+  }
+
+  /**
+   * This tests what happens when there's a pending DELETE. Suppose there's a
+   * mis replicated container with an excess unhealthy replica, and a pending
+   * delete is scheduled for the unhealthy replica. Then this container should
+   * still be reported as mis replicated, but it should not be queued.
+   */
+  @Test
+  public void testMisReplicationWithUnhealthyReplica() {
+    ContainerInfo container = createContainerInfo(repConfig);
+
+    Mockito.when(placementPolicy.validateContainerPlacement(
+        Mockito.any(),
+        Mockito.anyInt()
+    )).thenAnswer(invocation -> {
+      List<DatanodeDetails> dns = invocation.getArgument(0);
+      // If the number of DNs is 6 or more make it mis-replicated
+      if (dns.size() > 5) {
+        return new ContainerPlacementStatusDefault(5, 6, 9);
+      } else {
+        return new ContainerPlacementStatusDefault(5, 5, 9);
+      }
+    });
+
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+        Pair.of(IN_SERVICE, 5));
+    ContainerReplica unhealthyReplica =
+        createContainerReplica(container.containerID(), 1, IN_SERVICE,
+            State.UNHEALTHY);
+    replicas.add(unhealthyReplica);
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        DELETE, unhealthyReplica.getDatanodeDetails(), 1));
+
+    ContainerCheckRequest request = requestBuilder
+        .setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending)
+        .build();
+
+    ContainerHealthResult result = handler.checkMisReplication(request);
+    Assertions.assertEquals(ContainerHealthResult.HealthState.MIS_REPLICATED,
+        result.getHealthState());
+
+    assertTrue(handler.handle(request));
+    assertEquals(0, repQueue.underReplicatedQueueSize());
+    assertEquals(0, repQueue.overReplicatedQueueSize());
+    assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+    assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
index a427393cf6..ccb239637a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.container.replication.health;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -82,7 +81,7 @@ public class TestECReplicationCheckHandler {
     Mockito.when(placementPolicy.validateContainerPlacement(
         anyList(), anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
-    healthCheck = new ECReplicationCheckHandler(placementPolicy);
+    healthCheck = new ECReplicationCheckHandler();
     repConfig = new ECReplicationConfig(3, 2);
     repQueue = new ReplicationQueue();
     report = new ReplicationManagerReport();
@@ -579,87 +578,6 @@ public class TestECReplicationCheckHandler {
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
 
-  @Test
-  public void testMisReplicatedContainer() {
-    ContainerInfo container = createContainerInfo(repConfig);
-
-    // Placement policy is always violated
-    Mockito.when(placementPolicy.validateContainerPlacement(
-        Mockito.any(),
-        Mockito.anyInt()
-    )).thenAnswer(invocation ->
-        new ContainerPlacementStatusDefault(4, 5, 9));
-
-    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
-        Pair.of(IN_SERVICE, 5));
-    ContainerCheckRequest request = requestBuilder
-        .setContainerReplicas(replicas)
-        .setContainerInfo(container)
-        .build();
-
-    ContainerHealthResult result = healthCheck.checkHealth(request);
-    assertEquals(HealthState.MIS_REPLICATED, result.getHealthState());
-
-    assertTrue(healthCheck.handle(request));
-    assertEquals(1, repQueue.underReplicatedQueueSize());
-    assertEquals(0, repQueue.overReplicatedQueueSize());
-    assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-    assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.OVER_REPLICATED));
-    assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.MIS_REPLICATED));
-  }
-
-  @Test
-  public void testMisReplicatedContainerFixedByPending() {
-    ContainerInfo container = createContainerInfo(repConfig);
-
-    Mockito.when(placementPolicy.validateContainerPlacement(
-        Mockito.any(),
-        Mockito.anyInt()
-    )).thenAnswer(invocation -> {
-      List<DatanodeDetails> dns = invocation.getArgument(0);
-      // If the number of DNs is 5 or less make it be mis-replicated
-      if (dns.size() <= 5) {
-        return new ContainerPlacementStatusDefault(4, 5, 9);
-      } else {
-        return new ContainerPlacementStatusDefault(5, 5, 9);
-      }
-    });
-
-    List<ContainerReplicaOp> pending = new ArrayList<>();
-    pending.add(ContainerReplicaOp.create(
-        ADD, MockDatanodeDetails.randomDatanodeDetails(), 1));
-
-    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
-        Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
-        Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
-        Pair.of(IN_SERVICE, 5));
-    ContainerCheckRequest request = requestBuilder
-        .setContainerReplicas(replicas)
-        .setContainerInfo(container)
-        .setPendingOps(pending)
-        .build();
-
-    ContainerHealthResult result = healthCheck.checkHealth(request);
-    assertEquals(HealthState.MIS_REPLICATED, result.getHealthState());
-
-    // Under-replicated takes precedence and the over-replication is ignored
-    // for now.
-    assertTrue(healthCheck.handle(request));
-    assertEquals(0, repQueue.underReplicatedQueueSize());
-    assertEquals(0, repQueue.overReplicatedQueueSize());
-    assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-    assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.OVER_REPLICATED));
-    assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.MIS_REPLICATED));
-  }
-
   @Test
   public void testUnderAndMisReplicatedContainer() {
     ContainerInfo container = createContainerInfo(repConfig);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to