This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f9dfd73e99 HDDS-7761. EC: ReplicationManager - Use
placementPolicy.replicasToRemoveToFixOverreplication in EC Over replication
handler (#4166)
f9dfd73e99 is described below
commit f9dfd73e999b78b547f5a7e97213915e7f25b77d
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Jan 12 18:15:55 2023 +0000
HDDS-7761. EC: ReplicationManager - Use
placementPolicy.replicasToRemoveToFixOverreplication in EC Over replication
handler (#4166)
---
.../AbstractOverReplicationHandler.java | 14 +++
.../replication/ECOverReplicationHandler.java | 122 ++++++++++++---------
.../container/replication/ReplicationTestUtil.java | 10 ++
.../replication/TestECOverReplicationHandler.java | 113 ++++++++++++++++---
4 files changed, 197 insertions(+), 62 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
index 2dac8b2312..5d647593ef 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
@@ -80,6 +80,20 @@ public abstract class AbstractOverReplicationHandler
return isPlacementStatusActuallyEqual(currentCPS, newCPS);
}
+ /**
+ * Allow the placement policy to indicate which replicas can be removed for
+ * an over replicated container, so that the placement policy is not violated
+ * by removing them.
+ * @param replicas
+ * @param expectedCountPerUniqueReplica
+ * @return
+ */
+ protected Set<ContainerReplica> selectReplicasToRemove(
+ Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
+ return placementPolicy.replicasToRemoveToFixOverreplication(
+ replicas, expectedCountPerUniqueReplica);
+ }
+
/**
* Given a set of ContainerReplica, transform it to a list of DatanodeDetails
* and then check if the list meets the container placement policy.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 85295c9749..00c1a264af 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
@@ -30,12 +32,10 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
@@ -74,12 +74,34 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
ContainerHealthResult result, int remainingMaintenanceRedundancy) {
ContainerInfo container = result.getContainerInfo();
+ // We are going to check for over replication, so we should filter out any
+ // replicas that are not in a HEALTHY state. This is because a replica can
+ be healthy, stale or dead. If it is dead it will be quickly removed from
+ scm. If it is stale, there is a good chance the DN is offline and the
+ // replica will go away soon. So, if we have a container that is over
+ // replicated with a HEALTHY and STALE replica, and we decide to delete the
+ // HEALTHY one, and then the STALE ones goes away, we will lose them both.
+ // To avoid this, we will filter out any non-healthy replicas first.
+ // EcContainerReplicaCount will ignore nodes which are not IN_SERVICE for
+ // over replication checks, but we need to filter these out later in this
+ // method anyway, so it makes sense to filter them here too, to avoid a
+ // second lookup of the NodeStatus
+ Set<ContainerReplica> healthyReplicas = replicas.stream()
+ .filter(r -> {
+ NodeStatus ns = ReplicationManager.getNodeStatus(
+ r.getDatanodeDetails(), nodeManager);
+ return ns.isHealthy() && ns.getOperationalState() ==
+ HddsProtos.NodeOperationalState.IN_SERVICE;
+ })
+ .collect(Collectors.toSet());
+
final ECContainerReplicaCount replicaCount =
- new ECContainerReplicaCount(container, replicas, pendingOps,
+ new ECContainerReplicaCount(container, healthyReplicas, pendingOps,
remainingMaintenanceRedundancy);
if (!replicaCount.isOverReplicated()) {
- LOG.info("The container {} state changed and it's not in over"
- + " replication any more", container.getContainerID());
+ LOG.info("The container {} state changed and it is no longer over"
+ + " replication. Replica count: {}, healthy replica count: {}",
+ container.getContainerID(), replicas.size(), healthyReplicas.size());
return emptyMap();
}
@@ -93,10 +115,9 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
replicaCount.overReplicatedIndexes(true);
//sanity check
if (overReplicatedIndexes.size() == 0) {
- LOG.warn("The container {} with replicas {} is found over replicated " +
- "by ContainerHealthCheck, but found not over replicated by " +
- "ECContainerReplicaCount",
- container.getContainerID(), replicas);
+ LOG.warn("The container {} with replicas {} was found over replicated "
+ + "by EcContainerReplicaCount, but there are no over replicated "
+ + "indexes returned", container.getContainerID(), replicas);
return emptyMap();
}
@@ -106,48 +127,51 @@ public class ECOverReplicationHandler extends
AbstractOverReplicationHandler {
deletionInFlight.add(op.getTarget());
}
}
- Map<Integer, List<ContainerReplica>> index2replicas = new HashMap<>();
- replicas.stream()
- .filter(r -> overReplicatedIndexes.contains(r.getReplicaIndex()))
- .filter(r -> r
- .getState() == StorageContainerDatanodeProtocolProtos
- .ContainerReplicaProto.State.CLOSED)
- .filter(r -> ReplicationManager
- .getNodeStatus(r.getDatanodeDetails(), nodeManager).isHealthy())
+
+ Set<ContainerReplica> candidates = healthyReplicas.stream()
.filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
- .forEach(r -> {
- int index = r.getReplicaIndex();
- index2replicas.computeIfAbsent(index, k -> new LinkedList<>());
- index2replicas.get(index).add(r);
- });
-
- if (index2replicas.size() > 0) {
- final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
- final int replicationFactor =
- container.getReplicationConfig().getRequiredNodes();
- index2replicas.values().forEach(l -> {
- Iterator<ContainerReplica> it = l.iterator();
- Set<ContainerReplica> tempReplicaSet = new HashSet<>(replicas);
- while (it.hasNext() && l.size() > 1) {
- ContainerReplica r = it.next();
- if (isPlacementStatusActuallyEqualAfterRemove(
- tempReplicaSet, r, replicationFactor)) {
- DeleteContainerCommand deleteCommand =
- new DeleteContainerCommand(container.getContainerID(), true);
- deleteCommand.setReplicaIndex(r.getReplicaIndex());
- commands.put(r.getDatanodeDetails(), deleteCommand);
- it.remove();
- tempReplicaSet.remove(r);
- }
- }
- });
- if (commands.size() == 0) {
- LOG.info("With the current state of avilable replicas {}, no" +
- " commands to process due to over replication.", replicas);
+ .filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto.State.CLOSED)
+ .collect(Collectors.toSet());
+
+ Set<ContainerReplica> replicasToRemove =
+ selectReplicasToRemove(candidates, 1);
+
+ if (replicasToRemove.size() == 0) {
+ LOG.warn("The container {} is over replicated, but no replicas were "
+ + "selected to remove by the placement policy. Replicas: {}",
+ container, replicas);
+ return emptyMap();
+ }
+
+ final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+ // As a sanity check, sum up the current counts of each replica index. When
+ // processing replicasToRemove, ensure that removing the replica would not
+ // drop the count of that index to zero.
+ Map<Integer, Integer> replicaIndexCounts = new HashMap<>();
+ for (ContainerReplica r : candidates) {
+ replicaIndexCounts.put(r.getReplicaIndex(),
+ replicaIndexCounts.getOrDefault(r.getReplicaIndex(), 0) + 1);
+ }
+ for (ContainerReplica r : replicasToRemove) {
+ int currentCount = replicaIndexCounts.getOrDefault(
+ r.getReplicaIndex(), 0);
+ if (currentCount < 2) {
+ LOG.warn("The replica {} selected to remove would reduce the count " +
+ "for that index to zero. Candidate Replicas: {}", r, candidates);
+ continue;
}
- return commands;
+ replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
+ DeleteContainerCommand deleteCommand =
+ new DeleteContainerCommand(container.getContainerID(), true);
+ deleteCommand.setReplicaIndex(r.getReplicaIndex());
+ commands.put(r.getDatanodeDetails(), deleteCommand);
}
- return emptyMap();
+ if (commands.size() == 0) {
+ LOG.warn("With the current state of available replicas {}, no" +
+ " commands were created to remove excess replicas.", replicas);
+ }
+ return commands;
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index f87d285158..4960f89d2f 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -198,6 +199,9 @@ public final class ReplicationTestUtil {
public static PlacementPolicy getSimpleTestPlacementPolicy(
final NodeManager nodeManager, final OzoneConfiguration conf) {
+
+ final Node rackNode = MockDatanodeDetails.randomDatanodeDetails();
+
return new SCMCommonPlacementPolicy(nodeManager, conf) {
@Override
protected List<DatanodeDetails> chooseDatanodesInternal(
@@ -216,6 +220,12 @@ public final class ReplicationTestUtil {
public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
return null;
}
+
+ @Override
+ protected Node getPlacementGroup(DatanodeDetails dn) {
+ // Make it look like a single rack cluster
+ return rackNode;
+ }
};
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index 71eb164ec0..e882374a09 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -19,16 +19,17 @@ package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -42,18 +43,20 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyList;
/**
* Tests the ECOverReplicationHandling functionality.
@@ -64,14 +67,18 @@ public class TestECOverReplicationHandler {
private NodeManager nodeManager;
private OzoneConfiguration conf;
private PlacementPolicy policy;
- private PlacementPolicy placementPolicy;
+ private DatanodeDetails staleNode;
@BeforeEach
public void setup() {
+ staleNode = null;
nodeManager = new MockNodeManager(true, 10) {
@Override
public NodeStatus getNodeStatus(DatanodeDetails dd)
throws NodeNotFoundException {
+ if (staleNode != null && dd.equals(staleNode)) {
+ return NodeStatus.inServiceStale();
+ }
return NodeStatus.inServiceHealthy();
}
};
@@ -84,10 +91,6 @@ public class TestECOverReplicationHandler {
NodeSchema[] schemas =
new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
- placementPolicy = Mockito.mock(PlacementPolicy.class);
- Mockito.when(placementPolicy.validateContainerPlacement(
- anyList(), anyInt()))
- .thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
}
@Test
@@ -96,7 +99,89 @@ public class TestECOverReplicationHandler {
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
- testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap());
+ testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+ ImmutableList.of());
+ }
+
+ @Test
+ public void testOverReplicationFixedByPendingDelete() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+ ContainerReplica excess = ReplicationTestUtil.createContainerReplica(
+ container.containerID(), 5, IN_SERVICE,
+ ContainerReplicaProto.State.CLOSED);
+ availableReplicas.add(excess);
+ List<ContainerReplicaOp> pendingOps = new ArrayList();
+ pendingOps.add(ContainerReplicaOp.create(DELETE,
+ excess.getDatanodeDetails(), 5));
+ testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+ pendingOps);
+ }
+
+ @Test
+ public void testOverReplicationWithDecommissionIndexes() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
+ Pair.of(DECOMMISSIONING, 5));
+ testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+ ImmutableList.of());
+ }
+
+ @Test
+ public void testOverReplicationWithStaleIndexes() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+ ContainerReplica stale = ReplicationTestUtil.createContainerReplica(
+ container.containerID(), 5, IN_SERVICE,
+ ContainerReplicaProto.State.CLOSED);
+ availableReplicas.add(stale);
+ // By setting stale node, it makes the mocked nodeManager return a stale
+ // status for it when checked.
+ staleNode = stale.getDatanodeDetails();
+ testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+ ImmutableList.of());
+ }
+
+ @Test
+ public void testOverReplicationWithOpenReplica() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+ ContainerReplica open = ReplicationTestUtil.createContainerReplica(
+ container.containerID(), 5, IN_SERVICE,
+ ContainerReplicaProto.State.OPEN);
+ availableReplicas.add(open);
+ testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+ ImmutableList.of());
+ }
+
+ /**
+ * This test mocks the placement policy so it returns invalid results. This
+ * should not happen, but it tests that commands are not sent for the wrong
+ * replica.
+ */
+ @Test
+ public void testOverReplicationButPolicyReturnsWrongIndexes() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
+ Pair.of(IN_SERVICE, 5));
+ ContainerReplica toReturn = ReplicationTestUtil.createContainerReplica(
+ container.containerID(), 1, IN_SERVICE,
+ ContainerReplicaProto.State.CLOSED);
+ policy = Mockito.mock(PlacementPolicy.class);
+ Mockito.when(policy.replicasToRemoveToFixOverreplication(
+ Mockito.any(), Mockito.anyInt()))
+ .thenReturn(ImmutableSet.of(toReturn));
+ testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+ ImmutableList.of());
}
@Test
@@ -109,7 +194,8 @@ public class TestECOverReplicationHandler {
testOverReplicationWithIndexes(availableReplicas,
//num of index 1 is 3, but it should be 1, so 2 excess
- new ImmutableMap.Builder<Integer, Integer>().put(1, 2).build());
+ new ImmutableMap.Builder<Integer, Integer>().put(1, 2).build(),
+ ImmutableList.of());
}
@Test
@@ -127,7 +213,7 @@ public class TestECOverReplicationHandler {
//num of index 1 is 3, but it should be 1, so 2 excess
new ImmutableMap.Builder<Integer, Integer>()
.put(1, 2).put(2, 2).put(3, 2).put(4, 1)
- .put(5, 1).build());
+ .put(5, 1).build(), ImmutableList.of());
}
/**
@@ -163,7 +249,8 @@ public class TestECOverReplicationHandler {
private void testOverReplicationWithIndexes(
Set<ContainerReplica> availableReplicas,
- Map<Integer, Integer> index2excessNum) {
+ Map<Integer, Integer> index2excessNum,
+ List<ContainerReplicaOp> pendingOps) {
ECOverReplicationHandler ecORH =
new ECOverReplicationHandler(policy, nodeManager);
ContainerHealthResult.OverReplicatedHealthResult result =
@@ -171,7 +258,7 @@ public class TestECOverReplicationHandler {
Mockito.when(result.getContainerInfo()).thenReturn(container);
Map<DatanodeDetails, SCMCommand<?>> commands = ecORH
- .processAndCreateCommands(availableReplicas, ImmutableList.of(),
+ .processAndCreateCommands(availableReplicas, pendingOps,
result, 1);
// total commands send out should be equal to the sum of all
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]