This is an automated email from the ASF dual-hosted git repository.
siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a81773bafd HDDS-9353. ReplicationManager: Ignore any Datanodes that
are not in-service and healthy when finding unique origins (#5650)
a81773bafd is described below
commit a81773bafdfe405e740936b10b743f1013942e60
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Thu Nov 23 12:30:57 2023 +0530
HDDS-9353. ReplicationManager: Ignore any Datanodes that are not in-service
and healthy when finding unique origins (#5650)
---
.../replication/LegacyReplicationManager.java | 43 ++++--------
.../replication/RatisOverReplicationHandler.java | 56 +++++++---------
.../replication/RatisUnderReplicationHandler.java | 6 ++
.../replication/ReplicationManagerUtil.java | 78 +++++++++++++++++++---
.../replication/TestLegacyReplicationManager.java | 5 +-
.../TestRatisOverReplicationHandler.java | 32 ++++++++-
6 files changed, 147 insertions(+), 73 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index fc27260520..9a51537028 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -87,7 +87,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1267,6 +1266,8 @@ public class LegacyReplicationManager {
try {
return nodeManager.getNodeStatus(dnd);
} catch (NodeNotFoundException e) {
+ LOG.warn("Exception while finding an unhealthy replica to " +
+ "delete for container {}.", container, e);
return null;
}
});
@@ -2354,38 +2355,20 @@ public class LegacyReplicationManager {
// by an existing replica.
// TODO topology handling must be improved to make an optimal
// choice as to which replica to keep.
-
- // Gather the origin node IDs of replicas which are not candidates for
- // deletion.
- Set<UUID> existingOriginNodeIDs = allReplicas.stream()
- .filter(r -> !deleteCandidates.contains(r))
- .filter(
- r -> {
+ Set<ContainerReplica> allReplicasSet = new HashSet<>(allReplicas);
+ List<ContainerReplica> nonUniqueDeleteCandidates =
+ ReplicationManagerUtil.findNonUniqueDeleteCandidates(allReplicasSet,
+ deleteCandidates, (dnd) -> {
try {
- return nodeManager.getNodeStatus(r.getDatanodeDetails())
- .isHealthy();
+ return nodeManager.getNodeStatus(dnd);
} catch (NodeNotFoundException e) {
- LOG.warn("Exception when checking replica {} for container {}"
+
- " while deleting excess UNHEALTHY.", r, container, e);
- return false;
+ LOG.warn(
+ "Exception while finding excess unhealthy replicas to " +
+ "delete for container {} with replicas {}.", container,
+ allReplicas, e);
+ return null;
}
- })
- .filter(r -> r.getDatanodeDetails().getPersistedOpState()
- .equals(IN_SERVICE))
- .map(ContainerReplica::getOriginDatanodeId)
- .collect(Collectors.toSet());
-
- List<ContainerReplica> nonUniqueDeleteCandidates = new ArrayList<>();
- for (ContainerReplica replica: deleteCandidates) {
- if (existingOriginNodeIDs.contains(replica.getOriginDatanodeId())) {
- nonUniqueDeleteCandidates.add(replica);
- } else {
- // Spare this replica with this new origin node ID from deletion.
- // delete candidates seen later in the loop with this same origin
- // node ID can be deleted.
- existingOriginNodeIDs.add(replica.getOriginDatanodeId());
- }
- }
+ });
if (LOG.isDebugEnabled() && nonUniqueDeleteCandidates.size() < excess) {
LOG.debug("Unable to delete {} excess replicas of container {}. Only {}"
+
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
index dd00e01ba7..7974e72bac 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -34,11 +33,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.stream.Collectors;
/**
@@ -165,13 +161,6 @@ public class RatisOverReplicationHandler
sortReplicas(replicaCount.getReplicas(),
replicaCount.getHealthyReplicaCount() == 0);
- // retain one replica per unique origin datanode if the container is not
- // closed
- if (replicaCount.getContainer().getState() !=
- HddsProtos.LifeCycleState.CLOSED) {
- saveReplicasWithUniqueOrigins(eligibleReplicas);
- }
-
Set<DatanodeDetails> pendingDeletion = new HashSet<>();
// collect the DNs that are going to have their container replica deleted
for (ContainerReplicaOp op : pendingOps) {
@@ -187,6 +176,14 @@ public class RatisOverReplicationHandler
HddsProtos.NodeOperationalState.IN_SERVICE ||
pendingDeletion.contains(replica.getDatanodeDetails()));
+ // retain one replica per unique origin datanode if the container is not
+ // closed
+ if (replicaCount.getContainer().getState() !=
+ HddsProtos.LifeCycleState.CLOSED) {
+ saveReplicasWithUniqueOrigins(replicaCount.getContainer(),
+ eligibleReplicas);
+ }
+
return eligibleReplicas;
}
@@ -199,29 +196,26 @@ public class RatisOverReplicationHandler
* @param eligibleReplicas List of replicas that are eligible to be deleted
* and from which replicas with unique origin node ID need to be saved
*/
- private void saveReplicasWithUniqueOrigins(
+ private void saveReplicasWithUniqueOrigins(ContainerInfo container,
List<ContainerReplica> eligibleReplicas) {
- final Map<UUID, ContainerReplica> uniqueOrigins = new LinkedHashMap<>();
- eligibleReplicas.stream()
- // get unique origin nodes of healthy replicas
- .filter(r -> r.getState() != ContainerReplicaProto.State.UNHEALTHY)
- .forEach(r -> uniqueOrigins.putIfAbsent(r.getOriginDatanodeId(), r));
-
- /*
- Now that we've checked healthy replicas, see if some unhealthy replicas
- need to be saved. For example, in the case of {QUASI_CLOSED,
- QUASI_CLOSED, QUASI_CLOSED, UNHEALTHY}, if both the first and last
- replicas have the same origin node ID (and no other replicas have it), we
- prefer saving the QUASI_CLOSED replica and deleting the UNHEALTHY one.
- */
- for (ContainerReplica replica : eligibleReplicas) {
- if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY) {
- uniqueOrigins.putIfAbsent(replica.getOriginDatanodeId(), replica);
- }
- }
+ List<ContainerReplica> nonUniqueDeleteCandidates =
+ ReplicationManagerUtil.findNonUniqueDeleteCandidates(
+ new HashSet<>(eligibleReplicas),
+ eligibleReplicas, (dnd) -> {
+ try {
+ return replicationManager.getNodeStatus(dnd);
+ } catch (NodeNotFoundException e) {
+ LOG.warn(
+ "Exception while finding excess unhealthy replicas to " +
+ "delete for container {} with eligible replicas {}.",
+ container, eligibleReplicas, e);
+ return null;
+ }
+ });
// note that this preserves order of the List
- eligibleReplicas.removeAll(uniqueOrigins.values());
+ eligibleReplicas.removeIf(
+ replica -> !nonUniqueDeleteCandidates.contains(replica));
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 5af7a16df2..98c19d16ff 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -154,6 +154,9 @@ public class RatisUnderReplicationHandler
private void removeUnhealthyReplicaIfPossible(ContainerInfo containerInfo,
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps)
throws NotLeaderException {
+ LOG.info("Finding an unhealthy replica to delete for container {} with " +
+ "replicas {} to unblock under replication handling.", containerInfo,
+ replicas);
int pendingDeletes = 0;
for (ContainerReplicaOp op : pendingOps) {
if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
@@ -167,6 +170,9 @@ public class RatisUnderReplicationHandler
try {
return replicationManager.getNodeStatus(dnd);
} catch (NodeNotFoundException e) {
+ LOG.warn("Exception while finding an unhealthy replica to " +
+ "delete for container {} with replicas {}.", containerInfo,
+ replicas, e);
return null;
}
});
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index f355030599..076a81e69b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -266,35 +266,93 @@ public final class ReplicationManagerUtil {
if (containerInfo.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
List<ContainerReplica> nonUniqueOrigins =
- findNonUniqueDeleteCandidates(replicas, deleteCandidates);
+ findNonUniqueDeleteCandidates(replicas, deleteCandidates,
+ nodeStatusFn);
return nonUniqueOrigins.size() > 0 ? nonUniqueOrigins.get(0) : null;
}
return null;
}
- private static List<ContainerReplica> findNonUniqueDeleteCandidates(
+ /**
+ * Given a list of all replicas (including deleteCandidates), finds and
+ * returns replicas which don't have unique origin node IDs. This method
+ * preserves the order of the passed deleteCandidates list. This means that
+ * the order of the returned list depends on the order of deleteCandidates;
+ * if the same deleteCandidates are passed in the same order into this
+ * method on different invocations, the order of the returned list will be
+ * the same on each invocation. To protect against different SCMs
+ * (old leader and new leader) selecting different replicas to delete,
+ * callers of this method are responsible for ensuring consistent ordering.
+ * @see
+ * <a href="https://issues.apache.org/jira/browse/HDDS-4589">HDDS-4589</a>
+ * @param allReplicas all replicas of this container including
+ * deleteCandidates
+ * @param deleteCandidates replicas that are being considered for deletion
+ * @param nodeStatusFn a Function that can be called to check the
+ * NodeStatus of each DN
+ * @return a List of replicas that can be deleted because they do not have
+ * unique origin node IDs
+ */
+ static List<ContainerReplica> findNonUniqueDeleteCandidates(
Set<ContainerReplica> allReplicas,
- List<ContainerReplica> deleteCandidates) {
+ List<ContainerReplica> deleteCandidates,
+ Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
// Gather the origin node IDs of replicas which are not candidates for
// deletion.
Set<UUID> existingOriginNodeIDs = allReplicas.stream()
.filter(r -> !deleteCandidates.contains(r))
+ .filter(r -> {
+ NodeStatus status = nodeStatusFn.apply(r.getDatanodeDetails());
+ /*
+ Replicas on datanodes that are not both in-service and healthy are
+ not valid because it's likely they will be gone soon or are already
+ lost. See https://issues.apache.org/jira/browse/HDDS-9352. This
+ means that these replicas don't count as having unique origin IDs.
+ */
+ return status != null && status.isHealthy() && status.isInService();
+ })
.map(ContainerReplica::getOriginDatanodeId)
.collect(Collectors.toSet());
+ /*
+ In the case of {QUASI_CLOSED, QUASI_CLOSED, QUASI_CLOSED, UNHEALTHY}, if
+ both the first and last replicas have the same origin node ID (and no
+ other replicas have it), we prefer saving the QUASI_CLOSED replica and
+ deleting the UNHEALTHY one. So, we'll first loop through healthy replicas
+ and check for uniqueness.
+ */
List<ContainerReplica> nonUniqueDeleteCandidates = new ArrayList<>();
for (ContainerReplica replica : deleteCandidates) {
- if (existingOriginNodeIDs.contains(replica.getOriginDatanodeId())) {
- nonUniqueDeleteCandidates.add(replica);
- } else {
- // Spare this replica with this new origin node ID from deletion.
- // delete candidates seen later in the loop with this same origin
- // node ID can be deleted.
- existingOriginNodeIDs.add(replica.getOriginDatanodeId());
+ if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY) {
+ continue;
}
+ checkUniqueness(existingOriginNodeIDs, nonUniqueDeleteCandidates,
+ replica);
+ }
+
+ // now, see which UNHEALTHY replicas are not unique and can be deleted
+ for (ContainerReplica replica : deleteCandidates) {
+ if (replica.getState() != ContainerReplicaProto.State.UNHEALTHY) {
+ continue;
+ }
+ checkUniqueness(existingOriginNodeIDs, nonUniqueDeleteCandidates,
+ replica);
}
return nonUniqueDeleteCandidates;
}
+ private static void checkUniqueness(Set<UUID> existingOriginNodeIDs,
+ List<ContainerReplica> nonUniqueDeleteCandidates,
+ ContainerReplica replica) {
+ if (existingOriginNodeIDs.contains(replica.getOriginDatanodeId())) {
+ nonUniqueDeleteCandidates.add(replica);
+ } else {
+ // Spare this replica with this new origin node ID from deletion.
+ // delete candidates seen later with this same origin node ID can be
+ // deleted.
+ existingOriginNodeIDs.add(replica.getOriginDatanodeId());
+ }
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 561a4c2895..43a2fb3263 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -941,7 +941,8 @@ public class TestLegacyReplicationManager {
final ContainerReplica replica5 = getReplicas(
id, UNHEALTHY, 1000L, replica4.getOriginDatanodeId(),
randomDatanodeDetails());
- replica5.getDatanodeDetails().setPersistedOpState(DECOMMISSIONING);
+ nodeManager.register(replica5.getDatanodeDetails(),
+ new NodeStatus(DECOMMISSIONING, HEALTHY));
DatanodeDetails deadNode = randomDatanodeDetails();
nodeManager.register(deadNode, NodeStatus.inServiceDead());
final ContainerReplica replica6 = getReplicas(
@@ -1006,6 +1007,8 @@ public class TestLegacyReplicationManager {
DatanodeDetails decommissioning =
MockDatanodeDetails.randomDatanodeDetails();
decommissioning.setPersistedOpState(DECOMMISSIONING);
+ nodeManager.register(decommissioning,
+ new NodeStatus(DECOMMISSIONING, HEALTHY));
final ContainerReplica replica4 = getReplicas(
id, UNHEALTHY, sequenceID, decommissioning.getUuid(),
decommissioning);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
index 5a9fd20288..0e2070aac0 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
@@ -51,6 +51,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
@@ -164,7 +165,7 @@ public class TestRatisOverReplicationHandler {
*/
@Test
public void testOverReplicatedQuasiClosedContainerWithDifferentOrigins()
- throws IOException {
+ throws IOException, NodeNotFoundException {
container = createContainer(HddsProtos.LifeCycleState.QUASI_CLOSED,
RATIS_REPLICATION_CONFIG);
Set<ContainerReplica> replicas = createReplicas(container.containerID(),
@@ -181,6 +182,35 @@ public class TestRatisOverReplicationHandler {
testProcessing(replicas, Collections.emptyList(),
getOverReplicatedHealthResult(), 0);
+
+ /*
+ Now, introduce two UNHEALTHY replicas that share the same origin node as
+ the existing UNHEALTHY replica. They're on decommissioning and stale
+ nodes, respectively. Still no replica should be deleted, because these are
+ likely going away soon anyway.
+ */
+ replicas.add(
+ createContainerReplica(container.containerID(), 0, DECOMMISSIONING,
+ State.UNHEALTHY, container.getNumberOfKeys(),
+ container.getUsedBytes(),
+ MockDatanodeDetails.randomDatanodeDetails(),
+ unhealthyReplica.getOriginDatanodeId()));
+ DatanodeDetails staleNode =
+ MockDatanodeDetails.randomDatanodeDetails();
+ replicas.add(
+ createContainerReplica(container.containerID(), 0, IN_SERVICE,
+ State.UNHEALTHY, container.getNumberOfKeys(),
+ container.getUsedBytes(), staleNode,
+ unhealthyReplica.getOriginDatanodeId()));
+ Mockito.when(replicationManager.getNodeStatus(eq(staleNode)))
+ .thenAnswer(invocation -> {
+ DatanodeDetails dd = invocation.getArgument(0);
+ return new NodeStatus(dd.getPersistedOpState(),
+ HddsProtos.NodeState.STALE, 0);
+ });
+
+ testProcessing(replicas, Collections.emptyList(),
+ getOverReplicatedHealthResult(), 0);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]