This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f9dfd73e99 HDDS-7761. EC: ReplicationManager - Use 
placementPolicy.replicasToRemoveToFixOverreplication in EC Over replication 
handler (#4166)
f9dfd73e99 is described below

commit f9dfd73e999b78b547f5a7e97213915e7f25b77d
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Jan 12 18:15:55 2023 +0000

    HDDS-7761. EC: ReplicationManager - Use 
placementPolicy.replicasToRemoveToFixOverreplication in EC Over replication 
handler (#4166)
---
 .../AbstractOverReplicationHandler.java            |  14 +++
 .../replication/ECOverReplicationHandler.java      | 122 ++++++++++++---------
 .../container/replication/ReplicationTestUtil.java |  10 ++
 .../replication/TestECOverReplicationHandler.java  | 113 ++++++++++++++++---
 4 files changed, 197 insertions(+), 62 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
index 2dac8b2312..5d647593ef 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
@@ -80,6 +80,20 @@ public abstract class AbstractOverReplicationHandler
     return isPlacementStatusActuallyEqual(currentCPS, newCPS);
   }
 
+  /**
+   * Allow the placement policy to indicate which replicas can be removed for
+   * an over replicated container, so that the placement policy is not violated
+   * by removing them.
+   * @param replicas
+   * @param expectedCountPerUniqueReplica
+   * @return
+   */
+  protected Set<ContainerReplica> selectReplicasToRemove(
+      Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
+    return placementPolicy.replicasToRemoveToFixOverreplication(
+        replicas, expectedCountPerUniqueReplica);
+  }
+
   /**
    * Given a set of ContainerReplica, transform it to a list of DatanodeDetails
    * and then check if the list meets the container placement policy.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 85295c9749..00c1a264af 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.hdds.scm.container.replication;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
@@ -30,12 +32,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
 
@@ -74,12 +74,34 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
       ContainerHealthResult result, int remainingMaintenanceRedundancy) {
     ContainerInfo container = result.getContainerInfo();
 
+    // We are going to check for over replication, so we should filter out any
+    // replicas that are not in a HEALTHY state. This is because a replica can
+    // be healthy, stale or dead. If it is dead it will be quickly removed from
+    // scm. If it is stale, there is a good chance the DN is offline and the
+    // replica will go away soon. So, if we have a container that is over
+    // replicated with a HEALTHY and STALE replica, and we decide to delete the
+    // HEALTHY one, and then the STALE one goes away, we will lose them both.
+    // To avoid this, we will filter out any non-healthy replicas first.
+    // ECContainerReplicaCount will ignore nodes which are not IN_SERVICE for
+    // over replication checks, but we need to filter these out later in this
+    // method anyway, so it makes sense to filter them here too, to avoid a
+    // second lookup of the NodeStatus
+    Set<ContainerReplica> healthyReplicas = replicas.stream()
+        .filter(r -> {
+          NodeStatus ns = ReplicationManager.getNodeStatus(
+              r.getDatanodeDetails(), nodeManager);
+          return ns.isHealthy() && ns.getOperationalState() ==
+              HddsProtos.NodeOperationalState.IN_SERVICE;
+        })
+        .collect(Collectors.toSet());
+
     final ECContainerReplicaCount replicaCount =
-        new ECContainerReplicaCount(container, replicas, pendingOps,
+        new ECContainerReplicaCount(container, healthyReplicas, pendingOps,
             remainingMaintenanceRedundancy);
     if (!replicaCount.isOverReplicated()) {
-      LOG.info("The container {} state changed and it's not in over"
-              + " replication any more", container.getContainerID());
+      LOG.info("The container {} state changed and it is no longer over"
+              + " replication. Replica count: {}, healthy replica count: {}",
+          container.getContainerID(), replicas.size(), healthyReplicas.size());
       return emptyMap();
     }
 
@@ -93,10 +115,9 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
         replicaCount.overReplicatedIndexes(true);
     //sanity check
     if (overReplicatedIndexes.size() == 0) {
-      LOG.warn("The container {} with replicas {} is found over replicated " +
-              "by ContainerHealthCheck, but found not over replicated by " +
-              "ECContainerReplicaCount",
-          container.getContainerID(), replicas);
+      LOG.warn("The container {} with replicas {} was found over replicated "
+          + "by EcContainerReplicaCount, but there are no over replicated "
+          + "indexes returned", container.getContainerID(), replicas);
       return emptyMap();
     }
 
@@ -106,48 +127,51 @@ public class ECOverReplicationHandler extends 
AbstractOverReplicationHandler {
         deletionInFlight.add(op.getTarget());
       }
     }
-    Map<Integer, List<ContainerReplica>> index2replicas = new HashMap<>();
-    replicas.stream()
-        .filter(r -> overReplicatedIndexes.contains(r.getReplicaIndex()))
-        .filter(r -> r
-            .getState() == StorageContainerDatanodeProtocolProtos
-            .ContainerReplicaProto.State.CLOSED)
-        .filter(r -> ReplicationManager
-            .getNodeStatus(r.getDatanodeDetails(), nodeManager).isHealthy())
+
+    Set<ContainerReplica> candidates = healthyReplicas.stream()
         .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
-        .forEach(r -> {
-          int index = r.getReplicaIndex();
-          index2replicas.computeIfAbsent(index, k -> new LinkedList<>());
-          index2replicas.get(index).add(r);
-        });
-
-    if (index2replicas.size() > 0) {
-      final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
-      final int replicationFactor =
-          container.getReplicationConfig().getRequiredNodes();
-      index2replicas.values().forEach(l -> {
-        Iterator<ContainerReplica> it = l.iterator();
-        Set<ContainerReplica> tempReplicaSet = new HashSet<>(replicas);
-        while (it.hasNext() && l.size() > 1) {
-          ContainerReplica r = it.next();
-          if (isPlacementStatusActuallyEqualAfterRemove(
-              tempReplicaSet, r, replicationFactor)) {
-            DeleteContainerCommand deleteCommand =
-                new DeleteContainerCommand(container.getContainerID(), true);
-            deleteCommand.setReplicaIndex(r.getReplicaIndex());
-            commands.put(r.getDatanodeDetails(), deleteCommand);
-            it.remove();
-            tempReplicaSet.remove(r);
-          }
-        }
-      });
-      if (commands.size() == 0) {
-        LOG.info("With the current state of avilable replicas {}, no" +
-            " commands to process due to over replication.", replicas);
+        .filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos
+            .ContainerReplicaProto.State.CLOSED)
+        .collect(Collectors.toSet());
+
+    Set<ContainerReplica> replicasToRemove =
+        selectReplicasToRemove(candidates, 1);
+
+    if (replicasToRemove.size() == 0) {
+      LOG.warn("The container {} is over replicated, but no replicas were "
+          + "selected to remove by the placement policy. Replicas: {}",
+          container, replicas);
+      return emptyMap();
+    }
+
+    final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    // As a sanity check, sum up the current counts of each replica index. When
+    // processing replicasToRemove, ensure that removing the replica would not
+    // drop the count of that index to zero.
+    Map<Integer, Integer> replicaIndexCounts = new HashMap<>();
+    for (ContainerReplica r : candidates) {
+      replicaIndexCounts.put(r.getReplicaIndex(),
+          replicaIndexCounts.getOrDefault(r.getReplicaIndex(), 0) + 1);
+    }
+    for (ContainerReplica r : replicasToRemove) {
+      int currentCount = replicaIndexCounts.getOrDefault(
+          r.getReplicaIndex(), 0);
+      if (currentCount < 2) {
+        LOG.warn("The replica {} selected to remove would reduce the count " +
+            "for that index to zero. Candidate Replicas: {}", r, candidates);
+        continue;
       }
-      return commands;
+      replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
+      DeleteContainerCommand deleteCommand =
+          new DeleteContainerCommand(container.getContainerID(), true);
+      deleteCommand.setReplicaIndex(r.getReplicaIndex());
+      commands.put(r.getDatanodeDetails(), deleteCommand);
     }
 
-    return emptyMap();
+    if (commands.size() == 0) {
+      LOG.warn("With the current state of available replicas {}, no" +
+          " commands were created to remove excess replicas.", replicas);
+    }
+    return commands;
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index f87d285158..4960f89d2f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 
@@ -198,6 +199,9 @@ public final class ReplicationTestUtil {
 
   public static PlacementPolicy getSimpleTestPlacementPolicy(
       final NodeManager nodeManager, final OzoneConfiguration conf) {
+
+    final Node rackNode = MockDatanodeDetails.randomDatanodeDetails();
+
     return new SCMCommonPlacementPolicy(nodeManager, conf) {
       @Override
       protected List<DatanodeDetails> chooseDatanodesInternal(
@@ -216,6 +220,12 @@ public final class ReplicationTestUtil {
       public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
         return null;
       }
+
+      @Override
+      protected Node getPlacementGroup(DatanodeDetails dn) {
+        // Make it look like a single rack cluster
+        return rackNode;
+      }
     };
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index 71eb164ec0..e882374a09 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -19,16 +19,17 @@ package org.apache.hadoop.hdds.scm.container.replication;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -42,18 +43,20 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyList;
 
 /**
  * Tests the ECOverReplicationHandling functionality.
@@ -64,14 +67,18 @@ public class TestECOverReplicationHandler {
   private NodeManager nodeManager;
   private OzoneConfiguration conf;
   private PlacementPolicy policy;
-  private PlacementPolicy placementPolicy;
+  private DatanodeDetails staleNode;
 
   @BeforeEach
   public void setup() {
+    staleNode = null;
     nodeManager = new MockNodeManager(true, 10) {
       @Override
       public NodeStatus getNodeStatus(DatanodeDetails dd)
           throws NodeNotFoundException {
+        if (staleNode != null && dd.equals(staleNode)) {
+          return NodeStatus.inServiceStale();
+        }
         return NodeStatus.inServiceHealthy();
       }
     };
@@ -84,10 +91,6 @@ public class TestECOverReplicationHandler {
     NodeSchema[] schemas =
         new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
     NodeSchemaManager.getInstance().init(schemas, true);
-    placementPolicy = Mockito.mock(PlacementPolicy.class);
-    Mockito.when(placementPolicy.validateContainerPlacement(
-        anyList(), anyInt()))
-        .thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
   }
 
   @Test
@@ -96,7 +99,89 @@ public class TestECOverReplicationHandler {
         .createReplicas(Pair.of(IN_SERVICE, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
             Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
-    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap());
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
+  }
+
+  @Test
+  public void testOverReplicationFixedByPendingDelete() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+    ContainerReplica excess = ReplicationTestUtil.createContainerReplica(
+        container.containerID(), 5, IN_SERVICE,
+        ContainerReplicaProto.State.CLOSED);
+    availableReplicas.add(excess);
+    List<ContainerReplicaOp> pendingOps = new ArrayList();
+    pendingOps.add(ContainerReplicaOp.create(DELETE,
+        excess.getDatanodeDetails(), 5));
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        pendingOps);
+  }
+
+  @Test
+  public void testOverReplicationWithDecommissionIndexes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
+            Pair.of(DECOMMISSIONING, 5));
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
+  }
+
+  @Test
+  public void testOverReplicationWithStaleIndexes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+    ContainerReplica stale = ReplicationTestUtil.createContainerReplica(
+        container.containerID(), 5, IN_SERVICE,
+        ContainerReplicaProto.State.CLOSED);
+    availableReplicas.add(stale);
+    // By setting stale node, it makes the mocked nodeManager return a stale
+    // state for it when checked.
+    staleNode = stale.getDatanodeDetails();
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
+  }
+
+  @Test
+  public void testOverReplicationWithOpenReplica() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+    ContainerReplica open = ReplicationTestUtil.createContainerReplica(
+        container.containerID(), 5, IN_SERVICE,
+        ContainerReplicaProto.State.OPEN);
+    availableReplicas.add(open);
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
+  }
+
+  /**
+   * This test mocks the placement policy so it returns invalid results. This
+   * should not happen, but it tests that commands are not sent for the wrong
+   * replica.
+   */
+  @Test
+  public void testOverReplicationButPolicyReturnsWrongIndexes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
+            Pair.of(IN_SERVICE, 5));
+    ContainerReplica toReturn = ReplicationTestUtil.createContainerReplica(
+        container.containerID(), 1, IN_SERVICE,
+        ContainerReplicaProto.State.CLOSED);
+    policy = Mockito.mock(PlacementPolicy.class);
+    Mockito.when(policy.replicasToRemoveToFixOverreplication(
+        Mockito.any(), Mockito.anyInt()))
+        .thenReturn(ImmutableSet.of(toReturn));
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
   }
 
   @Test
@@ -109,7 +194,8 @@ public class TestECOverReplicationHandler {
 
     testOverReplicationWithIndexes(availableReplicas,
         //num of index 1 is 3, but it should be 1, so 2 excess
-        new ImmutableMap.Builder<Integer, Integer>().put(1, 2).build());
+        new ImmutableMap.Builder<Integer, Integer>().put(1, 2).build(),
+            ImmutableList.of());
   }
 
   @Test
@@ -127,7 +213,7 @@ public class TestECOverReplicationHandler {
         //num of index 1 is 3, but it should be 1, so 2 excess
         new ImmutableMap.Builder<Integer, Integer>()
             .put(1, 2).put(2, 2).put(3, 2).put(4, 1)
-            .put(5, 1).build());
+            .put(5, 1).build(), ImmutableList.of());
   }
 
   /**
@@ -163,7 +249,8 @@ public class TestECOverReplicationHandler {
 
   private void testOverReplicationWithIndexes(
       Set<ContainerReplica> availableReplicas,
-      Map<Integer, Integer> index2excessNum) {
+      Map<Integer, Integer> index2excessNum,
+      List<ContainerReplicaOp> pendingOps) {
     ECOverReplicationHandler ecORH =
         new ECOverReplicationHandler(policy, nodeManager);
     ContainerHealthResult.OverReplicatedHealthResult result =
@@ -171,7 +258,7 @@ public class TestECOverReplicationHandler {
     Mockito.when(result.getContainerInfo()).thenReturn(container);
 
     Map<DatanodeDetails, SCMCommand<?>> commands = ecORH
-        .processAndCreateCommands(availableReplicas, ImmutableList.of(),
+        .processAndCreateCommands(availableReplicas, pendingOps,
             result, 1);
 
     // total commands send out should be equal to the sum of all


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to