This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8967738e93 HDDS-7492. Extend Placement Policy Interface to select 
mis-replicated replicas to copy (#4006)
8967738e93 is described below

commit 8967738e93e174707dc6ad7ab7da7b0f904d09b8
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu Dec 8 05:45:42 2022 -0700

    HDDS-7492. Extend Placement Policy Interface to select mis-replicated 
replicas to copy (#4006)
---
 .../apache/hadoop/hdds/scm/PlacementPolicy.java    |  15 +-
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  | 103 +++++++--
 .../ContainerPlacementStatusDefault.java           |  36 +++-
 .../algorithms/SCMContainerPlacementRackAware.java |   8 +
 .../SCMContainerPlacementRackScatter.java          |  77 ++++---
 .../algorithms/SCMContainerPlacementRandom.java    |   3 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |   8 +
 .../org/apache/hadoop/hdds/scm/HddsTestUtils.java  |  32 ++-
 .../hdds/scm/TestSCMCommonPlacementPolicy.java     | 240 ++++++++++++++++++++-
 .../algorithms/TestContainerPlacementFactory.java  |  12 +-
 .../TestSCMContainerPlacementRackScatter.java      |  56 +++--
 .../org/apache/ozone/test/GenericTestUtils.java    |  10 +
 .../ozone/recon/fsck/TestContainerHealthTask.java  |  10 +-
 13 files changed, 521 insertions(+), 89 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
index b240e5c3b7..af41a15740 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
@@ -22,12 +22,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 /**
  * A PlacementPolicy support choosing datanodes to build
  * pipelines or containers with specified constraints.
  */
-public interface PlacementPolicy {
+public interface PlacementPolicy<Replica> {
 
   default List<DatanodeDetails> chooseDatanodes(
           List<DatanodeDetails> excludedNodes,
@@ -60,9 +61,17 @@ public interface PlacementPolicy {
    * Given a list of datanode and the number of replicas required, return
    * a PlacementPolicyStatus object indicating if the container meets the
    * placement policy - ie is it on the correct number of racks, etc.
-   * @param dns List of datanodes holding a replica of the container
+   * @param dns List of datanodes, each holding a replica of the container
    * @param replicas The expected number of replicas
    */
   ContainerPlacementStatus validateContainerPlacement(
-      List<DatanodeDetails> dns, int replicas);
+          List<DatanodeDetails> dns, int replicas);
+
+  /**
+   * Given a set of replicas of a container which are
+   * neither underreplicated nor overreplicated,
+   * return a set of replicas to copy to another node to fix misreplication.
+   * @param replicas the set of container replicas to check for misreplication
+   */
+  Set<Replica> replicasToCopyToFixMisreplication(Set<Replica> replicas);
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index 5f7e944235..535b03508d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -18,35 +18,40 @@
 package org.apache.hadoop.hdds.scm;
 
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.stream.Collectors;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
  * This policy implements a set of invariants which are common
  * for all basic placement policies, acts as the repository of helper
  * functions which are common to placement policies.
  */
-public abstract class SCMCommonPlacementPolicy implements PlacementPolicy {
+public abstract class SCMCommonPlacementPolicy implements
+        PlacementPolicy<ContainerReplica> {
   @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(SCMCommonPlacementPolicy.class);
@@ -346,6 +351,22 @@ public abstract class SCMCommonPlacementPolicy implements 
PlacementPolicy {
     return 1;
   }
 
+  /**
+   * Default implementation to return the max number of replicas per rack.
+   * For simple policies that are not rack aware
+   * we return numReplicas, from this default implementation.
+   *
+   * @param numReplicas - The desired replica counts
+   * @param numberOfRacks - The desired number of racks
+   * @return The max number of replicas per rack
+   */
+  protected int getMaxReplicasPerRack(int numReplicas, int numberOfRacks) {
+    return numReplicas / numberOfRacks
+            + Math.min(numReplicas % numberOfRacks, 1);
+  }
+
+
+
   /**
    * This default implementation handles rack aware policies and non rack
    * aware policies. If a future placement policy needs to check more than 
racks
@@ -363,6 +384,7 @@ public abstract class SCMCommonPlacementPolicy implements 
PlacementPolicy {
   public ContainerPlacementStatus validateContainerPlacement(
       List<DatanodeDetails> dns, int replicas) {
     NetworkTopology topology = nodeManager.getClusterNetworkTopologyMap();
+    // We have a network topology so calculate if it is satisfied or not.
     int requiredRacks = getRequiredRackCount(replicas);
     if (topology == null || replicas == 1 || requiredRacks == 1) {
       if (dns.size() > 0) {
@@ -372,22 +394,22 @@ public abstract class SCMCommonPlacementPolicy implements 
PlacementPolicy {
         return invalidPlacement;
       }
     }
-    // We have a network topology so calculate if it is satisfied or not.
-    int numRacks = 1;
+    Map<Node, Long> currentRackCount = dns.stream()
+            .collect(Collectors.groupingBy(this::getPlacementGroup,
+                    Collectors.counting()));
     final int maxLevel = topology.getMaxLevel();
     // The leaf nodes are all at max level, so the number of nodes at
     // leafLevel - 1 is the rack count
-    numRacks = topology.getNumOfNodes(maxLevel - 1);
-    final long currentRackCount = dns.stream()
-        .map(d -> topology.getAncestor(d, 1))
-        .distinct()
-        .count();
-
+    int numRacks = topology.getNumOfNodes(maxLevel - 1);
     if (replicas < requiredRacks) {
       requiredRacks = replicas;
     }
+    int maxReplicasPerRack = getMaxReplicasPerRack(replicas,
+            Math.min(requiredRacks, numRacks));
     return new ContainerPlacementStatusDefault(
-        (int)currentRackCount, requiredRacks, numRacks);
+        currentRackCount.size(), requiredRacks, numRacks, maxReplicasPerRack,
+            currentRackCount.values().stream().map(Long::intValue)
+                    .collect(Collectors.toList()));
   }
 
   /**
@@ -426,4 +448,47 @@ public abstract class SCMCommonPlacementPolicy implements 
PlacementPolicy {
     }
     return false;
   }
+
+  /**
+   * Given a set of replicas of a container which are
+   * neither underreplicated nor overreplicated,
+   * return a set of replicas to copy to another node to fix misreplication.
+   * @param replicas the set of container replicas to check for misreplication
+   */
+  @Override
+  public Set<ContainerReplica> replicasToCopyToFixMisreplication(
+         Set<ContainerReplica> replicas) {
+    Map<Node, List<ContainerReplica>> placementGroupReplicaIdMap
+            = replicas.stream().collect(Collectors.groupingBy(replica ->
+            this.getPlacementGroup(replica.getDatanodeDetails())));
+
+    int totalNumberOfReplicas = replicas.size();
+    int requiredNumberOfPlacementGroups =
+            getRequiredRackCount(totalNumberOfReplicas);
+    Set<ContainerReplica> copyReplicaSet = Sets.newHashSet();
+    List<List<ContainerReplica>> replicaSet = placementGroupReplicaIdMap
+            .values().stream()
+            .sorted((o1, o2) -> Integer.compare(o2.size(), o1.size()))
+            .limit(requiredNumberOfPlacementGroups)
+            .collect(Collectors.toList());
+    for (List<ContainerReplica> replicaList: replicaSet) {
+      int maxReplicasPerPlacementGroup = getMaxReplicasPerRack(
+              totalNumberOfReplicas, requiredNumberOfPlacementGroups);
+      int numberOfReplicasToBeCopied = Math.max(0,
+              replicaList.size() - maxReplicasPerPlacementGroup);
+      totalNumberOfReplicas -= maxReplicasPerPlacementGroup;
+      requiredNumberOfPlacementGroups -= 1;
+      if (numberOfReplicasToBeCopied > 0) {
+        List<ContainerReplica> replicasToBeCopied = replicaList.stream()
+                .limit(numberOfReplicasToBeCopied)
+                .collect(Collectors.toList());
+        copyReplicaSet.addAll(replicasToBeCopied);
+      }
+    }
+    return copyReplicaSet;
+  }
+
+  protected Node getPlacementGroup(DatanodeDetails dn) {
+    return nodeManager.getClusterNetworkTopologyMap().getAncestor(dn, 1);
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java
index 3fdcd2f401..a0fb5db92d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementStatusDefault.java
@@ -18,6 +18,9 @@ package 
org.apache.hadoop.hdds.scm.container.placement.algorithms;
 
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  *  Simple Status object to check if a container is replicated across enough
  *  racks.
@@ -29,16 +32,32 @@ public class ContainerPlacementStatusDefault
   private final int currentRacks;
   private final int totalRacks;
 
+  private final int maxReplicasPerRack;
+  private final List<Integer> rackReplicaCnts;
+
+
   public ContainerPlacementStatusDefault(int currentRacks, int requiredRacks,
-      int totalRacks) {
+      int totalRacks, int maxReplicasPerRack, List<Integer> rackReplicaCnts) {
     this.requiredRacks = requiredRacks;
     this.currentRacks = currentRacks;
     this.totalRacks = totalRacks;
+    this.maxReplicasPerRack = maxReplicasPerRack;
+    this.rackReplicaCnts = rackReplicaCnts;
+  }
+
+  public ContainerPlacementStatusDefault(int requiredRacks, int currentRacks,
+      int totalRacks) {
+    this(requiredRacks, currentRacks, totalRacks, 1,
+         currentRacks == 0 ? Collections.emptyList()
+                 : Collections.nCopies(currentRacks, 1));
   }
 
   @Override
   public boolean isPolicySatisfied() {
-    return currentRacks >= totalRacks || currentRacks >= requiredRacks;
+    if (currentRacks < Math.min(totalRacks, requiredRacks)) {
+      return false;
+    }
+    return rackReplicaCnts.stream().allMatch(cnt -> cnt <= maxReplicasPerRack);
   }
 
   @Override
@@ -46,8 +65,13 @@ public class ContainerPlacementStatusDefault
     if (isPolicySatisfied()) {
       return null;
     }
-    return "The container is mis-replicated as it is on " + currentRacks +
-        " racks but should be on " + requiredRacks + " racks.";
+    if (currentRacks < Math.min(requiredRacks, totalRacks)) {
+      return "The container is mis-replicated as it is on " + currentRacks +
+              " racks but should be on " + requiredRacks + " racks.";
+    }
+    return "The container is mis-replicated as max number of replicas per rack 
"
+            + "is " + maxReplicasPerRack + " but number of replicas per rack" +
+            " are " + rackReplicaCnts.toString();
   }
 
   @Override
@@ -55,7 +79,9 @@ public class ContainerPlacementStatusDefault
     if (isPolicySatisfied()) {
       return 0;
     }
-    return requiredRacks - currentRacks;
+    return Math.max(requiredRacks - currentRacks,
+            rackReplicaCnts.stream().mapToInt(
+                    cnt -> Math.max(maxReplicasPerRack - cnt, 0)).sum());
   }
 
   @Override
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
index 4ee7408fbb..4f07024f16 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
@@ -376,6 +376,14 @@ public final class SCMContainerPlacementRackAware
     }
   }
 
+  @Override
+  protected int getMaxReplicasPerRack(int numReplicas, int numberOfRacks) {
+    if (numberOfRacks == 1) {
+      return numReplicas;
+    }
+    return Math.max(numReplicas - 1, 1);
+  }
+
   @Override
   protected int getRequiredRackCount(int numReplicas) {
     return REQUIRED_RACKS;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index eff2dc86c4..2f86df8708 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.container.placement.algorithms;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
@@ -35,7 +36,10 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -78,13 +82,19 @@ public final class SCMContainerPlacementRackScatter
   }
 
   public Set<DatanodeDetails> chooseNodesFromRacks(List<Node> racks,
-          List<Node> unavailableNodes,
-          List<DatanodeDetails> mutableFavoredNodes,
-          int nodesRequired, final long metadataSizeRequired,
-          final long dataSizeRequired, int maxOuterLoopIterations) {
+       List<Node> unavailableNodes,
+       List<DatanodeDetails> mutableFavoredNodes,
+       int nodesRequired, final Pair<Long, Long> metadatasizeDatasizePair,
+       int maxOuterLoopIterations, final Pair<Map<Node, Integer>, Integer>
+       rackCntMapMaxReplicaPerRackPair) {
     if (nodesRequired <= 0) {
       return Collections.emptySet();
     }
+    final long metadataSizeRequired = metadatasizeDatasizePair.getKey();
+    final long dataSizeRequired = metadatasizeDatasizePair.getValue();
+    final Map<Node, Integer> rackCntMap =
+            rackCntMapMaxReplicaPerRackPair.getKey();
+    final int maxReplicasPerRack = rackCntMapMaxReplicaPerRackPair.getValue();
     List<Node> toChooseRacks = new LinkedList<>();
     Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>();
     Set<Node> skippedRacks = new HashSet<>();
@@ -97,8 +107,10 @@ public final class SCMContainerPlacementRackScatter
       int chosenListSize = chosenNodes.size();
 
       // Refill toChooseRacks, we put skippedRacks in front of toChooseRacks
-      // for a even distribution
-      toChooseRacks.addAll(racks);
+      // for an even distribution
+      toChooseRacks.addAll(racks.stream()
+              .filter(rack -> rackCntMap.getOrDefault(rack, 0)
+                      < maxReplicasPerRack).collect(Collectors.toList()));
       if (!skippedRacks.isEmpty()) {
         toChooseRacks.removeAll(skippedRacks);
         toChooseRacks.addAll(0, skippedRacks);
@@ -111,6 +123,7 @@ public final class SCMContainerPlacementRackScatter
           Node curRack = getRackOfDatanodeDetails(favoredNode);
           if (toChooseRacks.contains(curRack)) {
             chosenNodes.add(favoredNode);
+            rackCntMap.merge(curRack, 1, Math::addExact);
             toChooseRacks.remove(curRack);
             chosenFavoredNodesInForLoop.add(favoredNode);
             unavailableNodes.add(favoredNode);
@@ -137,6 +150,7 @@ public final class SCMContainerPlacementRackScatter
                 metadataSizeRequired, dataSizeRequired);
         if (node != null) {
           chosenNodes.add((DatanodeDetails) node);
+          rackCntMap.merge(rack, 1, Math::addExact);
           mutableFavoredNodes.remove(node);
           unavailableNodes.add(node);
           nodesRequired--;
@@ -208,7 +222,6 @@ public final class SCMContainerPlacementRackScatter
           " ExcludedNode = " + excludedNodesCount,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
-
     List<DatanodeDetails> mutableFavoredNodes = new ArrayList<>();
     if (favoredNodes != null) {
       // Generate mutableFavoredNodes, only stores valid favoredNodes
@@ -228,14 +241,15 @@ public final class SCMContainerPlacementRackScatter
       usedNodes = Collections.emptyList();
     }
     List<Node> racks = getAllRacks();
-    Set<Node> usedRacks = usedNodes.stream()
+    Map<Node, Integer> usedRacksCntMap = usedNodes.stream()
             .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
             .filter(node -> node != null)
-            .collect(Collectors.toSet());
+            .collect(Collectors.toMap(Function.identity(), e -> 1,
+                    Math::addExact));
     int requiredReplicationFactor = usedNodes.size() + nodesRequired;
-    int numberOfRacksRequired =
-            getRequiredRackCount(requiredReplicationFactor);
-    int additionalRacksRequired = numberOfRacksRequired - usedRacks.size();
+    int numberOfRacksRequired = 
getRequiredRackCount(requiredReplicationFactor);
+    int additionalRacksRequired =
+            numberOfRacksRequired - usedRacksCntMap.size();
     if (nodesRequired < additionalRacksRequired) {
       String reason = "Required nodes size: " + nodesRequired
               + " is less than required number of racks to choose: "
@@ -243,12 +257,15 @@ public final class SCMContainerPlacementRackScatter
       LOG.warn("Placement policy cannot choose the enough racks. {}"
                       + "Total number of Required Racks: {} Used Racks Count:" 
+
                       " {}, Required Nodes count: {}",
-              reason, numberOfRacksRequired, usedRacks.size(), nodesRequired);
+              reason, numberOfRacksRequired, usedRacksCntMap.size(),
+              nodesRequired);
       throw new SCMException(reason,
               SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
+    int maxReplicasPerRack = getMaxReplicasPerRack(requiredReplicationFactor,
+            numberOfRacksRequired);
     // For excluded nodes, we sort their racks at rear
-    racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacks);
+    racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
 
     List<Node> unavailableNodes = new ArrayList<>();
     if (excludedNodes != null) {
@@ -265,14 +282,16 @@ public final class SCMContainerPlacementRackScatter
       LOG.warn("Placement policy cannot choose the enough racks. {}"
                       + "Total number of Required Racks: {} Used Racks Count:" 
+
                       " {}, Required Nodes count: {}",
-              reason, numberOfRacksRequired, usedRacks.size(), nodesRequired);
+              reason, numberOfRacksRequired, usedRacksCntMap.size(),
+              nodesRequired);
       throw new SCMException(reason,
               SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
 
     chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
             mutableFavoredNodes, additionalRacksRequired,
-            metadataSizeRequired, dataSizeRequired, 1));
+            Pair.of(metadataSizeRequired, dataSizeRequired), 1,
+            Pair.of(usedRacksCntMap, maxReplicasPerRack)));
 
     if (chosenNodes.size() < additionalRacksRequired) {
       String reason = "Chosen nodes size from Unique Racks: " + chosenNodes
@@ -288,16 +307,13 @@ public final class SCMContainerPlacementRackScatter
     }
 
     if (chosenNodes.size() < nodesRequired) {
-      racks.addAll(usedRacks);
-      usedRacks.addAll(chosenNodes.stream()
-              .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
-              .filter(node -> node != null)
-              .collect(Collectors.toSet()));
-      sortRackWithExcludedNodes(racks, excludedNodes, usedRacks);
-      racks.addAll(usedRacks);
+      racks.addAll(usedRacksCntMap.keySet());
+      racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
+      racks.addAll(usedRacksCntMap.keySet());
       chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
               mutableFavoredNodes, nodesRequired - chosenNodes.size(),
-              metadataSizeRequired, dataSizeRequired, Integer.MAX_VALUE));
+              Pair.of(metadataSizeRequired, dataSizeRequired),
+              Integer.MAX_VALUE, Pair.of(usedRacksCntMap, 
maxReplicasPerRack)));
     }
 
     List<DatanodeDetails> result = new ArrayList<>(chosenNodes);
@@ -319,9 +335,8 @@ public final class SCMContainerPlacementRackScatter
                       .flatMap(List::stream).collect(Collectors.toList()),
                 requiredReplicationFactor);
     if (!placementStatus.isPolicySatisfied()) {
-      String errorMsg = "ContainerPlacementPolicy not met, currentRacks is " +
-          placementStatus.actualPlacementCount() + " desired racks is " +
-          placementStatus.expectedPlacementCount();
+      String errorMsg = "ContainerPlacementPolicy not met. Misreplication" +
+              " Reason: " + placementStatus.misReplicatedReason();
       throw new SCMException(errorMsg, null);
     }
     return result;
@@ -416,7 +431,7 @@ public final class SCMContainerPlacementRackScatter
    * @return
    */
   private List<Node> sortRackWithExcludedNodes(List<Node> racks,
-          List<DatanodeDetails> excludedNodes, Set<Node> usedRacks) {
+          List<DatanodeDetails> excludedNodes, Map<Node, Integer> usedRacks) {
     if ((excludedNodes == null || excludedNodes.isEmpty())
             && usedRacks.isEmpty()) {
       return racks;
@@ -425,12 +440,12 @@ public final class SCMContainerPlacementRackScatter
         .map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
         // Dead Nodes have been removed from the topology and so have a
         // null rack. We need to exclude those from the rack list.
-        .filter(node -> node != null)
-        .filter(node -> !usedRacks.contains(node))
+        .filter(Objects::nonNull)
+        .filter(node -> !usedRacks.containsKey(node))
         .collect(Collectors.toSet());
     List <Node> result = new ArrayList<>();
     for (Node rack : racks) {
-      if (!usedRacks.contains(rack) && !lessPreferredRacks.contains(rack)) {
+      if (!usedRacks.containsKey(rack) && !lessPreferredRacks.contains(rack)) {
         result.add(rack);
       }
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
index cdfd57d1d0..2aa1121101 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -40,7 +41,7 @@ import java.util.List;
  * can be practically used.
  */
 public final class SCMContainerPlacementRandom extends SCMCommonPlacementPolicy
-    implements PlacementPolicy {
+    implements PlacementPolicy<ContainerReplica> {
   @VisibleForTesting
   public static final Logger LOG =
       LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index befc0543a3..2287475d82 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -105,6 +105,14 @@ public final class PipelinePlacementPolicy extends 
SCMCommonPlacementPolicy {
         && !p.isClosed();
   }
 
+  @Override
+  protected int getMaxReplicasPerRack(int numReplicas, int numberOfRacks) {
+    if (numberOfRacks == 1) {
+      return numReplicas;
+    }
+    return Math.max(numReplicas - 1, 1);
+  }
+
   /**
    * Filter out viable nodes based on
    * 1. nodes that are healthy
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
index 9950367972..eb0741662c 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -84,7 +85,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -691,11 +691,20 @@ public final class HddsTestUtils {
   }
 
   public static Set<ContainerReplica> getReplicas(
+          final ContainerID containerId,
+          final ContainerReplicaProto.State state,
+          final long sequenceId,
+          final DatanodeDetails... datanodeDetails) {
+    return Sets.newHashSet(getReplicas(containerId, state, sequenceId,
+            Arrays.asList(datanodeDetails)));
+  }
+
+  public static List<ContainerReplica> getReplicas(
       final ContainerID containerId,
       final ContainerReplicaProto.State state,
       final long sequenceId,
-      final DatanodeDetails... datanodeDetails) {
-    Set<ContainerReplica> replicas = new HashSet<>();
+      final Iterable<DatanodeDetails> datanodeDetails) {
+    List<ContainerReplica> replicas = new ArrayList<>();
     for (DatanodeDetails datanode : datanodeDetails) {
       replicas.add(getReplicas(containerId, state,
           sequenceId, datanode.getUuid(), datanode));
@@ -744,14 +753,14 @@ public final class HddsTestUtils {
     return builder.build();
   }
 
-  public static Set<ContainerReplica> getReplicasWithReplicaIndex(
+  public static List<ContainerReplica> getReplicasWithReplicaIndex(
           final ContainerID containerId,
           final ContainerReplicaProto.State state,
           final long usedBytes,
           final long keyCount,
           final long sequenceId,
-          final DatanodeDetails... datanodeDetails) {
-    Set<ContainerReplica> replicas = new HashSet<>();
+          final Iterable<DatanodeDetails> datanodeDetails) {
+    List<ContainerReplica> replicas = new ArrayList<>();
     int replicaIndex = 1;
     for (DatanodeDetails datanode : datanodeDetails) {
       replicas.add(getReplicaBuilder(containerId, state,
@@ -762,6 +771,17 @@ public final class HddsTestUtils {
     return replicas;
   }
 
+  public static Set<ContainerReplica> getReplicasWithReplicaIndex(
+          final ContainerID containerId,
+          final ContainerReplicaProto.State state,
+          final long usedBytes,
+          final long keyCount,
+          final long sequenceId,
+          final DatanodeDetails... datanodeDetails) {
+    return Sets.newHashSet(getReplicasWithReplicaIndex(containerId, state,
+            usedBytes, keyCount, sequenceId, Arrays.asList(datanodeDetails)));
+  }
+
 
 
   public static Pipeline getRandomPipeline() {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
index d11ca91b36..885e474749 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
@@ -18,21 +18,35 @@
 
 package org.apache.hadoop.hdds.scm;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
 
 /**
  * Test functions of SCMCommonPlacementPolicy.
@@ -51,25 +65,237 @@ public class TestSCMCommonPlacementPolicy {
   @Test
   public void testGetResultSet() throws SCMException {
     DummyPlacementPolicy dummyPlacementPolicy =
-        new DummyPlacementPolicy(nodeManager, conf);
-    List<DatanodeDetails> list =
-        nodeManager.getNodes(NodeStatus.inServiceHealthy());
+        new DummyPlacementPolicy(nodeManager, conf, 5);
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
     List<DatanodeDetails> result = dummyPlacementPolicy.getResultSet(3, list);
     Set<DatanodeDetails> resultSet = new HashSet<>(result);
     Assertions.assertNotEquals(1, resultSet.size());
   }
 
+  private void testReplicasToFixMisreplication(
+          List<ContainerReplica> replicas,
+          DummyPlacementPolicy placementPolicy,
+          int expectedNumberOfReplicasToCopy,
+          Map<Node, Integer> expectedNumberOfCopyOperationFromRack) {
+    Set<ContainerReplica> replicasToCopy = placementPolicy
+            .replicasToCopyToFixMisreplication(Sets.newHashSet(replicas));
+    Assertions.assertEquals(expectedNumberOfReplicasToCopy,
+            replicasToCopy.size());
+    Map<Node, Long> rackCopyMap =
+            replicasToCopy.stream().collect(Collectors.groupingBy(
+                    replica -> placementPolicy
+                            .getPlacementGroup(replica.getDatanodeDetails()),
+            Collectors.counting()));
+    Set<Node> racks = replicas.stream()
+            .map(ContainerReplica::getDatanodeDetails)
+            .map(placementPolicy::getPlacementGroup)
+            .collect(Collectors.toSet());
+    for (Node rack: racks) {
+      Assertions.assertEquals(
+              expectedNumberOfCopyOperationFromRack.getOrDefault(rack, 0),
+              rackCopyMap.getOrDefault(rack, 0L).intValue());
+    }
+  }
+
+  @Test
+  public void testReplicasToFixMisreplicationWithOneMisreplication() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+            new DummyPlacementPolicy(nodeManager, conf, 5);
+    List<Node> racks = dummyPlacementPolicy.racks;
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 5)
+                    .map(list::get).collect(Collectors.toList());
+    List<ContainerReplica> replicas =
+            HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+                    CLOSED, 0, 0, 0, replicaDns);
+    testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 1,
+            ImmutableMap.of(racks.get(0), 1));
+  }
+
+  @Test
+  public void testReplicasToFixMisreplicationWithTwoMisreplication() {
+    DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+            nodeManager, conf,
+            GenericTestUtils.getReverseMap(
+                    ImmutableMap.of(0, ImmutableList.of(0, 1, 5),
+                            1, ImmutableList.of(6),
+                            2, ImmutableList.of(2, 7),
+                            3, ImmutableList.of(3, 8),
+                            4, ImmutableList.of(4, 9))), 5);
+    List<Node> racks = dummyPlacementPolicy.racks;
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 5)
+                    .map(list::get).collect(Collectors.toList());
+    List<ContainerReplica> replicas =
+            HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+                    CLOSED, 0, 0, 0, replicaDns);
+    testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 2,
+            ImmutableMap.of(racks.get(0), 2));
+  }
+
+  @Test
+  public void testReplicasToFixMisreplicationWithThreeMisreplication() {
+    DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+            nodeManager, conf,
+            GenericTestUtils.getReverseMap(
+                    ImmutableMap.of(0, ImmutableList.of(0, 1, 2, 5),
+                            1, ImmutableList.of(6),
+                            2, ImmutableList.of(7),
+                            3, ImmutableList.of(3, 8),
+                            4, ImmutableList.of(4, 9))), 5);
+    List<Node> racks = dummyPlacementPolicy.racks;
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 5)
+                    .map(list::get).collect(Collectors.toList());
+    List<ContainerReplica> replicas =
+            HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+                    CLOSED, 0, 0, 0, replicaDns);
+    testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 3,
+            ImmutableMap.of(racks.get(0), 3));
+  }
+
+  @Test
+  public void
+      testReplicasToFixMisreplicationWithThreeMisreplicationOnDifferentRack() {
+    DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+            nodeManager, conf,
+            GenericTestUtils.getReverseMap(
+                    ImmutableMap.of(0, ImmutableList.of(0, 1, 2, 5),
+                            1, ImmutableList.of(6),
+                            2, ImmutableList.of(7),
+                            3, ImmutableList.of(3, 4, 8),
+                            4, ImmutableList.of(9))), 5);
+    List<Node> racks = dummyPlacementPolicy.racks;
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 4)
+                    .map(list::get).collect(Collectors.toList());
+    //Creating Replicas without replica Index
+    List<ContainerReplica> replicas = HddsTestUtils
+            .getReplicas(new ContainerID(1), CLOSED, 0, replicaDns);
+    testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 3,
+            ImmutableMap.of(racks.get(0), 2, racks.get(3), 1));
+  }
+
+  @Test
+  public void
+      testReplicasToFixMisreplicationWithReplicationFactorLessThanNumberOfRack(
+  ) {
+    DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+            nodeManager, conf,
+            GenericTestUtils.getReverseMap(
+                    ImmutableMap.of(0, ImmutableList.of(0, 1, 5),
+                            1, ImmutableList.of(6),
+                            2, ImmutableList.of(2, 7),
+                            3, ImmutableList.of(3, 4, 8),
+                            4, ImmutableList.of(9))), 5);
+    List<Node> racks = dummyPlacementPolicy.racks;
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    List<DatanodeDetails> replicaDns = Stream.of(0, 1, 3, 4)
+                    .map(list::get).collect(Collectors.toList());
+    //Creating Replicas without replica Index for replicas < number of racks
+    List<ContainerReplica> replicas = HddsTestUtils
+            .getReplicas(new ContainerID(1), CLOSED, 0, replicaDns);
+    testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 2,
+            ImmutableMap.of(racks.get(0), 1, racks.get(3), 1));
+  }
+
+  @Test
+  public void
+      testReplicasToFixMisreplicationWithReplicationFactorMoreThanNumberOfRack(
+  ) {
+    DummyPlacementPolicy dummyPlacementPolicy = new DummyPlacementPolicy(
+            nodeManager, conf,
+            GenericTestUtils.getReverseMap(
+                    ImmutableMap.of(0, ImmutableList.of(0, 1, 2, 5),
+                            1, ImmutableList.of(6),
+                            2, ImmutableList.of(7),
+                            3, ImmutableList.of(3, 4, 8),
+                            4, ImmutableList.of(9))), 5);
+    List<Node> racks = dummyPlacementPolicy.racks;
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 4, 6)
+                    .map(list::get).collect(Collectors.toList());
+    //Creating Replicas without replica Index for replicas > number of racks
+    List<ContainerReplica> replicas = HddsTestUtils
+            .getReplicas(new ContainerID(1), CLOSED, 0, replicaDns);
+    testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 2,
+            ImmutableMap.of(racks.get(0), 1, racks.get(3), 1));
+  }
+
+  @Test
+  public void testReplicasToFixMisreplicationMaxReplicaPerRack() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+            new DummyPlacementPolicy(nodeManager, conf, 2);
+    List<Node> racks = dummyPlacementPolicy.racks;
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    List<DatanodeDetails> replicaDns = Stream.of(0, 2, 4, 6, 8)
+                    .map(list::get).collect(Collectors.toList());
+    List<ContainerReplica> replicas =
+            HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+                    CLOSED, 0, 0, 0, replicaDns);
+    testReplicasToFixMisreplication(replicas, dummyPlacementPolicy, 2,
+            ImmutableMap.of(racks.get(0), 2));
+  }
+
+  @Test
+  public void testReplicasWithoutMisreplication() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+            new DummyPlacementPolicy(nodeManager, conf, 5);
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    List<DatanodeDetails> replicaDns = Stream.of(0, 1, 2, 3, 4)
+                    .map(list::get).collect(Collectors.toList());
+    List<ContainerReplica> replicas =
+            HddsTestUtils.getReplicasWithReplicaIndex(new ContainerID(1),
+                    CLOSED, 0, 0, 0, replicaDns);
+
+    Set<ContainerReplica> replicasToCopy = dummyPlacementPolicy
+            .replicasToCopyToFixMisreplication(Sets.newHashSet(replicas));
+    Assertions.assertEquals(0, replicasToCopy.size());
+  }
+
+
+
   private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
+    private Map<DatanodeDetails, Node> rackMap;
+    private List<Node> racks;
+    private int rackCnt;
+
+    DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
+        int rackCnt) {
+      this(nodeManager, conf,
+           IntStream.range(0, nodeManager.getAllNodes().size()).boxed()
+           .collect(Collectors.toMap(Function.identity(),
+                   idx -> idx % rackCnt)), rackCnt);
+    }
 
-    DummyPlacementPolicy(
-        NodeManager nodeManager,
-        ConfigurationSource conf) {
+    DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
+            Map<Integer, Integer> datanodeRackMap, int rackCnt) {
       super(nodeManager, conf);
+      this.rackCnt = rackCnt;
+      this.racks = IntStream.range(0, rackCnt)
+      .mapToObj(i -> Mockito.mock(Node.class)).collect(Collectors.toList());
+      List<DatanodeDetails> datanodeDetails = nodeManager.getAllNodes();
+      rackMap = datanodeRackMap.entrySet().stream()
+              .collect(Collectors.toMap(
+                      entry -> datanodeDetails.get(entry.getKey()),
+                      entry -> racks.get(entry.getValue())));
     }
 
+
+
     @Override
     public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
       return healthyNodes.get(0);
     }
+
+    @Override
+    public Node getPlacementGroup(DatanodeDetails dn) {
+      return rackMap.get(dn);
+    }
+
+    @Override
+    protected int getRequiredRackCount(int numReplicas) {
+      return Math.min(numReplicas, rackCnt);
+    }
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index c35cb2b455..254a20e480 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -19,7 +19,9 @@ package 
org.apache.hadoop.hdds.scm.container.placement.algorithms;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
@@ -176,7 +179,8 @@ public class TestContainerPlacementFactory {
   /**
    * A dummy container placement implementation for test.
    */
-  public static class DummyImpl implements PlacementPolicy {
+  public static class DummyImpl implements
+          PlacementPolicy<ContainerReplica> {
     @Override
     public List<DatanodeDetails> chooseDatanodes(
         List<DatanodeDetails> usedNodes,
@@ -191,6 +195,12 @@ public class TestContainerPlacementFactory {
         validateContainerPlacement(List<DatanodeDetails> dns, int replicas) {
       return new ContainerPlacementStatusDefault(1, 1, 1);
     }
+
+    @Override
+    public Set<ContainerReplica> replicasToCopyToFixMisreplication(
+            Set<ContainerReplica> replicas) {
+      return Collections.emptySet();
+    }
   }
 
   @Test
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
index 46bf8031ef..b49037e583 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java
@@ -58,6 +58,7 @@ import java.util.stream.Stream;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
@@ -245,20 +246,34 @@ public class TestSCMContainerPlacementRackScatter {
     nodeNum = 5;
     if (datanodeCount > nodeNum) {
       assumeTrue(datanodeCount >= NODE_PER_RACK);
-      datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
-      Assertions.assertEquals(nodeNum, datanodeDetails.size());
-      Assertions.assertEquals(getRackSize(datanodeDetails),
-          Math.min(nodeNum, rackNum));
+      if (datanodeCount == 6) {
+        int finalNodeNum = nodeNum;
+        SCMException e = assertThrows(SCMException.class,
+                () -> policy.chooseDatanodes(null, null, finalNodeNum, 0, 15));
+        assertEquals(FAILED_TO_FIND_HEALTHY_NODES, e.getResult());
+      } else {
+        datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+        Assertions.assertEquals(nodeNum, datanodeDetails.size());
+        Assertions.assertEquals(getRackSize(datanodeDetails),
+                Math.min(nodeNum, rackNum));
+      }
     }
 
     //  10 replicas
     nodeNum = 10;
     if (datanodeCount > nodeNum) {
       assumeTrue(datanodeCount > 2 * NODE_PER_RACK);
-      datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
-      Assertions.assertEquals(nodeNum, datanodeDetails.size());
-      Assertions.assertEquals(getRackSize(datanodeDetails),
-          Math.min(nodeNum, rackNum));
+      if (datanodeCount == 11) {
+        int finalNodeNum = nodeNum;
+        SCMException e = assertThrows(SCMException.class,
+                () -> policy.chooseDatanodes(null, null, finalNodeNum, 0, 15));
+        assertEquals(FAILED_TO_FIND_HEALTHY_NODES, e.getResult());
+      } else {
+        datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15);
+        Assertions.assertEquals(nodeNum, datanodeDetails.size());
+        Assertions.assertEquals(getRackSize(datanodeDetails),
+                Math.min(nodeNum, rackNum));
+      }
     }
   }
 
@@ -314,11 +329,20 @@ public class TestSCMContainerPlacementRackScatter {
     totalNum = 5;
     excludedNodes.clear();
     excludedNodes.add(datanodes.get(0));
-    datanodeDetails = policy.chooseDatanodes(
-        excludedNodes, null, nodeNum, 0, 15);
-    Assertions.assertEquals(nodeNum, datanodeDetails.size());
-    Assertions.assertEquals(getRackSize(datanodeDetails, excludedNodes),
-        Math.min(totalNum, rackNum));
+    if (datanodeCount == 6) {
+      int finalNodeNum = nodeNum;
+      SCMException e = assertThrows(SCMException.class,
+              () -> policy.chooseDatanodes(excludedNodes, null,
+                      finalNodeNum, 0, 15));
+      assertEquals(FAILED_TO_FIND_HEALTHY_NODES, e.getResult());
+    } else {
+      datanodeDetails = policy.chooseDatanodes(
+              excludedNodes, null, nodeNum, 0, 15);
+      Assertions.assertEquals(nodeNum, datanodeDetails.size());
+      Assertions.assertEquals(getRackSize(datanodeDetails, excludedNodes),
+              Math.min(totalNum, rackNum));
+    }
+
 
     // 5 replicas, two existing datanodes on different rack
     nodeNum = 3;
@@ -344,7 +368,9 @@ public class TestSCMContainerPlacementRackScatter {
       SCMException e = assertThrows(SCMException.class,
           () -> policy.chooseDatanodes(excludedNodes, null, 3, 0, 15));
       String message = e.getMessage();
-      assumeTrue(message.contains("ContainerPlacementPolicy not met"));
+      assertTrue(message.contains("Chosen nodes size from Unique Racks: 1," +
+              " but required nodes to choose from Unique Racks: " +
+              "2 do not match."));
     } else {
       datanodeDetails = policy.chooseDatanodes(
           excludedNodes, null, nodeNum, 0, 15);
@@ -567,7 +593,7 @@ public class TestSCMContainerPlacementRackScatter {
     assertEquals("Chosen nodes size from Unique Racks: 1, but required " +
             "nodes to choose from Unique Racks: 2 do not match.",
             exception.getMessage());
-    assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES,
+    assertEquals(FAILED_TO_FIND_HEALTHY_NODES,
             exception.getResult());
   }
 
diff --git 
a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
 
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
index e03f0a7ffe..771f5137a4 100644
--- 
a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
+++ 
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
@@ -25,6 +25,8 @@ import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Preconditions;
@@ -32,6 +34,7 @@ import com.google.common.base.Supplier;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Level;
@@ -43,6 +46,7 @@ import org.junit.Assert;
 import org.mockito.Mockito;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertTrue;
@@ -283,6 +287,12 @@ public abstract class GenericTestUtils {
     return value;
   }
 
+  public static <K, V> Map<V, K> getReverseMap(Map<K, List<V>> map) {
+    return map.entrySet().stream().flatMap(entry -> entry.getValue().stream()
+            .map(v -> Pair.of(v, entry.getKey())))
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+  }
+
   /**
    * Class to capture logs for doing assertions.
    */
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index d60bbe9801..27de4c743a 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -343,7 +343,8 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
    * of a datanode via setMisRepWhenDnPresent. If a DN with that UUID is passed
    * to validateContainerPlacement, then it will return an invalid placement.
    */
-  private static class MockPlacementPolicy implements PlacementPolicy {
+  private static class MockPlacementPolicy implements
+          PlacementPolicy<ContainerReplica> {
 
     private UUID misRepWhenDnPresent = null;
 
@@ -370,6 +371,13 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
       }
     }
 
+    @Override
+    public Set<ContainerReplica> replicasToCopyToFixMisreplication(
+            Set<ContainerReplica> replicas) {
+      return Collections.emptySet();
+    }
+
+
     private boolean isDnPresent(List<DatanodeDetails> dns) {
       for (DatanodeDetails dn : dns) {
         if (misRepWhenDnPresent != null


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to