This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 79be6bf78f HDDS-9019. ECUnderReplicationHandler rejects a potential target if it doesn't satisfy placement policy (#5080)
79be6bf78f is described below

commit 79be6bf78f752788b4012c33822ea1097e3ff9ad
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Tue Jul 18 16:12:08 2023 +0530

    HDDS-9019. ECUnderReplicationHandler rejects a potential target if it doesn't satisfy placement policy (#5080)
---
 .../replication/ECUnderReplicationHandler.java     | 136 +++++++++++++--------
 .../replication/TestECUnderReplicationHandler.java |   8 +-
 2 files changed, 92 insertions(+), 52 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 2afd3c53db..daae24f7f2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -76,20 +77,16 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
     this.metrics = replicationManager.getMetrics();
   }
 
-  private boolean validatePlacement(List<DatanodeDetails> replicaNodes,
-                                    List<DatanodeDetails> selectedNodes) {
+  private ContainerPlacementStatus validatePlacement(
+      ContainerInfo container,
+      List<DatanodeDetails> replicaNodes,
+      List<DatanodeDetails> selectedNodes) {
+    LOG.trace("Validating placement policy for container {}. Available " +
+            "replica nodes: {}. Selected target nodes: {}.",
+        container.containerID(), replicaNodes, selectedNodes);
     List<DatanodeDetails> nodes = new ArrayList<>(replicaNodes);
     nodes.addAll(selectedNodes);
-    boolean placementStatus = containerPlacement
-            .validateContainerPlacement(nodes, nodes.size())
-            .isPolicySatisfied();
-    if (!placementStatus) {
-      LOG.warn("Selected Nodes does not satisfy placement policy: {}. " +
-              "Selected nodes: {}. Existing Replica Nodes: {}.",
-              containerPlacement.getClass().getName(),
-              selectedNodes, replicaNodes);
-    }
-    return placementStatus;
+    return containerPlacement.validateContainerPlacement(nodes, nodes.size());
   }
 
   /**
@@ -111,7 +108,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       final ContainerHealthResult result,
       final int remainingMaintenanceRedundancy) throws IOException {
     ContainerInfo container = result.getContainerInfo();
-    LOG.debug("Handling under-replicated EC container: {}", container);
+    LOG.debug("Handling under-replicated container: {}", container);
 
     final ECContainerReplicaCount replicaCount =
         new ECContainerReplicaCount(container, replicas, pendingOps,
@@ -127,6 +124,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
           container.getContainerID(), replicaCount.getReplicas());
       return 0;
     }
+    LOG.debug("Under-replicated container {} currently has replicas: {}.",
+        container.containerID(), replicas);
 
     ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
         ReplicationManagerUtil.getExcludedAndUsedNodes(
@@ -282,6 +281,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
     ECReplicationConfig repConfig =
         (ECReplicationConfig)container.getReplicationConfig();
     List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
+    LOG.debug("Processing missing indexes {} for container {}.", missingIndexes,
+        container.containerID());
     final int expectedTargetCount = missingIndexes.size();
     boolean recoveryIsCritical = expectedTargetCount == repConfig.getParity();
     if (expectedTargetCount == 0) {
@@ -334,8 +335,19 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       if (targetCount < expectedTargetCount) {
         missingIndexes.subList(targetCount, expectedTargetCount).clear();
       }
-      if (0 < targetCount &&
-          validatePlacement(availableSourceNodes, selectedDatanodes)) {
+
+      ContainerPlacementStatus placementStatusWithSelectedTargets =
+          validatePlacement(container, availableSourceNodes, selectedDatanodes);
+      if (!placementStatusWithSelectedTargets.isPolicySatisfied()) {
+        LOG.debug("Target nodes + existing nodes for EC container {}" +
+                " will not satisfy placement policy {}. Reason: {}. Selected" +
+                " nodes: {}. Available source nodes: {}. Resuming " +
+                "reconstruction regardless.",
+            container.containerID(), containerPlacement.getClass().getName(),
+            placementStatusWithSelectedTargets.misReplicatedReason(),
+            selectedDatanodes, availableSourceNodes);
+      }
+      if (0 < targetCount) {
         usedNodes.addAll(selectedDatanodes);
         // TODO - what are we adding all the selected nodes to available
         //        sources?
@@ -387,6 +399,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
               + " {}. Available sources are: {}", container.containerID(),
           repConfig.getData(), sources.size(), sources);
     }
+    LOG.trace("Sent {} commands for container {}.", commandsSent,
+        container.containerID());
     return commandsSent;
   }
 
@@ -416,47 +430,60 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
     Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
     int commandsSent = 0;
     if (decomIndexes.size() > 0) {
+      LOG.debug("Processing decommissioning indexes {} for container {}.",
+          decomIndexes, container.containerID());
       final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
           container, decomIndexes.size(), usedNodes, excludedNodes);
 
-      if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
-        usedNodes.addAll(selectedDatanodes);
-        Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
-        // In this case we need to do one to one copy.
-        CommandTargetOverloadedException overloadedException = null;
-        for (Integer decomIndex : decomIndexes) {
-          Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
-          if (source == null) {
-            LOG.warn("Cannot find source replica for decommissioning index " +
-                    "{} in container {}", decomIndex, container.containerID());
-            continue;
-          }
-          ContainerReplica sourceReplica = source.getLeft();
-          if (!iterator.hasNext()) {
-            LOG.warn("Couldn't find enough targets. Available source"
-                + " nodes: {}, the target nodes: {}, excluded nodes: {},"
-                + " usedNodes: {}, and the decommission indexes: {}",
-                sources.values().stream()
-                    .map(Pair::getLeft).collect(Collectors.toSet()),
-                selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
-            break;
-          }
-          try {
-            createReplicateCommand(
-                container, iterator, sourceReplica, replicaCount);
-            commandsSent++;
-          } catch (CommandTargetOverloadedException e) {
-            LOG.debug("Unable to send Replicate command for container {}" +
-                " index {} because the source node {} is overloaded.",
-                container.getContainerID(), sourceReplica.getReplicaIndex(),
-                sourceReplica.getDatanodeDetails());
-            overloadedException = e;
-          }
+      ContainerPlacementStatus placementStatusWithSelectedTargets =
+          validatePlacement(container, availableSourceNodes, selectedDatanodes);
+      if (!placementStatusWithSelectedTargets.isPolicySatisfied()) {
+        LOG.debug("Target nodes + existing nodes for EC container {}" +
+                " will not satisfy placement policy {}. Reason: {}. Selected" +
+                " nodes: {}. Available source nodes: {}. Resuming recovery " +
+                "regardless.",
+            container.containerID(), containerPlacement.getClass().getName(),
+            placementStatusWithSelectedTargets.misReplicatedReason(),
+            selectedDatanodes, availableSourceNodes);
+      }
+
+      usedNodes.addAll(selectedDatanodes);
+      Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
+      // In this case we need to do one to one copy.
+      CommandTargetOverloadedException overloadedException = null;
+      for (Integer decomIndex : decomIndexes) {
+        Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
+        if (source == null) {
+          LOG.warn("Cannot find source replica for decommissioning index " +
+                  "{} in container {}", decomIndex, container.containerID());
+          continue;
         }
-        if (overloadedException != null) {
-          throw overloadedException;
+        ContainerReplica sourceReplica = source.getLeft();
+        if (!iterator.hasNext()) {
+          LOG.warn("Couldn't find enough targets. Available source"
+              + " nodes: {}, the target nodes: {}, excluded nodes: {},"
+              + " usedNodes: {}, and the decommission indexes: {}",
+              sources.values().stream()
+                  .map(Pair::getLeft).collect(Collectors.toSet()),
+              selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
+          break;
         }
+        try {
+          createReplicateCommand(
+              container, iterator, sourceReplica, replicaCount);
+          commandsSent++;
+        } catch (CommandTargetOverloadedException e) {
+          LOG.debug("Unable to send Replicate command for container {}" +
+              " index {} because the source node {} is overloaded.",
+              container.getContainerID(), sourceReplica.getReplicaIndex(),
+              sourceReplica.getDatanodeDetails());
+          overloadedException = e;
+        }
+      }
+      if (overloadedException != null) {
+        throw overloadedException;
       }
+
       if (selectedDatanodes.size() != decomIndexes.size()) {
         LOG.debug("Insufficient nodes were returned from the placement policy" +
             " to fully replicate the decommission indexes for container {}." +
@@ -467,6 +494,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
             selectedDatanodes.size());
       }
     }
+    LOG.trace("Sent {} commands for container {}.", commandsSent,
+        container.containerID());
     return commandsSent;
   }
 
@@ -489,12 +518,17 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
     }
 
     ContainerInfo container = replicaCount.getContainer();
+    LOG.debug("Processing maintenance indexes {} for container {}.",
+        maintIndexes, container.containerID());
     // this many maintenance replicas need another copy
     int additionalMaintenanceCopiesNeeded =
         replicaCount.additionalMaintenanceCopiesNeeded(true);
     if (additionalMaintenanceCopiesNeeded == 0) {
       return 0;
     }
+    LOG.debug("Number of maintenance replicas of container {} that need " +
+            "additional copies: {}.", container.containerID(),
+        additionalMaintenanceCopiesNeeded);
     List<DatanodeDetails> targets = getTargetDatanodes(
         container, maintIndexes.size(), usedNodes, excludedNodes
     );
@@ -551,6 +585,8 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       throw new InsufficientDatanodesException(maintIndexes.size(),
           targets.size());
     }
+    LOG.trace("Sent {} commands for container {}.", commandsSent,
+        container.containerID());
     return commandsSent;
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index be95bcee60..e65485709d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -481,6 +481,10 @@ public class TestECUnderReplicationHandler {
         availableReplicas, 1, 2, policy);
   }
 
+  /**
+   * The expectation is that an under replicated container should recover
+   * even if datanodes hosting new replicas don't satisfy placement policy.
+   */
   @Test
   public void testUnderReplicationWithInvalidPlacement()
           throws IOException {
@@ -505,8 +509,8 @@ public class TestECUnderReplicationHandler {
               .allMatch(dns::contains));
           return mockedContainerPlacementStatus;
         });
-    testUnderReplicationWithMissingIndexes(emptyList(),
-            availableReplicas, 0, 0, mockedPolicy);
+    testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+            availableReplicas, 2, 0, mockedPolicy);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to