This is an automated email from the ASF dual-hosted git repository.
erose pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 872401ce88 HDDS-9257. LegacyReplicationManager: Unhealthy replicas
could block under replication handling (#5261)
872401ce88 is described below
commit 872401ce88bcb0aee703c7156ebe476e35c75d07
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Tue Sep 12 22:11:41 2023 +0530
HDDS-9257. LegacyReplicationManager: Unhealthy replicas could block under
replication handling (#5261)
---
.../replication/LegacyReplicationManager.java | 103 ++++++++---
.../replication/RatisUnderReplicationHandler.java | 9 +-
.../replication/ReplicationManagerUtil.java | 19 +-
.../replication/TestLegacyReplicationManager.java | 194 +++++++++++++++++++++
4 files changed, 289 insertions(+), 36 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index c851470eae..e627ec4a3c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -1141,8 +1141,55 @@ public class LegacyReplicationManager {
List<ContainerReplica> replicationSources =
getReplicationSources(container,
replicaSet.getReplicas(), State.CLOSED, State.QUASI_CLOSED);
// This method will handle topology even if replicasNeeded <= 0.
- replicateAnyWithTopology(container, replicationSources,
- placementStatus, replicasNeeded);
+ try {
+ replicateAnyWithTopology(container, replicationSources,
+ placementStatus, replicasNeeded, replicaSet.getReplicas());
+ } catch (SCMException e) {
+ if (e.getResult()
+ .equals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE) &&
+ replicasNeeded > 0) {
+ /*
+ If we reach here, the container is under replicated but placement
+ policy could not find any target Datanodes to host new replicas.
+ We can try unblocking under replication handling by removing any
+ unhealthy replicas. This will free up those datanodes, so they can host
+ healthy replicas.
+ */
+ deleteUnhealthyReplicaIfNeeded(container, replicaSet);
+ }
+ }
+ }
+
+ /**
+ * Finds and deletes an unhealthy replica (UNHEALTHY or QUASI_CLOSED) under
+ * certain conditions.
+ */
+ private void deleteUnhealthyReplicaIfNeeded(ContainerInfo container,
+ RatisContainerReplicaCount replicaCount) {
+ LOG.info("Finding an unhealthy replica to delete for container {} with " +
+ "replicas {} to unblock under replication handling.", container,
+ replicaCount.getReplicas());
+
+ Set<ContainerReplica> replicas = new HashSet<>(replicaCount.getReplicas());
+ ContainerReplica replica = ReplicationManagerUtil
+ .selectUnhealthyReplicaForDelete(container, replicas,
+ getInflightDel(container.containerID()),
+ (dnd) -> {
+ try {
+ return nodeManager.getNodeStatus(dnd);
+ } catch (NodeNotFoundException e) {
+ return null;
+ }
+ });
+
+ if (replica == null) {
+ LOG.info(
+ "Could not find any unhealthy replica to delete when unblocking " +
+ "under replication handling for container {} with replicas {}.",
+ container, replicas);
+ } else {
+ sendDeleteCommand(container, replica.getDatanodeDetails(), false);
+ }
}
/**
@@ -1303,8 +1350,14 @@ public class LegacyReplicationManager {
final ContainerPlacementStatus placementStatus = getPlacementStatus(
new HashSet<>(replicationSources),
container.getReplicationConfig().getRequiredNodes());
- replicateAnyWithTopology(container, replicationSources,
- placementStatus, replicas.size() - replicationSources.size());
+ try {
+ replicateAnyWithTopology(container, replicationSources,
+ placementStatus, replicas.size() - replicationSources.size(),
+ replicas);
+ } catch (SCMException e) {
+ LOG.warn("Could not fix container {} with replicas {}.", container,
+ replicas, e);
+ }
}
}
@@ -1504,12 +1557,13 @@ public class LegacyReplicationManager {
final long containerID = id.getId();
final ReplicateContainerCommand replicateCommand =
ReplicateContainerCommand.fromSources(containerID, sources);
- LOG.info("Sending {} to {}", replicateCommand, target);
+ LOG.debug("Trying to send {} to {}", replicateCommand, target);
final boolean sent = sendAndTrackDatanodeCommand(target, replicateCommand,
action -> addInflight(InflightType.REPLICATION, id, action));
if (sent) {
+ LOG.info("Sent {} to {}", replicateCommand, target);
metrics.incrReplicationCmdsSentTotal();
metrics.incrReplicationBytesTotal(container.getUsedBytes());
}
@@ -2074,9 +2128,14 @@ public class LegacyReplicationManager {
container.getContainerID(), additionalReplicasNeeded);
// TODO Datanodes currently shuffle sources, so we cannot prioritize
// some replicas based on BCSID or origin node ID.
- replicateAnyWithTopology(container,
- getReplicationSources(container, replicas), placementStatus,
- additionalReplicasNeeded);
+ try {
+ replicateAnyWithTopology(container,
+ getReplicationSources(container, replicas), placementStatus,
+ additionalReplicasNeeded, replicas);
+ } catch (SCMException e) {
+ LOG.warn("Could not fix container {} with replicas {}.", container,
+ replicas, e);
+ }
}
}
@@ -2195,7 +2254,7 @@ public class LegacyReplicationManager {
// Sort containers with lowest BCSID first. These will be the first ones
// deleted.
deleteCandidates.sort(
- Comparator.comparingLong(ContainerReplica::getSequenceId).reversed());
+ Comparator.comparingLong(ContainerReplica::getSequenceId));
deleteExcess(container, deleteCandidates, excess);
}
@@ -2205,12 +2264,14 @@ public class LegacyReplicationManager {
* topology requirements.
*/
private void replicateAnyWithTopology(ContainerInfo container,
- List<ContainerReplica> replicas,
- ContainerPlacementStatus placementStatus, int additionalReplicasNeeded) {
+ List<ContainerReplica> sourceReplicas,
+ ContainerPlacementStatus placementStatus, int additionalReplicasNeeded,
+ List<ContainerReplica> allReplicas)
+ throws SCMException {
try {
final ContainerID id = container.containerID();
- final List<DatanodeDetails> sourceDNs = replicas.stream()
+ final List<DatanodeDetails> sourceDNs = sourceReplicas.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
final List<DatanodeDetails> replicationInFlight
@@ -2236,19 +2297,15 @@ public class LegacyReplicationManager {
return;
}
- // We should ensure that the target datanode has enough space
- // for a complete container to be created, but since the container
- // size may be changed smaller than origin, we should be defensive.
- final long dataSizeRequired = Math.max(container.getUsedBytes(),
- currentContainerSize);
- final List<DatanodeDetails> excludeList = replicas.stream()
- .filter(r -> !r.getDatanodeDetails().isDecommissioned())
+ final List<DatanodeDetails> excludeList = allReplicas.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
excludeList.addAll(replicationInFlight);
- final List<DatanodeDetails> selectedDatanodes = containerPlacement
- .chooseDatanodes(excludeList, null, replicasNeeded,
- 0, dataSizeRequired);
+ final List<DatanodeDetails> selectedDatanodes =
+ ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
+ replicasNeeded, null, excludeList, currentContainerSize,
+ container);
+
if (additionalReplicasNeeded > 0) {
LOG.info("Container {} is under replicated. Expected replica count" +
" is {}, but found {}.", id, replicationFactor,
@@ -2283,7 +2340,7 @@ public class LegacyReplicationManager {
"replica found.",
container.containerID());
}
- } catch (IOException | IllegalStateException ex) {
+ } catch (IllegalStateException ex) {
LOG.warn("Exception while replicating container {}.",
container.getContainerID(), ex);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index cc10d86156..931da903ea 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -153,8 +153,15 @@ public class RatisUnderReplicationHandler
private void removeUnhealthyReplicaIfPossible(ContainerInfo containerInfo,
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps)
throws NotLeaderException {
+ int pendingDeletes = 0;
+ for (ContainerReplicaOp op : pendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ pendingDeletes++;
+ }
+ }
ContainerReplica deleteCandidate = ReplicationManagerUtil
- .selectUnhealthyReplicaForDelete(containerInfo, replicas, pendingOps,
+ .selectUnhealthyReplicaForDelete(containerInfo, replicas,
+ pendingDeletes,
(dnd) -> {
try {
return replicationManager.getNodeStatus(dnd);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index c3d79f1118..f355030599 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -201,25 +201,20 @@ public final class ReplicationManagerUtil {
* unhealthy replicas or quasi-closed replicas which cannot be closed due to
* having a lagging sequence ID. The logic here will select a replica to
* delete, or return null if there are none which can be safely deleted.
+ *
* @param containerInfo The container to select a replica to delete from
* @param replicas The list of replicas for the container
- * @param pendingOps The list of pending replica operations for the container
+ * @param pendingDeletes number pending deletes for this container
* @return A replica to delete, or null if there are none which can be safely
* deleted.
*/
public static ContainerReplica selectUnhealthyReplicaForDelete(
ContainerInfo containerInfo, Set<ContainerReplica> replicas,
- List<ContainerReplicaOp> pendingOps,
- Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
-
- for (ContainerReplicaOp op : pendingOps) {
- if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
- // There is at least one pending delete which will free up a node.
- // Therefore we do nothing until that delete completes or times out.
- LOG.debug("Container {} has pending deletes which will free nodes",
- pendingOps);
- return null;
- }
+ int pendingDeletes, Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
+ if (pendingDeletes > 0) {
+ LOG.debug("Container {} has {} pending deletes which will free nodes.",
+ containerInfo, pendingDeletes);
+ return null;
}
if (replicas.size() <= 2) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 8a3c8b71e8..391e878db0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hdds.utils.db.LongCodec;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.Disabled;
@@ -112,6 +115,7 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_NUM_KEYS_DEFAULT;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_USED_BYTES_DEFAULT;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
+import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicaBuilder;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.when;
@@ -923,6 +927,196 @@ public class TestLegacyReplicationManager {
assertExactDeleteTargets(unhealthyReplica.getDatanodeDetails());
}
+ /**
+ * In small clusters, handling an under replicated container can get
+ * blocked because DNs are occupied by unhealthy replicas. This
+ * would make the placement policy throw an exception because it could
+ * not find any target datanodes for new replicas.
+ *
+ * Situation:
+ * Consider a CLOSED container with replicas 1 CLOSED, 1 QUASI_CLOSED with
+ * same seq id as the container, and 1 QUASI_CLOSED with smaller seq id.
+ * Placement policy is mocked to simulate no other target DNs are available.
+ *
+ * Expectation:
+ * 1st iteration: QUASI_CLOSED with same seq id should get closed, and the
+ * one with smaller seq id should get deleted to free up a DN.
+ * 2nd iteration: Any CLOSED replica should be replicated.
+ * 3rd iteration: Container should be OK now.
+ */
+ @Test
+ public void testUnderReplicationBlockedByUnhealthyReplicas()
+ throws IOException, TimeoutException {
+ /*
+ In the first iteration, throw an SCMException to simulate that placement
+ policy could not find any targets. In the second iteration, return a list
+ of required targets.
+ */
+ Mockito.when(ratisContainerPlacementPolicy.chooseDatanodes(
+ Mockito.any(), Mockito.any(), Mockito.anyInt(),
+ Mockito.anyLong(), Mockito.anyLong()))
+ .thenAnswer(invocation -> {
+ throw new SCMException(
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ })
+ .thenAnswer(invocation -> {
+ int nodesRequired = invocation.getArgument(2);
+ List<DatanodeDetails> nodes = new ArrayList<>(nodesRequired);
+ while (nodesRequired != 0) {
+ nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+ nodesRequired--;
+ }
+ return nodes;
+ });
+
+ final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ addReplicaToDn(container, randomDatanodeDetails(), CLOSED,
+ container.getSequenceId());
+ ContainerReplica quasiToDelete = addReplicaToDn(container,
+ randomDatanodeDetails(), QUASI_CLOSED, container.getSequenceId() - 1);
+ ContainerReplica quasi2 = addReplicaToDn(container,
+ randomDatanodeDetails(), QUASI_CLOSED, container.getSequenceId());
+
+ // First RM iteration.
+ // this container is under replicated by 2 replicas.
+ // quasi2 should be closed since its BCSID matches the container's.
+ // delete command should be sent for quasiToDelete to unblock under rep
+ // handling.
+ assertDeleteScheduled(1);
+ Assertions.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.closeContainerCommand,
+ quasi2.getDatanodeDetails()));
+ Assertions.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
+ quasiToDelete.getDatanodeDetails()));
+ assertUnderReplicatedCount(1);
+
+ // Update RM with the results of the close and delete commands
+ ContainerReplica quasiToClosed = getReplicaBuilder(
+ container.containerID(), CLOSED, quasi2.getBytesUsed(),
+ quasi2.getKeyCount(), container.getSequenceId(),
+ quasi2.getOriginDatanodeId(), quasi2.getDatanodeDetails()).build();
+ containerStateManager.updateContainerReplica(container.containerID(),
+ quasiToClosed);
+ containerStateManager.removeContainerReplica(
+ container.containerID(), quasiToDelete);
+
+ // Second RM iteration
+ // Now that we have a free DN, a closed replica should be replicated
+ assertReplicaScheduled(1);
+ assertUnderReplicatedCount(1);
+
+ // Process the replicate command and report the replica back to SCM.
+ List<CommandForDatanode> replicateCommands = datanodeCommandHandler
+ .getReceivedCommands().stream()
+ .filter(c -> c.getCommand().getType()
+ .equals(SCMCommandProto.Type.replicateContainerCommand))
+ .collect(Collectors.toList());
+ Assertions.assertEquals(1, replicateCommands.size());
+ // Report the new replica to SCM.
+ for (CommandForDatanode replicateCommand: replicateCommands) {
+ DatanodeDetails newNode = createDatanodeDetails(
+ replicateCommand.getDatanodeId());
+ ContainerReplica newReplica = getReplicas(
+ container.containerID(), CLOSED,
+ container.getSequenceId(), newNode.getUuid(), newNode);
+ containerStateManager.updateContainerReplica(container.containerID(),
+ newReplica);
+ }
+
+ // Third RM iteration
+ assertReplicaScheduled(0);
+ assertUnderReplicatedCount(0);
+ assertOverReplicatedCount(0);
+ }
+
+ /**
+ * Test for when a quasi_closed container's under replication cannot be
+ * solved because there are UNHEALTHY replicas occupying datanodes.
+ */
+ @Test
+ public void testUnderRepQuasiClosedContainerBlockedByUnhealthyReplicas()
+ throws IOException, TimeoutException {
+ Mockito.when(ratisContainerPlacementPolicy.chooseDatanodes(
+ Mockito.anyList(), Mockito.any(), Mockito.anyInt(),
+ Mockito.anyLong(), Mockito.anyLong()))
+ .thenAnswer(invocation -> {
+ List<DatanodeDetails> excluded = invocation.getArgument(0);
+ if (excluded.size() == 3) {
+ throw new SCMException(
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ } else {
+ int nodesRequired = invocation.getArgument(2);
+ List<DatanodeDetails> nodes = new ArrayList<>(nodesRequired);
+ while (nodesRequired != 0) {
+ DatanodeDetails dn =
+ MockDatanodeDetails.randomDatanodeDetails();
+ nodeManager.register(dn, NodeStatus.inServiceHealthy());
+ nodes.add(dn);
+ nodesRequired--;
+ }
+ return nodes;
+ }
+ });
+
+ final ContainerInfo container =
+ createContainer(LifeCycleState.QUASI_CLOSED);
+ ContainerReplica quasi1 = addReplicaToDn(container,
+ randomDatanodeDetails(), QUASI_CLOSED, container.getSequenceId());
+ DatanodeDetails nodeForQuasi2 =
+ MockDatanodeDetails.randomDatanodeDetails();
+ nodeManager.register(nodeForQuasi2, NodeStatus.inServiceHealthy());
+ ContainerReplica quasi2 = getReplicaBuilder(container.containerID(),
+ QUASI_CLOSED, container.getUsedBytes(), container.getNumberOfKeys(),
+ container.getSequenceId(), quasi1.getOriginDatanodeId(),
+ nodeForQuasi2).build();
+ containerStateManager
+ .updateContainerReplica(container.containerID(), quasi2);
+ ContainerReplica unhealthy = addReplicaToDn(container,
+ randomDatanodeDetails(), UNHEALTHY, container.getSequenceId());
+
+ // First RM iteration.
+ // this container is under replicated by 1 replica.
+ // delete command should be sent for unhealthy to unblock under rep
+ // handling.
+ assertDeleteScheduled(1);
+ Assertions.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
+ unhealthy.getDatanodeDetails()));
+ assertUnderReplicatedCount(1);
+ // Update RM with the result of delete command
+ containerStateManager.removeContainerReplica(
+ container.containerID(), unhealthy);
+
+ // Second RM iteration
+ // Now that we have a free DN, a quasi_closed replica should be replicated
+ assertReplicaScheduled(1);
+ assertUnderReplicatedCount(1);
+
+ // Process the replicate command and report the replica back to SCM.
+ List<CommandForDatanode> replicateCommands =
+ datanodeCommandHandler.getReceivedCommands().stream()
+ .filter(command -> command.getCommand().getType()
+ .equals(SCMCommandProto.Type.replicateContainerCommand))
+ .collect(Collectors.toList());
+ Assertions.assertEquals(1, replicateCommands.size());
+ ReplicateContainerCommand command = (ReplicateContainerCommand)
+ replicateCommands.iterator().next().getCommand();
+ List<DatanodeDetails> sources = command.getSourceDatanodes();
+ Assertions.assertTrue(sources.contains(quasi1.getDatanodeDetails()) &&
+ sources.contains(quasi2.getDatanodeDetails()));
+ ContainerReplica replica3 =
+ getReplicas(container.containerID(), QUASI_CLOSED,
+ container.getSequenceId(), quasi1.getOriginDatanodeId(),
+ MockDatanodeDetails.randomDatanodeDetails());
+ containerStateManager.updateContainerReplica(container.containerID(),
+ replica3);
+
+ // Third RM iteration
+ assertReplicaScheduled(0);
+ assertUnderReplicatedCount(0);
+ assertOverReplicatedCount(0);
+ }
/**
* $numReplicas unhealthy replicas.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]