This is an automated email from the ASF dual-hosted git repository.
siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fa480d014e HDDS-9177. A mis replicated EC container with UNHEALTHY
replicas may not get resolved (#5218)
fa480d014e is described below
commit fa480d014ec260b8fe82e52e3f3bde3138d304b4
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Fri Aug 25 16:04:43 2023 +0530
HDDS-9177. A mis replicated EC container with UNHEALTHY replicas may not
get resolved (#5218)
---
.../replication/MisReplicationHandler.java | 2 +
.../container/replication/ReplicationManager.java | 8 +-
.../health/ClosedWithUnhealthyReplicasHandler.java | 2 +-
.../health/ECMisReplicationCheckHandler.java | 142 +++++++++++
.../health/ECReplicationCheckHandler.java | 68 +-----
.../replication/TestReplicationManager.java | 99 ++++++++
.../health/TestECMisReplicationCheckHandler.java | 265 +++++++++++++++++++++
.../health/TestECReplicationCheckHandler.java | 84 +------
8 files changed, 520 insertions(+), 150 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index 6dcec13c50..70b2a44427 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -140,6 +140,8 @@ public abstract class MisReplicationHandler implements
container.getContainerID());
return 0;
}
+
+ LOG.debug("Handling mis replicated container {}.", container);
Set<ContainerReplica> sources = filterSources(replicas);
Set<ContainerReplica> replicasToBeReplicated = containerPlacement
.replicasToCopyToFixMisreplication(replicas.stream()
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 4974407f36..f6d7f82ed9 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.balancer.MoveManager;
+import
org.apache.hadoop.hdds.scm.container.replication.health.ECMisReplicationCheckHandler;
import
org.apache.hadoop.hdds.scm.container.replication.health.MismatchedReplicasHandler;
import
org.apache.hadoop.hdds.scm.container.replication.health.ClosedWithUnhealthyReplicasHandler;
import
org.apache.hadoop.hdds.scm.container.replication.health.ClosingContainerHandler;
@@ -181,6 +182,7 @@ public class ReplicationManager implements SCMService {
private final Clock clock;
private final ContainerReplicaPendingOps containerReplicaPendingOps;
private final ECReplicationCheckHandler ecReplicationCheckHandler;
+ private final ECMisReplicationCheckHandler ecMisReplicationCheckHandler;
private final RatisReplicationCheckHandler ratisReplicationCheckHandler;
private final EventPublisher eventPublisher;
private final AtomicReference<ReplicationQueue> replicationQueue
@@ -238,8 +240,9 @@ public class ReplicationManager implements SCMService {
TimeUnit.MILLISECONDS);
this.containerReplicaPendingOps = replicaPendingOps;
this.legacyReplicationManager = legacyReplicationManager;
- this.ecReplicationCheckHandler =
- new ECReplicationCheckHandler(ecContainerPlacement);
+ this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
+ this.ecMisReplicationCheckHandler =
+ new ECMisReplicationCheckHandler(ecContainerPlacement);
this.ratisReplicationCheckHandler =
new RatisReplicationCheckHandler(ratisContainerPlacement);
this.nodeManager = nodeManager;
@@ -274,6 +277,7 @@ public class ReplicationManager implements SCMService {
.addNext(ecReplicationCheckHandler)
.addNext(ratisReplicationCheckHandler)
.addNext(new ClosedWithUnhealthyReplicasHandler(this))
+ .addNext(ecMisReplicationCheckHandler)
.addNext(new RatisUnhealthyReplicationCheckHandler());
start();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithUnhealthyReplicasHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithUnhealthyReplicasHandler.java
index 0ab60dfae2..b5dd9ed3c3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithUnhealthyReplicasHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithUnhealthyReplicasHandler.java
@@ -50,7 +50,7 @@ public class ClosedWithUnhealthyReplicasHandler extends
AbstractCheck {
/**
* Handles a closed EC container with unhealthy replicas. Note that if we
- * reach here, there is no over, under, or mis replication. This handler
+ * reach here, there is no over or under replication. This handler
* will just send commands to delete the unhealthy replicas.
*
* <p>
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECMisReplicationCheckHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECMisReplicationCheckHandler.java
new file mode 100644
index 0000000000..e42eb6b93f
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECMisReplicationCheckHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+
+/**
+ * This class checks if an EC container is mis replicated. When this check
+ * runs, the container is expected to be neither over nor under replicated,
+ * and to not have an excess of UNHEALTHY replicas.
+ */
+public class ECMisReplicationCheckHandler extends AbstractCheck {
+ static final Logger LOG =
+ LoggerFactory.getLogger(ECMisReplicationCheckHandler.class);
+
+ private final PlacementPolicy placementPolicy;
+
+ public ECMisReplicationCheckHandler(PlacementPolicy placementPolicy) {
+ this.placementPolicy = placementPolicy;
+ }
+
+ @Override
+ public boolean handle(ContainerCheckRequest request) {
+ if (request.getContainerInfo().getReplicationType() != EC) {
+ // This handler is only for EC containers.
+ return false;
+ }
+
+ ReplicationManagerReport report = request.getReport();
+ ContainerInfo container = request.getContainerInfo();
+ ContainerID containerID = container.containerID();
+ LOG.debug("Checking container {} for mis replication.", container);
+
+ ContainerHealthResult health = checkMisReplication(request);
+ if (health.getHealthState() ==
+ ContainerHealthResult.HealthState.MIS_REPLICATED) {
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED, containerID);
+ ContainerHealthResult.MisReplicatedHealthResult misRepHealth
+ = ((ContainerHealthResult.MisReplicatedHealthResult) health);
+ if (!misRepHealth.isReplicatedOkAfterPending()) {
+ request.getReplicationQueue().enqueue(misRepHealth);
+ }
+ LOG.debug("Container {} is Mis Replicated. isReplicatedOkAfterPending "
+ + "is [{}]. Reason for mis replication is [{}].", container,
+ misRepHealth.isReplicatedOkAfterPending(),
+ misRepHealth.getMisReplicatedReason());
+ return true;
+ }
+
+ return false;
+ }
+
+ ContainerHealthResult checkMisReplication(ContainerCheckRequest request) {
+ ContainerInfo container = request.getContainerInfo();
+ Set<ContainerReplica> replicas = request.getContainerReplicas();
+
+ ContainerPlacementStatus placement = getPlacementStatus(replicas,
+ container.getReplicationConfig().getRequiredNodes(),
+ Collections.emptyList());
+ if (!placement.isPolicySatisfied()) {
+ /*
+ Check if policy is satisfied by considering pending ops. For example if
+ there are some pending adds that can fix mis replication then we don't
+ want to queue this container.
+ */
+ ContainerPlacementStatus placementAfterPending = getPlacementStatus(
+ replicas, container.getReplicationConfig().getRequiredNodes(),
+ request.getPendingOps());
+ return new ContainerHealthResult.MisReplicatedHealthResult(
+ container, placementAfterPending.isPolicySatisfied(),
+ placementAfterPending.misReplicatedReason());
+ }
+
+ return new ContainerHealthResult.HealthyResult(container);
+ }
+
+ /**
+ * Builds the set of datanodes holding the replicas, adjusts it for pending
+ * ADD and DELETE ops, and checks it against the container placement policy.
+ * @param replicas Set of ContainerReplica
+ * @param replicationFactor Expected Replication Factor of the container
+ * @return ContainerPlacementStatus indicating if the policy is met or not
+ */
+ private ContainerPlacementStatus getPlacementStatus(
+ Set<ContainerReplica> replicas, int replicationFactor,
+ List<ContainerReplicaOp> pendingOps) {
+
+ Set<DatanodeDetails> replicaDns = replicas
+ .stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toSet());
+
+ for (ContainerReplicaOp op : pendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+ replicaDns.add(op.getTarget());
+ }
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ replicaDns.remove(op.getTarget());
+ }
+ }
+
+ return placementPolicy.validateContainerPlacement(
+ new ArrayList<>(replicaDns), replicationFactor);
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
index af99e4b2d6..f4677f8215 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
@@ -17,9 +17,6 @@
package org.apache.hadoop.hdds.scm.container.replication.health;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -31,11 +28,8 @@ import
org.apache.hadoop.hdds.scm.container.replication.ECContainerReplicaCount;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
@@ -49,12 +43,6 @@ public class ECReplicationCheckHandler extends AbstractCheck
{
private static final Logger LOG =
LoggerFactory.getLogger(ECReplicationCheckHandler.class);
- private final PlacementPolicy placementPolicy;
-
- public ECReplicationCheckHandler(PlacementPolicy placementPolicy) {
- this.placementPolicy = placementPolicy;
- }
-
@Override
public boolean handle(ContainerCheckRequest request) {
if (request.getContainerInfo().getReplicationType() != EC) {
@@ -114,25 +102,12 @@ public class ECReplicationCheckHandler extends
AbstractCheck {
LOG.debug("Container {} is Over Replicated. isReplicatedOkAfterPending "
+ "is [{}]", container, overHealth.isReplicatedOkAfterPending());
return true;
- } else if (health.getHealthState() ==
- ContainerHealthResult.HealthState.MIS_REPLICATED) {
- report.incrementAndSample(
- ReplicationManagerReport.HealthState.MIS_REPLICATED, containerID);
- ContainerHealthResult.MisReplicatedHealthResult misRepHealth
- = ((ContainerHealthResult.MisReplicatedHealthResult) health);
- if (!misRepHealth.isReplicatedOkAfterPending()) {
- request.getReplicationQueue().enqueue(misRepHealth);
- }
- LOG.debug("Container {} is Mis Replicated. isReplicatedOkAfterPending "
- + "is [{}]. Reason for mis replication is [{}].", container,
- misRepHealth.isReplicatedOkAfterPending(),
- misRepHealth.getMisReplicatedReason());
- return true;
}
+
// Should not get here, but in case it does the container is not healthy,
// but is also not under or over replicated.
- LOG.warn("Container {} is not healthy but is not under, over or "
- + " mis-replicated. Should not happen.", container);
+ LOG.warn("Container {} is not healthy but is not under or over replicated"
+
+ ". Should not happen.", container);
return false;
}
@@ -179,44 +154,9 @@ public class ECReplicationCheckHandler extends
AbstractCheck {
.OverReplicatedHealthResult(container, overRepIndexes.size(),
!replicaCount.isOverReplicated(true));
}
- ContainerPlacementStatus placement = getPlacementStatus(replicas,
- container.getReplicationConfig().getRequiredNodes(),
- Collections.emptyList());
- if (!placement.isPolicySatisfied()) {
- ContainerPlacementStatus placementAfterPending = getPlacementStatus(
- replicas, container.getReplicationConfig().getRequiredNodes(),
- request.getPendingOps());
- return new ContainerHealthResult.MisReplicatedHealthResult(
- container, placementAfterPending.isPolicySatisfied(),
- placementAfterPending.misReplicatedReason());
- }
+
// No issues detected, so return healthy.
return new ContainerHealthResult.HealthyResult(container);
}
- /**
- * Given a set of ContainerReplica, transform it to a list of DatanodeDetails
- * and then check if the list meets the container placement policy.
- * @param replicas List of containerReplica
- * @param replicationFactor Expected Replication Factor of the container
- * @return ContainerPlacementStatus indicating if the policy is met or not
- */
- private ContainerPlacementStatus getPlacementStatus(
- Set<ContainerReplica> replicas, int replicationFactor,
- List<ContainerReplicaOp> pendingOps) {
-
- Set<DatanodeDetails> replicaDns = replicas.stream()
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toSet());
- for (ContainerReplicaOp op : pendingOps) {
- if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
- replicaDns.add(op.getTarget());
- }
- if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
- replicaDns.remove(op.getTarget());
- }
- }
- return placementPolicy.validateContainerPlacement(
- new ArrayList<>(replicaDns), replicationFactor);
- }
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 8d142d761b..eddd813a04 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -47,6 +47,7 @@ import
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Lists;
import org.apache.ozone.test.TestClock;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.junit.Assert;
@@ -828,6 +829,104 @@ public class TestReplicationManager {
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
+ @Test
+ public void testMisReplicatedECContainer() throws IOException {
+ Mockito.when(ecPlacementPolicy.validateContainerPlacement(
+ anyList(), anyInt()))
+ .thenReturn(new ContainerPlacementStatusDefault(4, 5, 5));
+
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
+
+ replicationManager.processContainer(container, repQueue, repReport);
+ assertEquals(1, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ assertEquals(1, repReport.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ assertEquals(0, repReport.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ assertEquals(0, repReport.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
+ /**
+ * Consider an EC container with 5 closed and 1 unhealthy replica.
+ * Assume this container is also mis replicated because it's on
+ * insufficient racks. For such EC containers, RM should first delete the
+ * unhealthy replica and then solve mis replication.
+ */
+ @Test
+ public void testMisReplicatedECContainerWithUnhealthyReplica()
+ throws ContainerNotFoundException {
+ Mockito.when(ecPlacementPolicy.validateContainerPlacement(
+ anyList(), anyInt()))
+ .thenReturn(new ContainerPlacementStatusDefault(5, 5, 5, 1,
+ Lists.newArrayList(2, 1, 1, 1, 1)));
+
+ ContainerInfo container = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ Set<ContainerReplica> replicas =
+ addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4,
+ 5);
+ ContainerReplica unhealthyReplica =
+ createContainerReplica(container.containerID(), 1, IN_SERVICE,
+ ContainerReplicaProto.State.UNHEALTHY);
+ replicas.add(unhealthyReplica);
+ storeContainerAndReplicas(container, replicas);
+
+ replicationManager.processContainer(container, repQueue, repReport);
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ /*
+ Does not get queued as over replicated because a delete command is sent
+ directly for the unhealthy replica.
+ */
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ assertEquals(0, repReport.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ assertEquals(0, repReport.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ assertEquals(1, repReport.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ List<ContainerReplicaOp> ops =
+ containerReplicaPendingOps.getPendingOps(container.containerID());
+ Mockito.verify(nodeManager).addDatanodeCommand(any(), any());
+ Assertions.assertEquals(1, ops.size());
+ Assertions.assertEquals(ContainerReplicaOp.PendingOpType.DELETE,
+ ops.get(0).getOpType());
+ Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(),
+ ops.get(0).getTarget());
+ Assertions.assertEquals(1, ops.get(0).getReplicaIndex());
+ Assertions.assertEquals(1, replicationManager.getMetrics()
+ .getEcDeletionCmdsSentTotal());
+ Assertions.assertEquals(0, replicationManager.getMetrics()
+ .getDeletionCmdsSentTotal());
+
+ /*
+ Now, remove the unhealthy replica. This leaves 5 replicas on 4 racks,
+ which is mis replication. RM should queue this container as mis
+ replicated now.
+ */
+ replicas.remove(unhealthyReplica);
+ containerReplicaPendingOps.completeDeleteReplica(container.containerID(),
+ unhealthyReplica.getDatanodeDetails(), 1);
+ Mockito.when(ecPlacementPolicy.validateContainerPlacement(
+ anyList(), anyInt()))
+ .thenReturn(new ContainerPlacementStatusDefault(4, 5, 5, 1,
+ Lists.newArrayList(2, 1, 1, 1)));
+
+ repReport = new ReplicationManagerReport();
+ replicationManager.processContainer(container, repQueue, repReport);
+ assertEquals(1, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ assertEquals(1, repReport.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ assertEquals(0, repReport.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ assertEquals(0, repReport.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
@Test
public void testUnderReplicationQueuePopulated() {
// Make it always return mis-replicated. Only a perfectly replicated
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECMisReplicationCheckHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECMisReplicationCheckHandler.java
new file mode 100644
index 0000000000..3585b76d9b
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECMisReplicationCheckHandler.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
+import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+
+/**
+ * Unit tests for {@link ECMisReplicationCheckHandler}.
+ */
+public class TestECMisReplicationCheckHandler {
+ private ECMisReplicationCheckHandler handler;
+ private ECReplicationConfig repConfig;
+ private ReplicationQueue repQueue;
+ private int maintenanceRedundancy = 2;
+ private ContainerCheckRequest.Builder requestBuilder;
+ private ReplicationManagerReport report;
+ private PlacementPolicy placementPolicy;
+
+ @Before
+ public void setup() {
+ placementPolicy = Mockito.mock(PlacementPolicy.class);
+ Mockito.when(placementPolicy.validateContainerPlacement(
+ anyList(), anyInt()))
+ .thenReturn(new ContainerPlacementStatusDefault(5, 5, 5));
+ handler = new ECMisReplicationCheckHandler(placementPolicy);
+ repConfig = new ECReplicationConfig(3, 2);
+ repQueue = new ReplicationQueue();
+ report = new ReplicationManagerReport();
+ requestBuilder = new ContainerCheckRequest.Builder()
+ .setReplicationQueue(repQueue)
+ .setMaintenanceRedundancy(maintenanceRedundancy)
+ .setPendingOps(Collections.emptyList())
+ .setReport(report);
+ }
+
+ @Test
+ public void shouldReturnFalseForHealthyContainer() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+
+ ContainerHealthResult result = handler.checkMisReplication(request);
+ assertEquals(ContainerHealthResult.HealthState.HEALTHY,
+ result.getHealthState());
+
+ assertFalse(handler.handle(request));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void shouldReturnFalseForNonECContainer() {
+ ContainerInfo container =
+ createContainerInfo(RatisReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.THREE));
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+
+ assertFalse(handler.handle(request));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ }
+
+ @Test
+ public void shouldHandleMisReplicatedContainer() {
+ ContainerInfo container = createContainerInfo(repConfig);
+
+ // Placement policy is always violated
+ Mockito.when(placementPolicy.validateContainerPlacement(
+ Mockito.any(),
+ Mockito.anyInt()
+ )).thenAnswer(invocation ->
+ new ContainerPlacementStatusDefault(4, 5, 9));
+
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+ Pair.of(IN_SERVICE, 5));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+
+ ContainerHealthResult result = handler.checkMisReplication(request);
+ Assertions.assertEquals(ContainerHealthResult.HealthState.MIS_REPLICATED,
+ result.getHealthState());
+
+ assertTrue(handler.handle(request));
+ assertEquals(1, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ }
+
+ /**
+ * If mis replication is fixed by a pending ADD, then the container is still
+ * handled and reported as mis replicated, but it should not be queued.
+ */
+ @Test
+ public void shouldReturnFalseForMisReplicatedContainerFixedByPending() {
+ ContainerInfo container = createContainerInfo(repConfig);
+
+ Mockito.when(placementPolicy.validateContainerPlacement(
+ Mockito.any(),
+ Mockito.anyInt()
+ )).thenAnswer(invocation -> {
+ List<DatanodeDetails> dns = invocation.getArgument(0);
+ // If the number of DNs is 5 or less, make it mis-replicated
+ if (dns.size() <= 5) {
+ return new ContainerPlacementStatusDefault(4, 5, 9);
+ } else {
+ return new ContainerPlacementStatusDefault(5, 5, 9);
+ }
+ });
+
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 1));
+
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+ Pair.of(IN_SERVICE, 5));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending)
+ .build();
+
+ ContainerHealthResult result = handler.checkMisReplication(request);
+ Assertions.assertEquals(ContainerHealthResult.HealthState.MIS_REPLICATED,
+ result.getHealthState());
+
+ assertTrue(handler.handle(request));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ }
+
+ /**
+ * This tests what happens when there's a pending DELETE. Suppose there's a
+ * mis replicated container with an excess unhealthy replica, and a pending
+ * delete is scheduled for the unhealthy replica. Then this container should
+ * be reported as mis replicated but should not be queued.
+ */
+ @Test
+ public void testMisReplicationWithUnhealthyReplica() {
+ ContainerInfo container = createContainerInfo(repConfig);
+
+ Mockito.when(placementPolicy.validateContainerPlacement(
+ Mockito.any(),
+ Mockito.anyInt()
+ )).thenAnswer(invocation -> {
+ List<DatanodeDetails> dns = invocation.getArgument(0);
+ // If the number of DNs is 6 or more make it mis-replicated
+ if (dns.size() > 5) {
+ return new ContainerPlacementStatusDefault(5, 6, 9);
+ } else {
+ return new ContainerPlacementStatusDefault(5, 5, 9);
+ }
+ });
+
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+ Pair.of(IN_SERVICE, 5));
+ ContainerReplica unhealthyReplica =
+ createContainerReplica(container.containerID(), 1, IN_SERVICE,
+ State.UNHEALTHY);
+ replicas.add(unhealthyReplica);
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ DELETE, unhealthyReplica.getDatanodeDetails(), 1));
+
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending)
+ .build();
+
+ ContainerHealthResult result = handler.checkMisReplication(request);
+ Assertions.assertEquals(ContainerHealthResult.HealthState.MIS_REPLICATED,
+ result.getHealthState());
+
+ assertTrue(handler.handle(request));
+ assertEquals(0, repQueue.underReplicatedQueueSize());
+ assertEquals(0, repQueue.overReplicatedQueueSize());
+ assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MIS_REPLICATED));
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
index a427393cf6..ccb239637a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
@@ -18,7 +18,6 @@ package
org.apache.hadoop.hdds.scm.container.replication.health;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -82,7 +81,7 @@ public class TestECReplicationCheckHandler {
Mockito.when(placementPolicy.validateContainerPlacement(
anyList(), anyInt()))
.thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
- healthCheck = new ECReplicationCheckHandler(placementPolicy);
+ healthCheck = new ECReplicationCheckHandler();
repConfig = new ECReplicationConfig(3, 2);
repQueue = new ReplicationQueue();
report = new ReplicationManagerReport();
@@ -579,87 +578,6 @@ public class TestECReplicationCheckHandler {
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
- @Test
- public void testMisReplicatedContainer() {
- ContainerInfo container = createContainerInfo(repConfig);
-
- // Placement policy is always violated
- Mockito.when(placementPolicy.validateContainerPlacement(
- Mockito.any(),
- Mockito.anyInt()
- )).thenAnswer(invocation ->
- new ContainerPlacementStatusDefault(4, 5, 9));
-
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
- Pair.of(IN_SERVICE, 5));
- ContainerCheckRequest request = requestBuilder
- .setContainerReplicas(replicas)
- .setContainerInfo(container)
- .build();
-
- ContainerHealthResult result = healthCheck.checkHealth(request);
- assertEquals(HealthState.MIS_REPLICATED, result.getHealthState());
-
- assertTrue(healthCheck.handle(request));
- assertEquals(1, repQueue.underReplicatedQueueSize());
- assertEquals(0, repQueue.overReplicatedQueueSize());
- assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.UNDER_REPLICATED));
- assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.OVER_REPLICATED));
- assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.MIS_REPLICATED));
- }
-
- @Test
- public void testMisReplicatedContainerFixedByPending() {
- ContainerInfo container = createContainerInfo(repConfig);
-
- Mockito.when(placementPolicy.validateContainerPlacement(
- Mockito.any(),
- Mockito.anyInt()
- )).thenAnswer(invocation -> {
- List<DatanodeDetails> dns = invocation.getArgument(0);
- // If the number of DNs is 5 or less make it be mis-replicated
- if (dns.size() <= 5) {
- return new ContainerPlacementStatusDefault(4, 5, 9);
- } else {
- return new ContainerPlacementStatusDefault(5, 5, 9);
- }
- });
-
- List<ContainerReplicaOp> pending = new ArrayList<>();
- pending.add(ContainerReplicaOp.create(
- ADD, MockDatanodeDetails.randomDatanodeDetails(), 1));
-
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
- Pair.of(IN_SERVICE, 5));
- ContainerCheckRequest request = requestBuilder
- .setContainerReplicas(replicas)
- .setContainerInfo(container)
- .setPendingOps(pending)
- .build();
-
- ContainerHealthResult result = healthCheck.checkHealth(request);
- assertEquals(HealthState.MIS_REPLICATED, result.getHealthState());
-
- // Under-replicated takes precedence and the over-replication is ignored
- // for now.
- assertTrue(healthCheck.handle(request));
- assertEquals(0, repQueue.underReplicatedQueueSize());
- assertEquals(0, repQueue.overReplicatedQueueSize());
- assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.UNDER_REPLICATED));
- assertEquals(0, report.getStat(
- ReplicationManagerReport.HealthState.OVER_REPLICATED));
- assertEquals(1, report.getStat(
- ReplicationManagerReport.HealthState.MIS_REPLICATED));
- }
-
@Test
public void testUnderAndMisReplicatedContainer() {
ContainerInfo container = createContainerInfo(repConfig);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]